pyoutlineapi

PyOutlineAPI: A modern, async-first Python client for the Outline VPN Server API.

Copyright (c) 2025 Denis Rozhnovskiy pytelemonbot@mail.ru All rights reserved.

This software is licensed under the MIT License.

You can find the full license text at:

https://opensource.org/licenses/MIT

Source code repository:

https://github.com/orenlab/pyoutlineapi

Quick Start:

from pyoutlineapi import AsyncOutlineClient

# From environment variables
async with AsyncOutlineClient.from_env() as client:
    server = await client.get_server_info()
    print(f"Server: {server.name}")

# Prefer from_env for production usage
async with AsyncOutlineClient.from_env() as client:
    keys = await client.get_access_keys()

Advanced Usage - Type Hints:

from pyoutlineapi import (
    AsyncOutlineClient,
    AuditLogger,
    AuditDetails,
    MetricsCollector,
    MetricsTags,
)

class CustomAuditLogger:
    def log_action(
        self,
        action: str,
        resource: str,
        *,
        user: str | None = None,
        details: AuditDetails | None = None,
        correlation_id: str | None = None,
    ) -> None:
        print(f"[AUDIT] {action} on {resource}")

async with AsyncOutlineClient.from_env(
    audit_logger=CustomAuditLogger(),
) as client:
    await client.create_access_key(name="test")
  1"""PyOutlineAPI: A modern, async-first Python client for the Outline VPN Server API.
  2
  3Copyright (c) 2025 Denis Rozhnovskiy <pytelemonbot@mail.ru>
  4All rights reserved.
  5
  6This software is licensed under the MIT License.
  7You can find the full license text at:
  8    https://opensource.org/licenses/MIT
  9
 10Source code repository:
 11    https://github.com/orenlab/pyoutlineapi
 12
 13Quick Start:
 14
 15```python
 16from pyoutlineapi import AsyncOutlineClient
 17
 18# From environment variables
 19async with AsyncOutlineClient.from_env() as client:
 20    server = await client.get_server_info()
 21    print(f"Server: {server.name}")
 22
 23# Prefer from_env for production usage
 24async with AsyncOutlineClient.from_env() as client:
 25    keys = await client.get_access_keys()
 26```
 27
 28Advanced Usage - Type Hints:
 29
 30```python
 31from pyoutlineapi import (
 32    AsyncOutlineClient,
 33    AuditLogger,
 34    AuditDetails,
 35    MetricsCollector,
 36    MetricsTags,
 37)
 38
 39class CustomAuditLogger:
 40    def log_action(
 41        self,
 42        action: str,
 43        resource: str,
 44        *,
 45        user: str | None = None,
 46        details: AuditDetails | None = None,
 47        correlation_id: str | None = None,
 48    ) -> None:
 49        print(f"[AUDIT] {action} on {resource}")
 50
 51async with AsyncOutlineClient.from_env(
 52    audit_logger=CustomAuditLogger(),
 53) as client:
 54    await client.create_access_key(name="test")
 55```
 56"""
 57
 58from __future__ import annotations
 59
 60from importlib import metadata
 61from typing import TYPE_CHECKING, Final, NoReturn
 62
 63# Core imports
 64from .audit import (
 65    AuditContext,
 66    AuditLogger,
 67    DefaultAuditLogger,
 68    NoOpAuditLogger,
 69    audited,
 70    get_audit_logger,
 71    get_or_create_audit_logger,
 72    set_audit_logger,
 73)
 74from .base_client import MetricsCollector, NoOpMetrics, correlation_id
 75from .circuit_breaker import CircuitConfig, CircuitMetrics, CircuitState
 76from .client import (
 77    AsyncOutlineClient,
 78    MultiServerManager,
 79    create_client,
 80    create_multi_server_manager,
 81)
 82from .common_types import (
 83    DEFAULT_SENSITIVE_KEYS,
 84    AuditDetails,
 85    ConfigOverrides,
 86    Constants,
 87    CredentialSanitizer,
 88    JsonPayload,
 89    MetricsTags,
 90    QueryParams,
 91    ResponseData,
 92    SecureIDGenerator,
 93    TimestampMs,
 94    TimestampSec,
 95    Validators,
 96    build_config_overrides,
 97    is_json_serializable,
 98    is_valid_bytes,
 99    is_valid_port,
100    mask_sensitive_data,
101)
102from .config import (
103    DevelopmentConfig,
104    OutlineClientConfig,
105    ProductionConfig,
106    create_env_template,
107    load_config,
108)
109from .exceptions import (
110    APIError,
111    CircuitOpenError,
112    ConfigurationError,
113    OutlineConnectionError,
114    OutlineError,
115    OutlineTimeoutError,
116    ValidationError,
117    format_error_chain,
118    get_retry_delay,
119    get_safe_error_dict,
120    is_retryable,
121)
122from .models import (
123    AccessKey,
124    AccessKeyCreateRequest,
125    AccessKeyList,
126    AccessKeyMetric,
127    AccessKeyNameRequest,
128    BandwidthData,
129    BandwidthDataValue,
130    BandwidthInfo,
131    ConnectionInfo,
132    DataLimit,
133    DataLimitRequest,
134    DataTransferred,
135    ErrorResponse,
136    ExperimentalMetrics,
137    HealthCheckResult,
138    HostnameRequest,
139    LocationMetric,
140    MetricsEnabledRequest,
141    MetricsStatusResponse,
142    PeakDeviceCount,
143    PortRequest,
144    Server,
145    ServerExperimentalMetric,
146    ServerMetrics,
147    ServerNameRequest,
148    ServerSummary,
149    TunnelTime,
150)
151from .response_parser import JsonDict, ResponseParser
152
153# Package metadata
154try:
155    __version__: str = metadata.version("pyoutlineapi")
156except metadata.PackageNotFoundError:
157    __version__ = "0.4.0-dev"
158
159__author__: Final[str] = "Denis Rozhnovskiy"
160__email__: Final[str] = "pytelemonbot@mail.ru"
161__license__: Final[str] = "MIT"
162
163# Public API
164__all__: Final[list[str]] = [
165    "DEFAULT_SENSITIVE_KEYS",
166    "APIError",
167    "AccessKey",
168    "AccessKeyCreateRequest",
169    "AccessKeyList",
170    "AccessKeyMetric",
171    "AccessKeyNameRequest",
172    "AsyncOutlineClient",
173    "AuditContext",
174    "AuditLogger",
175    "BandwidthData",
176    "BandwidthDataValue",
177    "BandwidthInfo",
178    "CircuitConfig",
179    "CircuitMetrics",
180    "CircuitOpenError",
181    "CircuitState",
182    "ConfigOverrides",
183    "ConfigurationError",
184    "Constants",
185    "CredentialSanitizer",
186    "DataLimit",
187    "DataLimitRequest",
188    "DataTransferred",
189    "DefaultAuditLogger",
190    "DevelopmentConfig",
191    "ErrorResponse",
192    "ExperimentalMetrics",
193    "HealthCheckResult",
194    "HostnameRequest",
195    "JsonDict",
196    "JsonPayload",
197    "LocationMetric",
198    "MetricsCollector",
199    "MetricsEnabledRequest",
200    "MetricsStatusResponse",
201    "MetricsTags",
202    "MultiServerManager",
203    "NoOpAuditLogger",
204    "NoOpMetrics",
205    "OutlineClientConfig",
206    "OutlineConnectionError",
207    "OutlineError",
208    "OutlineTimeoutError",
209    "PeakDeviceCount",
210    "PortRequest",
211    "ProductionConfig",
212    "QueryParams",
213    "ResponseData",
214    "ResponseParser",
215    "SecureIDGenerator",
216    "Server",
217    "ServerExperimentalMetric",
218    "ServerMetrics",
219    "ServerNameRequest",
220    "ServerSummary",
221    "TimestampMs",
222    "TimestampSec",
223    "TunnelTime",
224    "ValidationError",
225    "Validators",
226    "__author__",
227    "__email__",
228    "__license__",
229    "__version__",
230    "audited",
231    "build_config_overrides",
232    "correlation_id",
233    "create_client",
234    "create_env_template",
235    "create_multi_server_manager",
236    "format_error_chain",
237    "get_audit_logger",
238    "get_or_create_audit_logger",
239    "get_retry_delay",
240    "get_safe_error_dict",
241    "get_version",
242    "is_json_serializable",
243    "is_retryable",
244    "is_valid_bytes",
245    "is_valid_port",
246    "load_config",
247    "mask_sensitive_data",
248    "print_type_info",
249    "quick_setup",
250    "set_audit_logger",
251]
252
253
254# ===== Convenience Functions =====
255
256
257def get_version() -> str:
258    """Get package version string.
259
260    :return: Package version
261    """
262    return __version__
263
264
265def quick_setup() -> None:
266    """Create configuration template file for quick setup.
267
268    Creates `.env.example` file with all available configuration options.
269    """
270    create_env_template()
271    print("✅ Created .env.example")
272    print("📝 Edit the file with your server details")
273    print("🚀 Then use: AsyncOutlineClient.from_env()")
274
275
276def print_type_info() -> None:
277    """Print information about available type aliases for advanced usage."""
278    info = """
279🎯 PyOutlineAPI Type Aliases for Advanced Usage
280===============================================
281
282For creating custom AuditLogger:
283    from pyoutlineapi import AuditLogger, AuditDetails
284
285    class MyAuditLogger:
286        def log_action(
287            self,
288            action: str,
289            resource: str,
290            *,
291            details: AuditDetails | None = None,
292            ...
293        ) -> None: ...
294
295        async def alog_action(
296            self,
297            action: str,
298            resource: str,
299            *,
300            details: AuditDetails | None = None,
301            ...
302        ) -> None: ...
303
304For creating custom MetricsCollector:
305    from pyoutlineapi import MetricsCollector, MetricsTags
306
307    class MyMetrics:
308        def increment(
309            self,
310            metric: str,
311            *,
312            tags: MetricsTags | None = None
313        ) -> None: ...
314
315Available Type Aliases:
316    - TimestampMs, TimestampSec  # Unix timestamps
317    - JsonPayload, ResponseData  # JSON data types
318    - QueryParams                # URL query parameters
319    - AuditDetails               # Audit log details
320    - MetricsTags                # Metrics tags
321
322Constants and Validators:
323    from pyoutlineapi import Constants, Validators
324
325    # Access constants
326    Constants.RETRY_STATUS_CODES
327    Constants.MIN_PORT, Constants.MAX_PORT
328
329    # Use validators
330    Validators.validate_port(8080)
331    Validators.validate_key_id("my-key")
332
333Utility Classes:
334    from pyoutlineapi import (
335        CredentialSanitizer,
336        SecureIDGenerator,
337        ResponseParser,
338    )
339
340    # Sanitize sensitive data
341    safe_url = CredentialSanitizer.sanitize(url)
342
343    # Generate secure IDs
344    secure_id = SecureIDGenerator.generate()
345
346    # Parse API responses
347    parsed = ResponseParser.parse(data, Model)
348
349📖 Documentation: https://github.com/orenlab/pyoutlineapi
350    """
351    print(info)
352
353
354# ===== Better Error Messages =====
355
356
357def __getattr__(name: str) -> NoReturn:
358    """Provide helpful error messages for common mistakes.
359
360    :param name: Attribute name
361    :raises AttributeError: If attribute not found
362    """
363    mistakes = {
364        "OutlineClient": "Use 'AsyncOutlineClient' instead",
365        "OutlineSettings": "Use 'OutlineClientConfig' instead",
366        "create_resilient_client": (
367            "Use 'AsyncOutlineClient.from_env()' with 'enable_circuit_breaker=True'"
368        ),
369    }
370
371    if name in mistakes:
372        raise AttributeError(f"{name} not available. {mistakes[name]}")
373
374    raise AttributeError(f"module '{__name__}' has no attribute '{name}'")
375
376
377# ===== Interactive Help =====
378
379if TYPE_CHECKING:
380    import sys
381
382    if hasattr(sys, "ps1"):
383        # Show help in interactive mode
384        print(f"🚀 PyOutlineAPI v{__version__}")
385        print("💡 Quick start: pyoutlineapi.quick_setup()")
386        print("🎯 Type hints: pyoutlineapi.print_type_info()")
387        print("📚 Help: help(pyoutlineapi.AsyncOutlineClient)")
DEFAULT_SENSITIVE_KEYS = frozenset({'access_url', 'apikey', 'apiKey', 'accessUrl', 'cert_sha256', 'token', 'secret', 'authorization', 'certSha256', 'apiUrl', 'password', 'api_key', 'api_url'})
class APIError(pyoutlineapi.OutlineError):
171class APIError(OutlineError):
172    """HTTP API request failure.
173
174    Automatically determines retry eligibility based on HTTP status code.
175
176    Attributes:
177        status_code: HTTP status code (if available)
178        endpoint: API endpoint that failed
179        response_data: Raw response data (may contain sensitive info)
180
181    Example:
182        >>> error = APIError("Not found", status_code=404, endpoint="/server")
183        >>> error.is_client_error  # True
184        >>> error.is_retryable  # False
185    """
186
187    __slots__ = ("endpoint", "response_data", "status_code")
188
189    def __init__(
190        self,
191        message: str,
192        *,
193        status_code: int | None = None,
194        endpoint: str | None = None,
195        response_data: dict[str, Any] | None = None,
196    ) -> None:
197        """Initialize API error with sanitized endpoint.
198
199        Args:
200            message: Error message
201            status_code: HTTP status code
202            endpoint: API endpoint (will be sanitized)
203            response_data: Response data (may contain sensitive info)
204        """
205        from .common_types import Validators
206
207        # Sanitize endpoint for safe logging
208        safe_endpoint = (
209            Validators.sanitize_endpoint_for_logging(endpoint) if endpoint else None
210        )
211
212        # Build safe details (optimization: avoid dict creation if all None)
213        safe_details: dict[str, Any] | None = None
214        if status_code is not None or safe_endpoint is not None:
215            safe_details = {}
216            if status_code is not None:
217                safe_details["status_code"] = status_code
218            if safe_endpoint is not None:
219                safe_details["endpoint"] = safe_endpoint
220
221        # Build internal details (optimization: avoid dict creation if all None)
222        details: dict[str, Any] | None = None
223        if status_code is not None or endpoint is not None:
224            details = {}
225            if status_code is not None:
226                details["status_code"] = status_code
227            if endpoint is not None:
228                details["endpoint"] = endpoint
229
230        super().__init__(message, details=details, safe_details=safe_details)
231
232        # Store attributes directly (faster access than dict lookups)
233        self.status_code = status_code
234        self.endpoint = endpoint
235        self.response_data = response_data
236
237    @property
238    def is_retryable(self) -> bool:
239        """Check if error is retryable based on status code."""
240        return (
241            self.status_code in Constants.RETRY_STATUS_CODES
242            if self.status_code
243            else False
244        )
245
246    @property
247    def is_client_error(self) -> bool:
248        """Check if error is a client error (4xx status).
249
250        Returns:
251            True if status code is 400-499
252        """
253        return self.status_code is not None and 400 <= self.status_code < 500
254
255    @property
256    def is_server_error(self) -> bool:
257        """Check if error is a server error (5xx status).
258
259        Returns:
260            True if status code is 500-599
261        """
262        return self.status_code is not None and 500 <= self.status_code < 600
263
264    @property
265    def is_rate_limit_error(self) -> bool:
266        """Check if error is a rate limit error (429 status).
267
268        Returns:
269            True if status code is 429
270        """
271        return self.status_code == 429

HTTP API request failure.

Automatically determines retry eligibility based on HTTP status code.

Attributes:
  • status_code: HTTP status code (if available)
  • endpoint: API endpoint that failed
  • response_data: Raw response data (may contain sensitive info)
Example:
>>> error = APIError("Not found", status_code=404, endpoint="/server")
>>> error.is_client_error  # True
>>> error.is_retryable  # False
APIError( message: str, *, status_code: int | None = None, endpoint: str | None = None, response_data: dict[str, typing.Any] | None = None)
189    def __init__(
190        self,
191        message: str,
192        *,
193        status_code: int | None = None,
194        endpoint: str | None = None,
195        response_data: dict[str, Any] | None = None,
196    ) -> None:
197        """Initialize API error with sanitized endpoint.
198
199        Args:
200            message: Error message
201            status_code: HTTP status code
202            endpoint: API endpoint (will be sanitized)
203            response_data: Response data (may contain sensitive info)
204        """
205        from .common_types import Validators
206
207        # Sanitize endpoint for safe logging
208        safe_endpoint = (
209            Validators.sanitize_endpoint_for_logging(endpoint) if endpoint else None
210        )
211
212        # Build safe details (optimization: avoid dict creation if all None)
213        safe_details: dict[str, Any] | None = None
214        if status_code is not None or safe_endpoint is not None:
215            safe_details = {}
216            if status_code is not None:
217                safe_details["status_code"] = status_code
218            if safe_endpoint is not None:
219                safe_details["endpoint"] = safe_endpoint
220
221        # Build internal details (optimization: avoid dict creation if all None)
222        details: dict[str, Any] | None = None
223        if status_code is not None or endpoint is not None:
224            details = {}
225            if status_code is not None:
226                details["status_code"] = status_code
227            if endpoint is not None:
228                details["endpoint"] = endpoint
229
230        super().__init__(message, details=details, safe_details=safe_details)
231
232        # Store attributes directly (faster access than dict lookups)
233        self.status_code = status_code
234        self.endpoint = endpoint
235        self.response_data = response_data

Initialize API error with sanitized endpoint.

Arguments:
  • message: Error message
  • status_code: HTTP status code
  • endpoint: API endpoint (will be sanitized)
  • response_data: Response data (may contain sensitive info)
status_code
endpoint
response_data
is_retryable: bool
237    @property
238    def is_retryable(self) -> bool:
239        """Check if error is retryable based on status code."""
240        return (
241            self.status_code in Constants.RETRY_STATUS_CODES
242            if self.status_code
243            else False
244        )

Check if error is retryable based on status code.

is_client_error: bool
246    @property
247    def is_client_error(self) -> bool:
248        """Check if error is a client error (4xx status).
249
250        Returns:
251            True if status code is 400-499
252        """
253        return self.status_code is not None and 400 <= self.status_code < 500

Check if error is a client error (4xx status).

Returns:

True if status code is 400-499

is_server_error: bool
255    @property
256    def is_server_error(self) -> bool:
257        """Check if error is a server error (5xx status).
258
259        Returns:
260            True if status code is 500-599
261        """
262        return self.status_code is not None and 500 <= self.status_code < 600

Check if error is a server error (5xx status).

Returns:

True if status code is 500-599

is_rate_limit_error: bool
264    @property
265    def is_rate_limit_error(self) -> bool:
266        """Check if error is a rate limit error (429 status).
267
268        Returns:
269            True if status code is 429
270        """
271        return self.status_code == 429

Check if error is a rate limit error (429 status).

Returns:

True if status code is 429

class AccessKey(pyoutlineapi.common_types.BaseValidatedModel):
137class AccessKey(BaseValidatedModel):
138    """Access key model matching API schema with optimized properties.
139
140    SCHEMA: Based on OpenAPI /access-keys endpoint
141    """
142
143    id: str
144    name: str | None = None
145    password: str
146    port: Port
147    method: str
148    access_url: str = Field(alias="accessUrl")
149    data_limit: DataLimit | None = Field(None, alias="dataLimit")
150
151    @field_validator("name", mode="before")
152    @classmethod
153    def validate_name(cls, v: str | None) -> str | None:
154        """Validate and normalize name from API."""
155        if v is None:
156            return None
157
158        if isinstance(v, str):
159            stripped = v.strip()
160            if not stripped:
161                return None
162
163            if len(stripped) > Constants.MAX_NAME_LENGTH:
164                raise ValueError(
165                    f"Name too long: {len(stripped)} (max {Constants.MAX_NAME_LENGTH})"
166                )
167            return stripped
168
169    @field_validator("id")
170    @classmethod
171    def validate_id(cls, v: str) -> str:
172        """Validate key ID.
173
174        :param v: Key ID
175        :return: Validated key ID
176        :raises ValueError: If ID is invalid
177        """
178        return Validators.validate_key_id(v)
179
180    @property
181    def has_data_limit(self) -> bool:
182        """Check if key has data limit (optimized None check).
183
184        :return: True if data limit exists
185        """
186        return self.data_limit is not None
187
188    @property
189    def display_name(self) -> str:
190        """Get display name with optimized conditional.
191
192        :return: Display name
193        """
194        return self.name if self.name else f"Key-{self.id}"

Access key model matching API schema with optimized properties.

SCHEMA: Based on OpenAPI /access-keys endpoint

id: str = PydanticUndefined
name: str | None = None
password: str = PydanticUndefined
port: Annotated[int, FieldInfo(annotation=NoneType, required=True, description='Port number (1-65535)', metadata=[Ge(ge=1), Le(le=65535)])] = PydanticUndefined

Port number (1-65535)

method: str = PydanticUndefined
access_url: str = PydanticUndefined
data_limit: DataLimit | None = None
@field_validator('name', mode='before')
@classmethod
def validate_name(cls, v: str | None) -> str | None:
151    @field_validator("name", mode="before")
152    @classmethod
153    def validate_name(cls, v: str | None) -> str | None:
154        """Validate and normalize name from API."""
155        if v is None:
156            return None
157
158        if isinstance(v, str):
159            stripped = v.strip()
160            if not stripped:
161                return None
162
163            if len(stripped) > Constants.MAX_NAME_LENGTH:
164                raise ValueError(
165                    f"Name too long: {len(stripped)} (max {Constants.MAX_NAME_LENGTH})"
166                )
167            return stripped

Validate and normalize name from API.

@field_validator('id')
@classmethod
def validate_id(cls, v: str) -> str:
169    @field_validator("id")
170    @classmethod
171    def validate_id(cls, v: str) -> str:
172        """Validate key ID.
173
174        :param v: Key ID
175        :return: Validated key ID
176        :raises ValueError: If ID is invalid
177        """
178        return Validators.validate_key_id(v)

Validate key ID.

Parameters
  • v: Key ID
Returns

Validated key ID

Raises
  • ValueError: If ID is invalid
has_data_limit: bool
180    @property
181    def has_data_limit(self) -> bool:
182        """Check if key has data limit (optimized None check).
183
184        :return: True if data limit exists
185        """
186        return self.data_limit is not None

Check if key has data limit (optimized None check).

Returns

True if data limit exists

display_name: str
188    @property
189    def display_name(self) -> str:
190        """Get display name with optimized conditional.
191
192        :return: Display name
193        """
194        return self.name if self.name else f"Key-{self.id}"

Get display name with optimized conditional.

Returns

Display name

class AccessKeyCreateRequest(pyoutlineapi.common_types.BaseValidatedModel):
491class AccessKeyCreateRequest(BaseValidatedModel):
492    """Request model for creating access keys.
493
494    SCHEMA: Based on POST /access-keys request body
495    """
496
497    name: str | None = Field(default=None, min_length=1, max_length=255)
498    method: str | None = None
499    password: str | None = None
500    port: Port | None = None
501    limit: DataLimit | None = None

Request model for creating access keys.

SCHEMA: Based on POST /access-keys request body

name: str | None = None
method: str | None = None
password: str | None = None
port: Optional[Annotated[int, FieldInfo(annotation=NoneType, required=True, description='Port number (1-65535)', metadata=[Ge(ge=1), Le(le=65535)])]] = None
limit: DataLimit | None = None
class AccessKeyList(pyoutlineapi.common_types.BaseValidatedModel):
197class AccessKeyList(BaseValidatedModel):
198    """List of access keys with optimized utility methods.
199
200    SCHEMA: Based on GET /access-keys response
201    """
202
203    access_keys: list[AccessKey] = Field(alias="accessKeys")
204
205    @cached_property
206    def count(self) -> int:
207        """Get number of access keys (cached).
208
209        NOTE: Cached because list is immutable after creation
210
211        :return: Key count
212        """
213        return len(self.access_keys)
214
215    @property
216    def is_empty(self) -> bool:
217        """Check if list is empty (uses cached count).
218
219        :return: True if no keys
220        """
221        return self.count == 0
222
223    def get_by_id(self, key_id: str) -> AccessKey | None:
224        """Get key by ID with early return optimization.
225
226        :param key_id: Access key ID
227        :return: Access key or None if not found
228        """
229        for key in self.access_keys:
230            if key.id == key_id:
231                return key
232        return None
233
234    def get_by_name(self, name: str) -> list[AccessKey]:
235        """Get keys by name with optimized list comprehension.
236
237        :param name: Key name
238        :return: List of matching keys (may be multiple)
239        """
240        return [key for key in self.access_keys if key.name == name]
241
242    def filter_with_limits(self) -> list[AccessKey]:
243        """Get keys with data limits (optimized comprehension).
244
245        :return: List of keys with limits
246        """
247        return [key for key in self.access_keys if key.has_data_limit]
248
249    def filter_without_limits(self) -> list[AccessKey]:
250        """Get keys without data limits (optimized comprehension).
251
252        :return: List of keys without limits
253        """
254        return [key for key in self.access_keys if not key.has_data_limit]

List of access keys with optimized utility methods.

SCHEMA: Based on GET /access-keys response

access_keys: list[AccessKey] = PydanticUndefined
count: int
205    @cached_property
206    def count(self) -> int:
207        """Get number of access keys (cached).
208
209        NOTE: Cached because list is immutable after creation
210
211        :return: Key count
212        """
213        return len(self.access_keys)

Get number of access keys (cached).

NOTE: Cached because list is immutable after creation

Returns

Key count

is_empty: bool
215    @property
216    def is_empty(self) -> bool:
217        """Check if list is empty (uses cached count).
218
219        :return: True if no keys
220        """
221        return self.count == 0

Check if list is empty (uses cached count).

Returns

True if no keys

def get_by_id(self, key_id: str) -> AccessKey | None:
223    def get_by_id(self, key_id: str) -> AccessKey | None:
224        """Get key by ID with early return optimization.
225
226        :param key_id: Access key ID
227        :return: Access key or None if not found
228        """
229        for key in self.access_keys:
230            if key.id == key_id:
231                return key
232        return None

Get key by ID with early return optimization.

Parameters
  • key_id: Access key ID
Returns

Access key or None if not found

def get_by_name(self, name: str) -> list[AccessKey]:
234    def get_by_name(self, name: str) -> list[AccessKey]:
235        """Get keys by name with optimized list comprehension.
236
237        :param name: Key name
238        :return: List of matching keys (may be multiple)
239        """
240        return [key for key in self.access_keys if key.name == name]

Get keys by name with optimized list comprehension.

Parameters
  • name: Key name
Returns

List of matching keys (may be multiple)

def filter_with_limits(self) -> list[AccessKey]:
242    def filter_with_limits(self) -> list[AccessKey]:
243        """Get keys with data limits (optimized comprehension).
244
245        :return: List of keys with limits
246        """
247        return [key for key in self.access_keys if key.has_data_limit]

Get keys with data limits (optimized comprehension).

Returns

List of keys with limits

def filter_without_limits(self) -> list[AccessKey]:
249    def filter_without_limits(self) -> list[AccessKey]:
250        """Get keys without data limits (optimized comprehension).
251
252        :return: List of keys without limits
253        """
254        return [key for key in self.access_keys if not key.has_data_limit]

Get keys without data limits (optimized comprehension).

Returns

List of keys without limits

class AccessKeyMetric(pyoutlineapi.common_types.BaseValidatedModel):
443class AccessKeyMetric(BaseValidatedModel):
444    """Per-key experimental metrics.
445
446    SCHEMA: Based on experimental metrics accessKeys array item
447    """
448
449    access_key_id: str = Field(alias="accessKeyId")
450    tunnel_time: TunnelTime = Field(alias="tunnelTime")
451    data_transferred: DataTransferred = Field(alias="dataTransferred")
452    connection: ConnectionInfo

Per-key experimental metrics.

SCHEMA: Based on experimental metrics accessKeys array item

access_key_id: str = PydanticUndefined
tunnel_time: TunnelTime = PydanticUndefined
data_transferred: DataTransferred = PydanticUndefined
connection: pyoutlineapi.models.ConnectionInfo = PydanticUndefined
class AccessKeyNameRequest(pyoutlineapi.common_types.BaseValidatedModel):
531class AccessKeyNameRequest(BaseValidatedModel):
532    """Request model for renaming access key.
533
534    SCHEMA: Based on PUT /access-keys/{id}/name request body
535    """
536
537    name: str = Field(min_length=1, max_length=255)

Request model for renaming access key.

SCHEMA: Based on PUT /access-keys/{id}/name request body

name: str = PydanticUndefined
class AsyncOutlineClient(pyoutlineapi.base_client.BaseHTTPClient, pyoutlineapi.api_mixins.ServerMixin, pyoutlineapi.api_mixins.AccessKeyMixin, pyoutlineapi.api_mixins.DataLimitMixin, pyoutlineapi.api_mixins.MetricsMixin):
 46class AsyncOutlineClient(
 47    BaseHTTPClient,
 48    ServerMixin,
 49    AccessKeyMixin,
 50    DataLimitMixin,
 51    MetricsMixin,
 52):
 53    """High-performance async client for Outline VPN Server API."""
 54
 55    __slots__ = (
 56        "_audit_logger_instance",
 57        "_config",
 58        "_default_json_format",
 59    )
 60
 61    def __init__(
 62        self,
 63        config: OutlineClientConfig | None = None,
 64        *,
 65        api_url: str | None = None,
 66        cert_sha256: str | None = None,
 67        audit_logger: AuditLogger | None = None,
 68        metrics: MetricsCollector | None = None,
 69        **overrides: Unpack[ConfigOverrides],
 70    ) -> None:
 71        """Initialize Outline client with modern configuration approach.
 72
 73        Uses structural pattern matching for configuration resolution.
 74
 75        :param config: Client configuration object
 76        :param api_url: API URL (alternative to config)
 77        :param cert_sha256: Certificate fingerprint (alternative to config)
 78        :param audit_logger: Custom audit logger
 79        :param metrics: Custom metrics collector
 80        :param overrides: Configuration overrides (timeout, retry_attempts, etc.)
 81        :raises ConfigurationError: If configuration is invalid
 82
 83        Example:
 84            >>> async with AsyncOutlineClient.from_env() as client:
 85            ...     info = await client.get_server_info()
 86        """
 87        # Build config_kwargs using utility function (DRY)
 88        config_kwargs = build_config_overrides(**overrides)
 89
 90        # Validate configuration using pattern matching
 91        resolved_config = self._resolve_configuration(
 92            config, api_url, cert_sha256, config_kwargs
 93        )
 94
 95        self._config = resolved_config
 96        self._audit_logger_instance = audit_logger
 97        self._default_json_format = resolved_config.json_format
 98
 99        # Initialize base HTTP client
100        super().__init__(
101            api_url=resolved_config.api_url,
102            cert_sha256=resolved_config.cert_sha256,
103            timeout=resolved_config.timeout,
104            retry_attempts=resolved_config.retry_attempts,
105            max_connections=resolved_config.max_connections,
106            user_agent=resolved_config.user_agent,
107            enable_logging=resolved_config.enable_logging,
108            circuit_config=resolved_config.circuit_config,
109            rate_limit=resolved_config.rate_limit,
110            allow_private_networks=resolved_config.allow_private_networks,
111            resolve_dns_for_ssrf=resolved_config.resolve_dns_for_ssrf,
112            audit_logger=audit_logger,
113            metrics=metrics,
114        )
115
116        # Cache instance for weak reference tracking (automatic cleanup)
117        _client_cache[id(self)] = self
118
119        if resolved_config.enable_logging and logger.isEnabledFor(logging.INFO):
120            safe_url = Validators.sanitize_url_for_logging(self.api_url)
121            logger.info("Client initialized for %s", safe_url)
122
123    @staticmethod
124    def _resolve_configuration(
125        config: OutlineClientConfig | None,
126        api_url: str | None,
127        cert_sha256: str | None,
128        kwargs: dict[str, Any],
129    ) -> OutlineClientConfig:
130        """Resolve and validate configuration using pattern matching.
131
132        :param config: Configuration object
133        :param api_url: Direct API URL
134        :param cert_sha256: Direct certificate
135        :param kwargs: Additional kwargs
136        :return: Resolved configuration
137        :raises ConfigurationError: If configuration is invalid
138        """
139        match config, api_url, cert_sha256:
140            # Pattern 1: Direct parameters provided (most common case)
141            case None, str(url), str(cert) if url and cert:
142                return OutlineClientConfig.create_minimal(url, cert, **kwargs)
143
144            # Pattern 2: Config object provided
145            case OutlineClientConfig() as cfg, None, None:
146                return cfg
147
148            # Pattern 3: Missing required parameters
149            case None, None, _:
150                raise ConfigurationError(
151                    "Missing required 'api_url'",
152                    field="api_url",
153                    security_issue=False,
154                )
155            case None, _, None:
156                raise ConfigurationError(
157                    "Missing required 'cert_sha256'",
158                    field="cert_sha256",
159                    security_issue=True,
160                )
161
162            # Pattern 4: Conflicting parameters
163            case OutlineClientConfig(), str() | None, str() | None:
164                raise ConfigurationError(
165                    "Cannot specify both 'config' and direct parameters"
166                )
167
168            # Pattern 5: Invalid combination (catch-all)
169            case _:
170                raise ConfigurationError("Invalid parameter combination")
171
172    @property
173    def config(self) -> OutlineClientConfig:
174        """Get immutable copy of configuration.
175
176        :return: Deep copy of configuration
177        """
178        return self._config.model_copy_immutable()
179
180    @property
181    def get_sanitized_config(self) -> dict[str, Any]:
182        """Delegate to config's sanitized representation.
183
184        See: OutlineClientConfig.get_sanitized_config().
185
186        :return: Sanitized configuration from underlying config object
187        """
188        return self._config.get_sanitized_config
189
190    @property
191    def json_format(self) -> bool:
192        """Get JSON format preference.
193
194        :return: True if raw JSON format is preferred
195        """
196        return self._default_json_format
197
198    # ===== Factory Methods =====
199
200    @classmethod
201    @asynccontextmanager
202    async def create(
203        cls,
204        api_url: str | None = None,
205        cert_sha256: str | None = None,
206        *,
207        config: OutlineClientConfig | None = None,
208        audit_logger: AuditLogger | None = None,
209        metrics: MetricsCollector | None = None,
210        **overrides: Unpack[ConfigOverrides],
211    ) -> AsyncGenerator[AsyncOutlineClient, None]:
212        """Create and initialize client as async context manager.
213
214        Automatically handles initialization and cleanup.
215        Recommended way to create clients in async contexts.
216
217        :param api_url: API URL
218        :param cert_sha256: Certificate fingerprint
219        :param config: Configuration object
220        :param audit_logger: Custom audit logger
221        :param metrics: Custom metrics collector
222        :param overrides: Configuration overrides (timeout, retry_attempts, etc.)
223        :yield: Initialized client instance
224        :raises ConfigurationError: If configuration is invalid
225
226        Example:
227            >>> async with AsyncOutlineClient.from_env() as client:
228            ...     keys = await client.get_access_keys()
229        """
230        if config is not None:
231            client = cls(config=config, audit_logger=audit_logger, metrics=metrics)
232        else:
233            client = cls(
234                api_url=api_url,
235                cert_sha256=cert_sha256,
236                audit_logger=audit_logger,
237                metrics=metrics,
238                **overrides,
239            )
240
241        async with client:
242            yield client
243
244    @classmethod
245    def from_env(
246        cls,
247        *,
248        env_file: str | Path | None = None,
249        audit_logger: AuditLogger | None = None,
250        metrics: MetricsCollector | None = None,
251        **overrides: Unpack[ConfigOverrides],
252    ) -> AsyncOutlineClient:
253        """Create client from environment variables.
254
255        Reads configuration from environment or .env file.
256        Modern approach using **overrides for runtime configuration.
257
258        :param env_file: Path to environment file (.env)
259        :param audit_logger: Custom audit logger
260        :param metrics: Custom metrics collector
261        :param overrides: Configuration overrides (timeout, enable_logging, etc.)
262        :return: Configured client instance
263        :raises ConfigurationError: If environment configuration is invalid
264
265        Example:
266            >>> async with AsyncOutlineClient.from_env(
267            ...     env_file=".env.production",
268            ...     timeout=20,
269            ... ) as client:
270            ...     info = await client.get_server_info()
271        """
272        config = OutlineClientConfig.from_env(env_file=env_file, **overrides)
273        return cls(config=config, audit_logger=audit_logger, metrics=metrics)
274
275    # ===== Context Manager Methods =====
276
277    async def __aexit__(
278        self,
279        exc_type: type[BaseException] | None,
280        exc_val: BaseException | None,
281        exc_tb: object | None,
282    ) -> None:
283        """Async context manager exit with comprehensive cleanup.
284
285        Ensures graceful shutdown even on exceptions. Uses ordered cleanup
286        sequence for proper resource deallocation.
287
288        Cleanup order:
289        1. Audit logger shutdown (drain queue)
290        2. HTTP client shutdown (close connections)
291        3. Emergency cleanup if steps 1-2 failed
292
293        :param exc_type: Exception type if error occurred
294        :param exc_val: Exception instance if error occurred
295        :param exc_tb: Exception traceback
296        :return: False to propagate exceptions
297        """
298        cleanup_errors: list[str] = []
299
300        # Step 1: Graceful audit logger shutdown
301        if self._audit_logger_instance is not None:
302            try:
303                if hasattr(self._audit_logger_instance, "shutdown"):
304                    shutdown_method = self._audit_logger_instance.shutdown
305                    if asyncio.iscoroutinefunction(shutdown_method):
306                        await shutdown_method()
307            except Exception as e:
308                error_msg = f"Audit logger shutdown error: {e}"
309                cleanup_errors.append(error_msg)
310                if logger.isEnabledFor(logging.WARNING):
311                    logger.warning(error_msg)
312
313        # Step 2: Shutdown HTTP client
314        try:
315            await self.shutdown(timeout=30.0)
316        except Exception as e:
317            error_msg = f"HTTP client shutdown error: {e}"
318            cleanup_errors.append(error_msg)
319            if logger.isEnabledFor(logging.ERROR):
320                logger.error(error_msg)
321
322        # Step 3: Emergency cleanup if shutdown failed
323        if cleanup_errors and hasattr(self, "_session"):
324            try:
325                if self._session and not self._session.closed:
326                    await self._session.close()
327                    if logger.isEnabledFor(logging.DEBUG):
328                        logger.debug("Emergency session cleanup completed")
329            except Exception as e:
330                if logger.isEnabledFor(logging.DEBUG):
331                    logger.debug("Emergency cleanup error: %s", e)
332
333        # Log summary of cleanup issues
334        if cleanup_errors and logger.isEnabledFor(logging.WARNING):
335            logger.warning(
336                "Cleanup completed with %d error(s): %s",
337                len(cleanup_errors),
338                "; ".join(cleanup_errors),
339            )
340
341        # Always propagate the original exception
342        return None
343
344    # ===== Utility Methods =====
345
346    async def health_check(self) -> dict[str, Any]:
347        """Perform basic health check.
348
349        Non-intrusive check that tests server connectivity without
350        modifying any state. Returns comprehensive health metrics.
351
352        :return: Health check result dictionary with response time
353
354        Example result:
355            {
356                "timestamp": 1234567890.123,
357                "healthy": True,
358                "response_time_ms": 45.2,
359                "connected": True,
360                "circuit_state": "closed",
361                "active_requests": 2,
362                "rate_limit_available": 98
363            }
364        """
365        import time
366
367        health_data: dict[str, Any] = {
368            "timestamp": time.time(),
369            "connected": self.is_connected,
370            "circuit_state": self.circuit_state,
371            "active_requests": self.active_requests,
372            "rate_limit_available": self.available_slots,
373        }
374
375        try:
376            start_time = time.monotonic()
377            await self.get_server_info()
378            duration = time.monotonic() - start_time
379
380            health_data["healthy"] = True
381            health_data["response_time_ms"] = round(duration * 1000, 2)
382
383        except Exception as e:
384            health_data["healthy"] = False
385            health_data["error"] = str(e)
386            health_data["error_type"] = type(e).__name__
387
388        return health_data
389
390    async def get_server_summary(self) -> dict[str, Any]:
391        """Get comprehensive server overview.
392
393        Aggregates multiple API calls into a single summary.
394        Continues on partial failures to return maximum information.
395        Executes non-dependent calls concurrently for performance.
396
397        :return: Server summary dictionary with aggregated data
398
399        Example result:
400            {
401                "timestamp": 1234567890.123,
402                "healthy": True,
403                "server": {...},
404                "access_keys_count": 10,
405                "metrics_enabled": True,
406                "transfer_metrics": {...},
407                "client_status": {...},
408                "errors": []
409            }
410        """
411        import time
412
413        summary: dict[str, Any] = {
414            "timestamp": time.time(),
415            "healthy": True,
416            "errors": [],
417        }
418
419        server_task = self.get_server_info(as_json=True)
420        keys_task = self.get_access_keys(as_json=True)
421        metrics_status_task = self.get_metrics_status(as_json=True)
422
423        server_result, keys_result, metrics_status_result = await asyncio.gather(
424            server_task, keys_task, metrics_status_task, return_exceptions=True
425        )
426
427        # Process server info
428        if isinstance(server_result, Exception):
429            summary["healthy"] = False
430            summary["errors"].append(f"Server info error: {server_result}")
431            if logger.isEnabledFor(logging.DEBUG):
432                logger.debug("Failed to fetch server info: %s", server_result)
433        else:
434            summary["server"] = server_result
435
436        # Process access keys
437        if isinstance(keys_result, Exception):
438            summary["healthy"] = False
439            summary["errors"].append(f"Access keys error: {keys_result}")
440            if logger.isEnabledFor(logging.DEBUG):
441                logger.debug("Failed to fetch access keys: %s", keys_result)
442        elif isinstance(keys_result, dict):
443            keys_list = keys_result.get("accessKeys", [])
444            summary["access_keys_count"] = (
445                len(keys_list) if isinstance(keys_list, list) else 0
446            )
447        elif isinstance(keys_result, AccessKeyList):
448            summary["access_keys_count"] = len(keys_result.access_keys)
449        else:
450            summary["access_keys_count"] = 0
451
452        # Process metrics status
453        if isinstance(metrics_status_result, Exception):
454            summary["errors"].append(f"Metrics status error: {metrics_status_result}")
455            if logger.isEnabledFor(logging.DEBUG):
456                logger.debug(
457                    "Failed to fetch metrics status: %s", metrics_status_result
458                )
459        elif isinstance(metrics_status_result, dict):
460            metrics_enabled = bool(metrics_status_result.get("metricsEnabled", False))
461            summary["metrics_enabled"] = metrics_enabled
462
463            # Fetch transfer metrics if enabled (dependent call - sequential)
464            if metrics_enabled:
465                try:
466                    transfer = await self.get_transfer_metrics(as_json=True)
467                    summary["transfer_metrics"] = transfer
468                except Exception as e:
469                    summary["errors"].append(f"Transfer metrics error: {e}")
470                    if logger.isEnabledFor(logging.DEBUG):
471                        logger.debug("Failed to fetch transfer metrics: %s", e)
472        elif isinstance(metrics_status_result, MetricsStatusResponse):
473            summary["metrics_enabled"] = metrics_status_result.metrics_enabled
474            if metrics_status_result.metrics_enabled:
475                try:
476                    transfer = await self.get_transfer_metrics(as_json=True)
477                    summary["transfer_metrics"] = transfer
478                except Exception as e:
479                    summary["errors"].append(f"Transfer metrics error: {e}")
480                    if logger.isEnabledFor(logging.DEBUG):
481                        logger.debug("Failed to fetch transfer metrics: %s", e)
482        else:
483            summary["metrics_enabled"] = False
484
485        # Add client status (synchronous, no API call)
486        summary["client_status"] = {
487            "connected": self.is_connected,
488            "circuit_state": self.circuit_state,
489            "active_requests": self.active_requests,
490            "rate_limit": {
491                "limit": self.rate_limit,
492                "available": self.available_slots,
493            },
494        }
495
496        return summary
497
498    def get_status(self) -> dict[str, Any]:
499        """Get current client status (synchronous).
500
501        Returns immediate status without making API calls.
502        Useful for monitoring and debugging.
503
504        :return: Status dictionary with all client metrics
505
506        Example result:
507            {
508                "connected": True,
509                "circuit_state": "closed",
510                "active_requests": 2,
511                "rate_limit": {
512                    "limit": 100,
513                    "available": 98,
514                    "active": 2
515                },
516                "circuit_metrics": {...}
517            }
518        """
519        return {
520            "connected": self.is_connected,
521            "circuit_state": self.circuit_state,
522            "active_requests": self.active_requests,
523            "rate_limit": {
524                "limit": self.rate_limit,
525                "available": self.available_slots,
526                "active": self.active_requests,
527            },
528            "circuit_metrics": self.get_circuit_metrics(),
529        }
530
531    def __repr__(self) -> str:
532        """Safe string representation without secrets.
533
534        Does not expose any sensitive information (URLs, certificates, tokens).
535
536        :return: String representation
537        """
538        status = "connected" if self.is_connected else "disconnected"
539        parts = [f"status={status}"]
540
541        if self.circuit_state:
542            parts.append(f"circuit={self.circuit_state}")
543
544        if self.active_requests:
545            parts.append(f"requests={self.active_requests}")
546
547        return f"AsyncOutlineClient({', '.join(parts)})"

High-performance async client for Outline VPN Server API.

AsyncOutlineClient( config: OutlineClientConfig | None = None, *, api_url: str | None = None, cert_sha256: str | None = None, audit_logger: AuditLogger | None = None, metrics: MetricsCollector | None = None, **overrides: Unpack[ConfigOverrides])
 61    def __init__(
 62        self,
 63        config: OutlineClientConfig | None = None,
 64        *,
 65        api_url: str | None = None,
 66        cert_sha256: str | None = None,
 67        audit_logger: AuditLogger | None = None,
 68        metrics: MetricsCollector | None = None,
 69        **overrides: Unpack[ConfigOverrides],
 70    ) -> None:
 71        """Initialize Outline client with modern configuration approach.
 72
 73        Uses structural pattern matching for configuration resolution.
 74
 75        :param config: Client configuration object
 76        :param api_url: API URL (alternative to config)
 77        :param cert_sha256: Certificate fingerprint (alternative to config)
 78        :param audit_logger: Custom audit logger
 79        :param metrics: Custom metrics collector
 80        :param overrides: Configuration overrides (timeout, retry_attempts, etc.)
 81        :raises ConfigurationError: If configuration is invalid
 82
 83        Example:
 84            >>> async with AsyncOutlineClient.from_env() as client:
 85            ...     info = await client.get_server_info()
 86        """
 87        # Build config_kwargs using utility function (DRY)
 88        config_kwargs = build_config_overrides(**overrides)
 89
 90        # Validate configuration using pattern matching
 91        resolved_config = self._resolve_configuration(
 92            config, api_url, cert_sha256, config_kwargs
 93        )
 94
 95        self._config = resolved_config
 96        self._audit_logger_instance = audit_logger
 97        self._default_json_format = resolved_config.json_format
 98
 99        # Initialize base HTTP client
100        super().__init__(
101            api_url=resolved_config.api_url,
102            cert_sha256=resolved_config.cert_sha256,
103            timeout=resolved_config.timeout,
104            retry_attempts=resolved_config.retry_attempts,
105            max_connections=resolved_config.max_connections,
106            user_agent=resolved_config.user_agent,
107            enable_logging=resolved_config.enable_logging,
108            circuit_config=resolved_config.circuit_config,
109            rate_limit=resolved_config.rate_limit,
110            allow_private_networks=resolved_config.allow_private_networks,
111            resolve_dns_for_ssrf=resolved_config.resolve_dns_for_ssrf,
112            audit_logger=audit_logger,
113            metrics=metrics,
114        )
115
116        # Cache instance for weak reference tracking (automatic cleanup)
117        _client_cache[id(self)] = self
118
119        if resolved_config.enable_logging and logger.isEnabledFor(logging.INFO):
120            safe_url = Validators.sanitize_url_for_logging(self.api_url)
121            logger.info("Client initialized for %s", safe_url)

Initialize Outline client with modern configuration approach.

Uses structural pattern matching for configuration resolution.

Parameters
  • config: Client configuration object
  • api_url: API URL (alternative to config)
  • cert_sha256: Certificate fingerprint (alternative to config)
  • audit_logger: Custom audit logger
  • metrics: Custom metrics collector
  • overrides: Configuration overrides (timeout, retry_attempts, etc.)
Raises
  • ConfigurationError: If configuration is invalid
Example:
>>> async with AsyncOutlineClient.from_env() as client:
...     info = await client.get_server_info()
config: OutlineClientConfig
172    @property
173    def config(self) -> OutlineClientConfig:
174        """Get immutable copy of configuration.
175
176        :return: Deep copy of configuration
177        """
178        return self._config.model_copy_immutable()

Get immutable copy of configuration.

Returns

Deep copy of configuration

get_sanitized_config: dict[str, typing.Any]
180    @property
181    def get_sanitized_config(self) -> dict[str, Any]:
182        """Delegate to config's sanitized representation.
183
184        See: OutlineClientConfig.get_sanitized_config().
185
186        :return: Sanitized configuration from underlying config object
187        """
188        return self._config.get_sanitized_config

Delegate to config's sanitized representation.

See: OutlineClientConfig.get_sanitized_config().

Returns

Sanitized configuration from underlying config object

json_format: bool
190    @property
191    def json_format(self) -> bool:
192        """Get JSON format preference.
193
194        :return: True if raw JSON format is preferred
195        """
196        return self._default_json_format

Get JSON format preference.

Returns

True if raw JSON format is preferred

@classmethod
@asynccontextmanager
def create( cls, api_url: str | None = None, cert_sha256: str | None = None, *, config: OutlineClientConfig | None = None, audit_logger: AuditLogger | None = None, metrics: MetricsCollector | None = None, **overrides: Unpack[ConfigOverrides]) -> AsyncGenerator[AsyncOutlineClient, None]:
200    @classmethod
201    @asynccontextmanager
202    async def create(
203        cls,
204        api_url: str | None = None,
205        cert_sha256: str | None = None,
206        *,
207        config: OutlineClientConfig | None = None,
208        audit_logger: AuditLogger | None = None,
209        metrics: MetricsCollector | None = None,
210        **overrides: Unpack[ConfigOverrides],
211    ) -> AsyncGenerator[AsyncOutlineClient, None]:
212        """Create and initialize client as async context manager.
213
214        Automatically handles initialization and cleanup.
215        Recommended way to create clients in async contexts.
216
217        :param api_url: API URL
218        :param cert_sha256: Certificate fingerprint
219        :param config: Configuration object
220        :param audit_logger: Custom audit logger
221        :param metrics: Custom metrics collector
222        :param overrides: Configuration overrides (timeout, retry_attempts, etc.)
223        :yield: Initialized client instance
224        :raises ConfigurationError: If configuration is invalid
225
226        Example:
227            >>> async with AsyncOutlineClient.from_env() as client:
228            ...     keys = await client.get_access_keys()
229        """
230        if config is not None:
231            client = cls(config=config, audit_logger=audit_logger, metrics=metrics)
232        else:
233            client = cls(
234                api_url=api_url,
235                cert_sha256=cert_sha256,
236                audit_logger=audit_logger,
237                metrics=metrics,
238                **overrides,
239            )
240
241        async with client:
242            yield client

Create and initialize client as async context manager.

Automatically handles initialization and cleanup. Recommended way to create clients in async contexts.

Parameters
  • api_url: API URL
  • cert_sha256: Certificate fingerprint
  • config: Configuration object
  • audit_logger: Custom audit logger
  • metrics: Custom metrics collector
  • overrides: Configuration overrides (timeout, retry_attempts, etc.) :yield: Initialized client instance
Raises
  • ConfigurationError: If configuration is invalid
Example:
>>> async with AsyncOutlineClient.from_env() as client:
...     keys = await client.get_access_keys()
@classmethod
def from_env( cls, *, env_file: str | pathlib._local.Path | None = None, audit_logger: AuditLogger | None = None, metrics: MetricsCollector | None = None, **overrides: Unpack[ConfigOverrides]) -> AsyncOutlineClient:
244    @classmethod
245    def from_env(
246        cls,
247        *,
248        env_file: str | Path | None = None,
249        audit_logger: AuditLogger | None = None,
250        metrics: MetricsCollector | None = None,
251        **overrides: Unpack[ConfigOverrides],
252    ) -> AsyncOutlineClient:
253        """Create client from environment variables.
254
255        Reads configuration from environment or .env file.
256        Modern approach using **overrides for runtime configuration.
257
258        :param env_file: Path to environment file (.env)
259        :param audit_logger: Custom audit logger
260        :param metrics: Custom metrics collector
261        :param overrides: Configuration overrides (timeout, enable_logging, etc.)
262        :return: Configured client instance
263        :raises ConfigurationError: If environment configuration is invalid
264
265        Example:
266            >>> async with AsyncOutlineClient.from_env(
267            ...     env_file=".env.production",
268            ...     timeout=20,
269            ... ) as client:
270            ...     info = await client.get_server_info()
271        """
272        config = OutlineClientConfig.from_env(env_file=env_file, **overrides)
273        return cls(config=config, audit_logger=audit_logger, metrics=metrics)

Create client from environment variables.

Reads configuration from environment or .env file. Modern approach using **overrides for runtime configuration.

Parameters
  • env_file: Path to environment file (.env)
  • audit_logger: Custom audit logger
  • metrics: Custom metrics collector
  • overrides: Configuration overrides (timeout, enable_logging, etc.)
Returns

Configured client instance

Raises
  • ConfigurationError: If environment configuration is invalid
Example:
>>> async with AsyncOutlineClient.from_env(
...     env_file=".env.production",
...     timeout=20,
... ) as client:
...     info = await client.get_server_info()
async def health_check(self) -> dict[str, typing.Any]:
346    async def health_check(self) -> dict[str, Any]:
347        """Perform basic health check.
348
349        Non-intrusive check that tests server connectivity without
350        modifying any state. Returns comprehensive health metrics.
351
352        :return: Health check result dictionary with response time
353
354        Example result:
355            {
356                "timestamp": 1234567890.123,
357                "healthy": True,
358                "response_time_ms": 45.2,
359                "connected": True,
360                "circuit_state": "closed",
361                "active_requests": 2,
362                "rate_limit_available": 98
363            }
364        """
365        import time
366
367        health_data: dict[str, Any] = {
368            "timestamp": time.time(),
369            "connected": self.is_connected,
370            "circuit_state": self.circuit_state,
371            "active_requests": self.active_requests,
372            "rate_limit_available": self.available_slots,
373        }
374
375        try:
376            start_time = time.monotonic()
377            await self.get_server_info()
378            duration = time.monotonic() - start_time
379
380            health_data["healthy"] = True
381            health_data["response_time_ms"] = round(duration * 1000, 2)
382
383        except Exception as e:
384            health_data["healthy"] = False
385            health_data["error"] = str(e)
386            health_data["error_type"] = type(e).__name__
387
388        return health_data

Perform basic health check.

Non-intrusive check that tests server connectivity without modifying any state. Returns comprehensive health metrics.

Returns

Health check result dictionary with response time

Example result:

{ "timestamp": 1234567890.123, "healthy": True, "response_time_ms": 45.2, "connected": True, "circuit_state": "closed", "active_requests": 2, "rate_limit_available": 98 }

async def get_server_summary(self) -> dict[str, typing.Any]:
390    async def get_server_summary(self) -> dict[str, Any]:
391        """Get comprehensive server overview.
392
393        Aggregates multiple API calls into a single summary.
394        Continues on partial failures to return maximum information.
395        Executes non-dependent calls concurrently for performance.
396
397        :return: Server summary dictionary with aggregated data
398
399        Example result:
400            {
401                "timestamp": 1234567890.123,
402                "healthy": True,
403                "server": {...},
404                "access_keys_count": 10,
405                "metrics_enabled": True,
406                "transfer_metrics": {...},
407                "client_status": {...},
408                "errors": []
409            }
410        """
411        import time
412
413        summary: dict[str, Any] = {
414            "timestamp": time.time(),
415            "healthy": True,
416            "errors": [],
417        }
418
419        server_task = self.get_server_info(as_json=True)
420        keys_task = self.get_access_keys(as_json=True)
421        metrics_status_task = self.get_metrics_status(as_json=True)
422
423        server_result, keys_result, metrics_status_result = await asyncio.gather(
424            server_task, keys_task, metrics_status_task, return_exceptions=True
425        )
426
427        # Process server info
428        if isinstance(server_result, Exception):
429            summary["healthy"] = False
430            summary["errors"].append(f"Server info error: {server_result}")
431            if logger.isEnabledFor(logging.DEBUG):
432                logger.debug("Failed to fetch server info: %s", server_result)
433        else:
434            summary["server"] = server_result
435
436        # Process access keys
437        if isinstance(keys_result, Exception):
438            summary["healthy"] = False
439            summary["errors"].append(f"Access keys error: {keys_result}")
440            if logger.isEnabledFor(logging.DEBUG):
441                logger.debug("Failed to fetch access keys: %s", keys_result)
442        elif isinstance(keys_result, dict):
443            keys_list = keys_result.get("accessKeys", [])
444            summary["access_keys_count"] = (
445                len(keys_list) if isinstance(keys_list, list) else 0
446            )
447        elif isinstance(keys_result, AccessKeyList):
448            summary["access_keys_count"] = len(keys_result.access_keys)
449        else:
450            summary["access_keys_count"] = 0
451
452        # Process metrics status
453        if isinstance(metrics_status_result, Exception):
454            summary["errors"].append(f"Metrics status error: {metrics_status_result}")
455            if logger.isEnabledFor(logging.DEBUG):
456                logger.debug(
457                    "Failed to fetch metrics status: %s", metrics_status_result
458                )
459        elif isinstance(metrics_status_result, dict):
460            metrics_enabled = bool(metrics_status_result.get("metricsEnabled", False))
461            summary["metrics_enabled"] = metrics_enabled
462
463            # Fetch transfer metrics if enabled (dependent call - sequential)
464            if metrics_enabled:
465                try:
466                    transfer = await self.get_transfer_metrics(as_json=True)
467                    summary["transfer_metrics"] = transfer
468                except Exception as e:
469                    summary["errors"].append(f"Transfer metrics error: {e}")
470                    if logger.isEnabledFor(logging.DEBUG):
471                        logger.debug("Failed to fetch transfer metrics: %s", e)
472        elif isinstance(metrics_status_result, MetricsStatusResponse):
473            summary["metrics_enabled"] = metrics_status_result.metrics_enabled
474            if metrics_status_result.metrics_enabled:
475                try:
476                    transfer = await self.get_transfer_metrics(as_json=True)
477                    summary["transfer_metrics"] = transfer
478                except Exception as e:
479                    summary["errors"].append(f"Transfer metrics error: {e}")
480                    if logger.isEnabledFor(logging.DEBUG):
481                        logger.debug("Failed to fetch transfer metrics: %s", e)
482        else:
483            summary["metrics_enabled"] = False
484
485        # Add client status (synchronous, no API call)
486        summary["client_status"] = {
487            "connected": self.is_connected,
488            "circuit_state": self.circuit_state,
489            "active_requests": self.active_requests,
490            "rate_limit": {
491                "limit": self.rate_limit,
492                "available": self.available_slots,
493            },
494        }
495
496        return summary

Get comprehensive server overview.

Aggregates multiple API calls into a single summary. Continues on partial failures to return maximum information. Executes non-dependent calls concurrently for performance.

Returns

Server summary dictionary with aggregated data

Example result:

{ "timestamp": 1234567890.123, "healthy": True, "server": {...}, "access_keys_count": 10, "metrics_enabled": True, "transfer_metrics": {...}, "client_status": {...}, "errors": [] }

def get_status(self) -> dict[str, typing.Any]:
498    def get_status(self) -> dict[str, Any]:
499        """Get current client status (synchronous).
500
501        Returns immediate status without making API calls.
502        Useful for monitoring and debugging.
503
504        :return: Status dictionary with all client metrics
505
506        Example result:
507            {
508                "connected": True,
509                "circuit_state": "closed",
510                "active_requests": 2,
511                "rate_limit": {
512                    "limit": 100,
513                    "available": 98,
514                    "active": 2
515                },
516                "circuit_metrics": {...}
517            }
518        """
519        return {
520            "connected": self.is_connected,
521            "circuit_state": self.circuit_state,
522            "active_requests": self.active_requests,
523            "rate_limit": {
524                "limit": self.rate_limit,
525                "available": self.available_slots,
526                "active": self.active_requests,
527            },
528            "circuit_metrics": self.get_circuit_metrics(),
529        }

Get current client status (synchronous).

Returns immediate status without making API calls. Useful for monitoring and debugging.

Returns

Status dictionary with all client metrics

Example result:

{ "connected": True, "circuit_state": "closed", "active_requests": 2, "rate_limit": { "limit": 100, "available": 98, "active": 2 }, "circuit_metrics": {...} }

@dataclass(slots=True, frozen=True)
class AuditContext:
 57@dataclass(slots=True, frozen=True)
 58class AuditContext:
 59    """Immutable audit context extracted from function call.
 60
 61    Uses structural pattern matching and signature inspection for smart extraction.
 62    """
 63
 64    action: str
 65    resource: str
 66    success: bool
 67    details: dict[str, Any] = field(default_factory=dict)
 68    correlation_id: str | None = None
 69
 70    @classmethod
 71    def from_call(
 72        cls,
 73        func: Callable[..., Any],
 74        instance: object,
 75        args: tuple[Any, ...],
 76        kwargs: dict[str, Any],
 77        result: object = None,
 78        exception: Exception | None = None,
 79    ) -> AuditContext:
 80        """Build audit context from function call with intelligent extraction.
 81
 82        :param func: Function being audited
 83        :param instance: Instance (self) for methods
 84        :param args: Positional arguments
 85        :param kwargs: Keyword arguments
 86        :param result: Function result (if successful)
 87        :param exception: Exception (if failed)
 88        :return: Complete audit context
 89        """
 90        success = exception is None
 91
 92        # Extract action from function name (snake_case -> action)
 93        action = func.__name__
 94
 95        # Smart resource extraction
 96        resource = cls._extract_resource(func, args, kwargs, result, success)
 97
 98        # Smart details extraction with automatic sanitization
 99        details = cls._extract_details(func, args, kwargs, result, exception, success)
100
101        # Correlation ID from instance if available
102        correlation_id = getattr(instance, "_correlation_id", None)
103
104        return cls(
105            action=action,
106            resource=resource,
107            success=success,
108            details=details,
109            correlation_id=correlation_id,
110        )
111
112    @staticmethod
113    def _extract_resource(
114        func: Callable[..., Any],
115        args: tuple[Any, ...],
116        kwargs: dict[str, Any],
117        result: object,
118        success: bool,
119    ) -> str:
120        """Smart resource extraction using structural pattern matching.
121
122        Priority:
123        1. result.id (for create operations)
124        2. Known resource parameter names (key_id, id, resource_id)
125        3. First meaningful argument
126        4. Function name analysis
127        5. 'unknown' fallback
128
129        :param func: Function being audited
130        :param args: Positional arguments
131        :param kwargs: Keyword arguments
132        :param result: Function result
133        :param success: Whether operation succeeded
134        :return: Resource identifier
135        """
136        # Pattern 1: Extract from successful result
137        if success and result is not None:
138            match result:
139                case _ if hasattr(result, "id"):
140                    return str(result.id)
141                case dict() if "id" in result:
142                    return str(result["id"])
143
144        # Pattern 2: Extract from known parameter names
145        sig = inspect.signature(func)
146        params = list(sig.parameters.keys())
147
148        # Skip 'self' and 'cls'
149        params = [p for p in params if p not in ("self", "cls")]
150
151        # Try common resource identifiers in priority order
152        for resource_param in ("key_id", "id", "resource_id", "user_id", "name"):
153            if resource_param in kwargs:
154                return str(kwargs[resource_param])
155
156        # Pattern 3: First meaningful parameter
157        if params and params[0] in kwargs:
158            return str(kwargs[params[0]])
159
160        # Pattern 4: First positional argument (after self)
161        if args:
162            return str(args[0])
163
164        # Pattern 5: Analyze function name for hints
165        func_name = func.__name__.lower()
166        if any(keyword in func_name for keyword in ("server", "global", "system")):
167            return "server"
168
169        return "unknown"
170
171    @staticmethod
172    def _extract_details(
173        func: Callable[..., Any],
174        args: tuple[Any, ...],
175        kwargs: dict[str, Any],
176        result: object,
177        exception: Exception | None,
178        success: bool,
179    ) -> dict[str, Any]:
180        """Smart details extraction using signature introspection.
181
182        Only includes meaningful parameters (excludes technical ones and None values).
183        Automatically sanitizes sensitive data.
184
185        :param func: Function being audited
186        :param args: Positional arguments
187        :param kwargs: Keyword arguments
188        :param result: Function result
189        :param exception: Exception if failed
190        :param success: Whether operation succeeded
191        :return: Sanitized details dictionary
192        """
193        details: dict[str, Any] = {"success": success}
194
195        # Signature-based extraction
196        sig = inspect.signature(func)
197
198        # Parameters to exclude from details
199        excluded = {"self", "cls", "as_json", "return_raw"}
200
201        for param_name, param in sig.parameters.items():
202            if param_name in excluded:
203                continue
204
205            # Get actual value
206            value = kwargs.get(param_name)
207
208            # Only include meaningful values (not None, not default)
209            if value is not None and value != param.default:
210                # Convert complex objects to simple representations
211                match value:
212                    case _ if hasattr(value, "model_dump"):
213                        # Pydantic models
214                        details[param_name] = value.model_dump(exclude_none=True)
215                    case dict():
216                        details[param_name] = value
217                    case list() | tuple():
218                        details[param_name] = len(value)  # Count, not content
219                    case _:
220                        details[param_name] = value
221
222        # Add error information if present
223        if exception:
224            details["error"] = str(exception)
225            details["error_type"] = type(exception).__name__
226
227        # Sanitize sensitive data
228        return _sanitize_details(details)

Immutable audit context extracted from function call.

Uses structural pattern matching and signature inspection for smart extraction.

AuditContext( action: str, resource: str, success: bool, details: dict[str, typing.Any] = <factory>, correlation_id: str | None = None)
action: str
resource: str
success: bool
details: dict[str, typing.Any]
correlation_id: str | None
@classmethod
def from_call( cls, func: Callable[..., typing.Any], instance: object, args: tuple[typing.Any, ...], kwargs: dict[str, typing.Any], result: object = None, exception: Exception | None = None) -> AuditContext:
 70    @classmethod
 71    def from_call(
 72        cls,
 73        func: Callable[..., Any],
 74        instance: object,
 75        args: tuple[Any, ...],
 76        kwargs: dict[str, Any],
 77        result: object = None,
 78        exception: Exception | None = None,
 79    ) -> AuditContext:
 80        """Build audit context from function call with intelligent extraction.
 81
 82        :param func: Function being audited
 83        :param instance: Instance (self) for methods
 84        :param args: Positional arguments
 85        :param kwargs: Keyword arguments
 86        :param result: Function result (if successful)
 87        :param exception: Exception (if failed)
 88        :return: Complete audit context
 89        """
 90        success = exception is None
 91
 92        # Extract action from function name (snake_case -> action)
 93        action = func.__name__
 94
 95        # Smart resource extraction
 96        resource = cls._extract_resource(func, args, kwargs, result, success)
 97
 98        # Smart details extraction with automatic sanitization
 99        details = cls._extract_details(func, args, kwargs, result, exception, success)
100
101        # Correlation ID from instance if available
102        correlation_id = getattr(instance, "_correlation_id", None)
103
104        return cls(
105            action=action,
106            resource=resource,
107            success=success,
108            details=details,
109            correlation_id=correlation_id,
110        )

Build audit context from function call with intelligent extraction.

Parameters
  • func: Function being audited
  • instance: Instance (self) for methods
  • args: Positional arguments
  • kwargs: Keyword arguments
  • result: Function result (if successful)
  • exception: Exception (if failed)
Returns

Complete audit context

@runtime_checkable
class AuditLogger(typing.Protocol):
234@runtime_checkable
235class AuditLogger(Protocol):
236    """Protocol for audit logging implementations.
237
238    Designed for async-first applications with sync fallback support.
239    """
240
241    async def alog_action(
242        self,
243        action: str,
244        resource: str,
245        *,
246        user: str | None = None,
247        details: dict[str, Any] | None = None,
248        correlation_id: str | None = None,
249    ) -> None:
250        """Log auditable action asynchronously (primary method)."""
251        ...  # pragma: no cover
252
253    def log_action(
254        self,
255        action: str,
256        resource: str,
257        *,
258        user: str | None = None,
259        details: dict[str, Any] | None = None,
260        correlation_id: str | None = None,
261    ) -> None:
262        """Log auditable action synchronously (fallback method)."""
263        ...  # pragma: no cover
264
265    async def shutdown(self) -> None:
266        """Gracefully shutdown logger."""
267        ...  # pragma: no cover

Protocol for audit logging implementations.

Designed for async-first applications with sync fallback support.

AuditLogger(*args, **kwargs)
1957def _no_init_or_replace_init(self, *args, **kwargs):
1958    cls = type(self)
1959
1960    if cls._is_protocol:
1961        raise TypeError('Protocols cannot be instantiated')
1962
1963    # Already using a custom `__init__`. No need to calculate correct
1964    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1965    if cls.__init__ is not _no_init_or_replace_init:
1966        return
1967
1968    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1969    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1970    # searches for a proper new `__init__` in the MRO. The new `__init__`
1971    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1972    # instantiation of the protocol subclass will thus use the new
1973    # `__init__` and no longer call `_no_init_or_replace_init`.
1974    for base in cls.__mro__:
1975        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1976        if init is not _no_init_or_replace_init:
1977            cls.__init__ = init
1978            break
1979    else:
1980        # should not happen
1981        cls.__init__ = object.__init__
1982
1983    cls.__init__(self, *args, **kwargs)
async def alog_action( self, action: str, resource: str, *, user: str | None = None, details: dict[str, typing.Any] | None = None, correlation_id: str | None = None) -> None:
241    async def alog_action(
242        self,
243        action: str,
244        resource: str,
245        *,
246        user: str | None = None,
247        details: dict[str, Any] | None = None,
248        correlation_id: str | None = None,
249    ) -> None:
250        """Log auditable action asynchronously (primary method)."""
251        ...  # pragma: no cover

Log auditable action asynchronously (primary method).

def log_action( self, action: str, resource: str, *, user: str | None = None, details: dict[str, typing.Any] | None = None, correlation_id: str | None = None) -> None:
253    def log_action(
254        self,
255        action: str,
256        resource: str,
257        *,
258        user: str | None = None,
259        details: dict[str, Any] | None = None,
260        correlation_id: str | None = None,
261    ) -> None:
262        """Log auditable action synchronously (fallback method)."""
263        ...  # pragma: no cover

Log auditable action synchronously (fallback method).

async def shutdown(self) -> None:
265    async def shutdown(self) -> None:
266        """Gracefully shutdown logger."""
267        ...  # pragma: no cover

Gracefully shutdown logger.

class BandwidthData(pyoutlineapi.common_types.BaseValidatedModel):
390class BandwidthData(BaseValidatedModel):
391    """Bandwidth measurement data.
392
393    SCHEMA: Based on experimental metrics bandwidth current/peak object
394    """
395
396    data: BandwidthDataValue
397    timestamp: TimestampSec | None = None

Bandwidth measurement data.

SCHEMA: Based on experimental metrics bandwidth current/peak object

data: BandwidthDataValue = PydanticUndefined
timestamp: Optional[Annotated[int, FieldInfo(annotation=NoneType, required=True, description='Unix timestamp in seconds', metadata=[Ge(ge=0)])]] = None
class BandwidthDataValue(pyoutlineapi.common_types.BaseValidatedModel):
381class BandwidthDataValue(BaseValidatedModel):
382    """Bandwidth data value.
383
384    SCHEMA: Based on experimental metrics bandwidth data object
385    """
386
387    bytes: int

Bandwidth data value.

SCHEMA: Based on experimental metrics bandwidth data object

bytes: int = PydanticUndefined
class BandwidthInfo(pyoutlineapi.common_types.BaseValidatedModel):
400class BandwidthInfo(BaseValidatedModel):
401    """Current and peak bandwidth information.
402
403    SCHEMA: Based on experimental metrics bandwidth object
404    """
405
406    current: BandwidthData
407    peak: BandwidthData

Current and peak bandwidth information.

SCHEMA: Based on experimental metrics bandwidth object

current: BandwidthData = PydanticUndefined
peak: BandwidthData = PydanticUndefined
@dataclass(frozen=True, slots=True)
class CircuitConfig:
49@dataclass(frozen=True, slots=True)
50class CircuitConfig:
51    """Circuit breaker configuration with validation.
52
53    Immutable configuration to prevent runtime modification.
54    Uses slots for memory efficiency (~40 bytes per instance).
55    """
56
57    failure_threshold: int = 5
58    recovery_timeout: float = 60.0
59    success_threshold: int = 2
60    call_timeout: float = 10.0
61
62    def __post_init__(self) -> None:
63        """Validate configuration at creation time.
64
65        :raises ValueError: If any configuration value is invalid
66        """
67        if self.failure_threshold < 1:
68            raise ValueError("failure_threshold must be >= 1")
69        if self.recovery_timeout < 1.0:
70            raise ValueError("recovery_timeout must be >= 1.0")
71        if self.success_threshold < 1:
72            raise ValueError("success_threshold must be >= 1")
73        if self.call_timeout < 0.1:
74            raise ValueError("call_timeout must be >= 0.1")

Circuit breaker configuration with validation.

Immutable configuration to prevent runtime modification. Uses slots for memory efficiency (~40 bytes per instance).

CircuitConfig( failure_threshold: int = 5, recovery_timeout: float = 60.0, success_threshold: int = 2, call_timeout: float = 10.0)
failure_threshold: int
recovery_timeout: float
success_threshold: int
call_timeout: float
@dataclass(slots=True)
class CircuitMetrics:
 77@dataclass(slots=True)
 78class CircuitMetrics:
 79    """Circuit breaker metrics with efficient storage.
 80
 81    Uses slots for memory efficiency (~80 bytes per instance).
 82    All calculations are O(1) with no allocations.
 83    """
 84
 85    total_calls: int = 0
 86    successful_calls: int = 0
 87    failed_calls: int = 0
 88    state_changes: int = 0
 89    last_failure_time: float = 0.0
 90    last_success_time: float = 0.0
 91
 92    @property
 93    def success_rate(self) -> float:
 94        """Calculate success rate (O(1), no allocations).
 95
 96        :return: Success rate as decimal (0.0 to 1.0)
 97        """
 98        if self.total_calls == 0:
 99            return 1.0
100        return self.successful_calls / self.total_calls
101
102    @property
103    def failure_rate(self) -> float:
104        """Calculate failure rate (O(1), no allocations).
105
106        :return: Failure rate as decimal (0.0 to 1.0)
107        """
108        return 1.0 - self.success_rate
109
110    def to_dict(self) -> dict[str, int | float]:
111        """Convert metrics to dictionary for serialization.
112
113        Pre-computes rates to avoid repeated calculations.
114
115        :return: Dictionary representation
116        """
117        success_rate = self.success_rate  # Calculate once
118        return {
119            "total_calls": self.total_calls,
120            "successful_calls": self.successful_calls,
121            "failed_calls": self.failed_calls,
122            "state_changes": self.state_changes,
123            "success_rate": success_rate,
124            "failure_rate": 1.0 - success_rate,  # Reuse calculation
125            "last_failure_time": self.last_failure_time,
126            "last_success_time": self.last_success_time,
127        }

Circuit breaker metrics with efficient storage.

Uses slots for memory efficiency (~80 bytes per instance). All calculations are O(1) with no allocations.

CircuitMetrics( total_calls: int = 0, successful_calls: int = 0, failed_calls: int = 0, state_changes: int = 0, last_failure_time: float = 0.0, last_success_time: float = 0.0)
total_calls: int
successful_calls: int
failed_calls: int
state_changes: int
last_failure_time: float
last_success_time: float
success_rate: float
 92    @property
 93    def success_rate(self) -> float:
 94        """Calculate success rate (O(1), no allocations).
 95
 96        :return: Success rate as decimal (0.0 to 1.0)
 97        """
 98        if self.total_calls == 0:
 99            return 1.0
100        return self.successful_calls / self.total_calls

Calculate success rate (O(1), no allocations).

Returns

Success rate as decimal (0.0 to 1.0)

failure_rate: float
102    @property
103    def failure_rate(self) -> float:
104        """Calculate failure rate (O(1), no allocations).
105
106        :return: Failure rate as decimal (0.0 to 1.0)
107        """
108        return 1.0 - self.success_rate

Calculate failure rate (O(1), no allocations).

Returns

Failure rate as decimal (0.0 to 1.0)

def to_dict(self) -> dict[str, int | float]:
110    def to_dict(self) -> dict[str, int | float]:
111        """Convert metrics to dictionary for serialization.
112
113        Pre-computes rates to avoid repeated calculations.
114
115        :return: Dictionary representation
116        """
117        success_rate = self.success_rate  # Calculate once
118        return {
119            "total_calls": self.total_calls,
120            "successful_calls": self.successful_calls,
121            "failed_calls": self.failed_calls,
122            "state_changes": self.state_changes,
123            "success_rate": success_rate,
124            "failure_rate": 1.0 - success_rate,  # Reuse calculation
125            "last_failure_time": self.last_failure_time,
126            "last_success_time": self.last_success_time,
127        }

Convert metrics to dictionary for serialization.

Pre-computes rates to avoid repeated calculations.

Returns

Dictionary representation

class CircuitOpenError(pyoutlineapi.OutlineError):
274class CircuitOpenError(OutlineError):
275    """Circuit breaker is open due to repeated failures.
276
277    Indicates temporary service unavailability. Clients should wait
278    for ``retry_after`` seconds before retrying.
279
280    Attributes:
281        retry_after: Seconds to wait before retry
282
283    Example:
284        >>> error = CircuitOpenError("Circuit open", retry_after=60.0)
285        >>> error.is_retryable  # True
286        >>> error.retry_after  # 60.0
287    """
288
289    __slots__ = ("retry_after",)
290
291    _is_retryable: ClassVar[bool] = True
292
293    def __init__(self, message: str, *, retry_after: float = 60.0) -> None:
294        """Initialize circuit open error.
295
296        Args:
297            message: Error message
298            retry_after: Seconds to wait before retry
299
300        Raises:
301            ValueError: If retry_after is negative
302        """
303        if retry_after < 0:
304            raise ValueError("retry_after must be non-negative")
305
306        # Pre-round for safe_details (avoid repeated rounding)
307        rounded_retry = round(retry_after, 2)
308        safe_details = {"retry_after": rounded_retry}
309        super().__init__(message, safe_details=safe_details)
310
311        self.retry_after = retry_after
312
313    @property
314    def default_retry_delay(self) -> float:
315        """Suggested delay before retry."""
316        return self.retry_after

Circuit breaker is open due to repeated failures.

Indicates temporary service unavailability. Clients should wait for retry_after seconds before retrying.

Attributes:
  • retry_after: Seconds to wait before retry
Example:
>>> error = CircuitOpenError("Circuit open", retry_after=60.0)
>>> error.is_retryable  # True
>>> error.retry_after  # 60.0
CircuitOpenError(message: str, *, retry_after: float = 60.0)
293    def __init__(self, message: str, *, retry_after: float = 60.0) -> None:
294        """Initialize circuit open error.
295
296        Args:
297            message: Error message
298            retry_after: Seconds to wait before retry
299
300        Raises:
301            ValueError: If retry_after is negative
302        """
303        if retry_after < 0:
304            raise ValueError("retry_after must be non-negative")
305
306        # Pre-round for safe_details (avoid repeated rounding)
307        rounded_retry = round(retry_after, 2)
308        safe_details = {"retry_after": rounded_retry}
309        super().__init__(message, safe_details=safe_details)
310
311        self.retry_after = retry_after

Initialize circuit open error.

Arguments:
  • message: Error message
  • retry_after: Seconds to wait before retry
Raises:
  • ValueError: If retry_after is negative
retry_after
default_retry_delay: float
313    @property
314    def default_retry_delay(self) -> float:
315        """Suggested delay before retry."""
316        return self.retry_after

Suggested delay before retry.

class CircuitState(enum.Enum):
36class CircuitState(Enum):
37    """Circuit breaker states.
38
39    CLOSED: Normal operation, requests pass through (hot path)
40    OPEN: Failures exceeded threshold, requests blocked
41    HALF_OPEN: Testing recovery, limited requests allowed
42    """
43
44    CLOSED = auto()
45    OPEN = auto()
46    HALF_OPEN = auto()

Circuit breaker states.

CLOSED: Normal operation, requests pass through (hot path) OPEN: Failures exceeded threshold, requests blocked HALF_OPEN: Testing recovery, limited requests allowed

CLOSED = <CircuitState.CLOSED: 1>
OPEN = <CircuitState.OPEN: 2>
HALF_OPEN = <CircuitState.HALF_OPEN: 3>
class ConfigOverrides(typing.TypedDict):
736class ConfigOverrides(TypedDict, total=False):
737    """Type-safe configuration overrides.
738
739    All fields are optional, allowing selective parameter overriding
740    while maintaining type safety.
741    """
742
743    timeout: int
744    retry_attempts: int
745    max_connections: int
746    rate_limit: int
747    user_agent: str
748    enable_circuit_breaker: bool
749    circuit_failure_threshold: int
750    circuit_recovery_timeout: float
751    circuit_success_threshold: int
752    circuit_call_timeout: float
753    enable_logging: bool
754    json_format: bool
755    allow_private_networks: bool
756    resolve_dns_for_ssrf: bool

Type-safe configuration overrides.

All fields are optional, allowing selective parameter overriding while maintaining type safety.

timeout: int
retry_attempts: int
max_connections: int
rate_limit: int
user_agent: str
enable_circuit_breaker: bool
circuit_failure_threshold: int
circuit_recovery_timeout: float
circuit_success_threshold: int
circuit_call_timeout: float
enable_logging: bool
json_format: bool
allow_private_networks: bool
resolve_dns_for_ssrf: bool
class ConfigurationError(pyoutlineapi.OutlineError):
319class ConfigurationError(OutlineError):
320    """Invalid or missing configuration.
321
322    Attributes:
323        field: Configuration field name that failed
324        security_issue: Whether this is a security-related issue
325
326    Example:
327        >>> error = ConfigurationError(
328        ...     "Missing API URL", field="api_url", security_issue=True
329        ... )
330    """
331
332    __slots__ = ("field", "security_issue")
333
334    def __init__(
335        self,
336        message: str,
337        *,
338        field: str | None = None,
339        security_issue: bool = False,
340    ) -> None:
341        """Initialize configuration error.
342
343        Args:
344            message: Error message
345            field: Configuration field name
346            security_issue: Whether this is a security issue
347        """
348        safe_details: dict[str, Any] | None = None
349        if field or security_issue:
350            safe_details = {}
351            if field:
352                safe_details["field"] = field
353            if security_issue:
354                safe_details["security_issue"] = True
355
356        super().__init__(message, safe_details=safe_details)
357
358        self.field = field
359        self.security_issue = security_issue

Invalid or missing configuration.

Attributes:
  • field: Configuration field name that failed
  • security_issue: Whether this is a security-related issue
Example:
>>> error = ConfigurationError(
...     "Missing API URL", field="api_url", security_issue=True
... )
ConfigurationError( message: str, *, field: str | None = None, security_issue: bool = False)
334    def __init__(
335        self,
336        message: str,
337        *,
338        field: str | None = None,
339        security_issue: bool = False,
340    ) -> None:
341        """Initialize configuration error.
342
343        Args:
344            message: Error message
345            field: Configuration field name
346            security_issue: Whether this is a security issue
347        """
348        safe_details: dict[str, Any] | None = None
349        if field or security_issue:
350            safe_details = {}
351            if field:
352                safe_details["field"] = field
353            if security_issue:
354                safe_details["security_issue"] = True
355
356        super().__init__(message, safe_details=safe_details)
357
358        self.field = field
359        self.security_issue = security_issue

Initialize configuration error.

Arguments:
  • message: Error message
  • field: Configuration field name
  • security_issue: Whether this is a security issue
field
security_issue
class Constants:
 84class Constants:
 85    """Application-wide constants with security limits."""
 86
 87    # Port constraints
 88    MIN_PORT: Final[int] = 1
 89    MAX_PORT: Final[int] = 65535
 90
 91    # Length limits
 92    MAX_NAME_LENGTH: Final[int] = 255
 93    CERT_FINGERPRINT_LENGTH: Final[int] = 64
 94    MAX_KEY_ID_LENGTH: Final[int] = 255
 95    MAX_URL_LENGTH: Final[int] = 2048
 96
 97    # Network defaults
 98    DEFAULT_TIMEOUT: Final[int] = 10
 99    DEFAULT_RETRY_ATTEMPTS: Final[int] = 2
100    DEFAULT_MIN_CONNECTIONS: Final[int] = 1
101    DEFAULT_MAX_CONNECTIONS: Final[int] = 100
102    DEFAULT_RETRY_DELAY: Final[float] = 1.0
103    DEFAULT_MIN_TIMEOUT: Final[int] = 1
104    DEFAULT_MAX_TIMEOUT: Final[int] = 300
105    DEFAULT_USER_AGENT: Final[str] = "PyOutlineAPI/0.4.0"
106    _MIN_RATE_LIMIT: Final[int] = 1
107    _MAX_RATE_LIMIT: Final[int] = 1000
108    _SAFETY_MARGIN: Final[float] = 10.0
109
110    # Resource limits
111    MAX_RECURSION_DEPTH: Final[int] = 10
112    MAX_SNAPSHOT_SIZE_MB: Final[int] = 10
113
114    # HTTP retry codes
115    RETRY_STATUS_CODES: Final[frozenset[int]] = frozenset(
116        {408, 429, 500, 502, 503, 504}
117    )
118
119    # Logging levels
120    LOG_LEVEL_DEBUG: Final[int] = logging.DEBUG
121    LOG_LEVEL_INFO: Final[int] = logging.INFO
122    LOG_LEVEL_WARNING: Final[int] = logging.WARNING
123    LOG_LEVEL_ERROR: Final[int] = logging.ERROR
124
125    # ===== Security limits =====
126
127    # Response size protection (DoS prevention)
128    MAX_RESPONSE_SIZE: Final[int] = 10 * 1024 * 1024  # 10 MB
129    MAX_RESPONSE_CHUNK_SIZE: Final[int] = 8192  # 8 KB chunks
130
131    # Rate limiting defaults
132    DEFAULT_RATE_LIMIT_RPS: Final[float] = 100.0  # Requests per second
133    DEFAULT_RATE_LIMIT_BURST: Final[int] = 200  # Burst capacity
134    DEFAULT_RATE_LIMIT: Final[int] = 100  # Concurrent requests
135
136    # Connection limits
137    MAX_CONNECTIONS_PER_HOST: Final[int] = 50
138    DNS_CACHE_TTL: Final[int] = 300  # 5 minutes
139
140    # Timeout strategies
141    TIMEOUT_WARNING_RATIO: Final[float] = 0.8  # Warn at 80% of timeout
142    MAX_TIMEOUT: Final[int] = 300  # 5 minutes absolute max

Application-wide constants with security limits.

MIN_PORT: Final[int] = 1
MAX_PORT: Final[int] = 65535
MAX_NAME_LENGTH: Final[int] = 255
CERT_FINGERPRINT_LENGTH: Final[int] = 64
MAX_KEY_ID_LENGTH: Final[int] = 255
MAX_URL_LENGTH: Final[int] = 2048
DEFAULT_TIMEOUT: Final[int] = 10
DEFAULT_RETRY_ATTEMPTS: Final[int] = 2
DEFAULT_MIN_CONNECTIONS: Final[int] = 1
DEFAULT_MAX_CONNECTIONS: Final[int] = 100
DEFAULT_RETRY_DELAY: Final[float] = 1.0
DEFAULT_MIN_TIMEOUT: Final[int] = 1
DEFAULT_MAX_TIMEOUT: Final[int] = 300
DEFAULT_USER_AGENT: Final[str] = 'PyOutlineAPI/0.4.0'
MAX_RECURSION_DEPTH: Final[int] = 10
MAX_SNAPSHOT_SIZE_MB: Final[int] = 10
RETRY_STATUS_CODES: Final[frozenset[int]] = frozenset({500, 502, 503, 408, 504, 429})
LOG_LEVEL_DEBUG: Final[int] = 10
LOG_LEVEL_INFO: Final[int] = 20
LOG_LEVEL_WARNING: Final[int] = 30
LOG_LEVEL_ERROR: Final[int] = 40
MAX_RESPONSE_SIZE: Final[int] = 10485760
MAX_RESPONSE_CHUNK_SIZE: Final[int] = 8192
DEFAULT_RATE_LIMIT_RPS: Final[float] = 100.0
DEFAULT_RATE_LIMIT_BURST: Final[int] = 200
DEFAULT_RATE_LIMIT: Final[int] = 100
MAX_CONNECTIONS_PER_HOST: Final[int] = 50
DNS_CACHE_TTL: Final[int] = 300
TIMEOUT_WARNING_RATIO: Final[float] = 0.8
MAX_TIMEOUT: Final[int] = 300
class CredentialSanitizer:
281class CredentialSanitizer:
282    """Sanitize credentials from strings and exceptions."""
283
284    # Patterns for detecting credentials
285    PATTERNS: Final[list[tuple[re.Pattern[str], str]]] = [
286        (
287            re.compile(
288                r'api[_-]?key["\']?\s*[:=]\s*["\']?([a-zA-Z0-9_\-\.]{20,})',
289                re.IGNORECASE,
290            ),
291            "***API_KEY***",
292        ),
293        (
294            re.compile(
295                r'token["\']?\s*[:=]\s*["\']?([a-zA-Z0-9_\-.]{20,})', re.IGNORECASE
296            ),
297            "***TOKEN***",
298        ),
299        (
300            re.compile(r'password["\']?\s*[:=]\s*["\']?([^\s"\']+)', re.IGNORECASE),
301            "***PASSWORD***",
302        ),
303        (
304            re.compile(
305                r'cert[_-]?sha256["\']?\s*[:=]\s*["\']?([a-f0-9]{64})', re.IGNORECASE
306            ),
307            "***CERT***",
308        ),
309        (
310            re.compile(r"bearer\s+([a-zA-Z0-9\-._~+/]+=*)", re.IGNORECASE),
311            "Bearer ***TOKEN***",
312        ),
313        (
314            re.compile(r"access_url['\"]?\s*[:=]\s*['\"]?([^\s'\"]+)", re.IGNORECASE),
315            "***ACCESS_URL***",
316        ),
317    ]
318
319    @classmethod
320    @lru_cache(maxsize=512)
321    def sanitize(cls, text: str) -> str:
322        """Remove credentials from string.
323
324        :param text: Text that may contain credentials
325        :return: Sanitized text
326        """
327        if not text:
328            return text
329
330        sanitized = text
331        for pattern, replacement in cls.PATTERNS:
332            sanitized = pattern.sub(replacement, sanitized)
333        return sanitized

Sanitize credentials from strings and exceptions.

PATTERNS: Final[list[tuple[re.Pattern[str], str]]] = [(re.compile('api[_-]?key["\\\']?\\s*[:=]\\s*["\\\']?([a-zA-Z0-9_\\-\\.]{20,})', re.IGNORECASE), '***API_KEY***'), (re.compile('token["\\\']?\\s*[:=]\\s*["\\\']?([a-zA-Z0-9_\\-.]{20,})', re.IGNORECASE), '***TOKEN***'), (re.compile('password["\\\']?\\s*[:=]\\s*["\\\']?([^\\s"\\\']+)', re.IGNORECASE), '***PASSWORD***'), (re.compile('cert[_-]?sha256["\\\']?\\s*[:=]\\s*["\\\']?([a-f0-9]{64})', re.IGNORECASE), '***CERT***'), (re.compile('bearer\\s+([a-zA-Z0-9\\-._~+/]+=*)', re.IGNORECASE), 'Bearer ***TOKEN***'), (re.compile('access_url[\'\\"]?\\s*[:=]\\s*[\'\\"]?([^\\s\'\\"]+)', re.IGNORECASE), '***ACCESS_URL***')]
@classmethod
@lru_cache(maxsize=512)
def sanitize(cls, text: str) -> str:
319    @classmethod
320    @lru_cache(maxsize=512)
321    def sanitize(cls, text: str) -> str:
322        """Remove credentials from string.
323
324        :param text: Text that may contain credentials
325        :return: Sanitized text
326        """
327        if not text:
328            return text
329
330        sanitized = text
331        for pattern, replacement in cls.PATTERNS:
332            sanitized = pattern.sub(replacement, sanitized)
333        return sanitized

Remove credentials from string.

Parameters
  • text: Text that may contain credentials
Returns

Sanitized text

class DataLimit(pyoutlineapi.common_types.BaseValidatedModel, pyoutlineapi.models.ByteConversionMixin):
104class DataLimit(BaseValidatedModel, ByteConversionMixin):
105    """Data transfer limit in bytes with unit conversions."""
106
107    bytes: Bytes
108
109    @classmethod
110    def from_kilobytes(cls, kb: float) -> Self:
111        """Create DataLimit from kilobytes.
112
113        :param kb: Size in kilobytes
114        :return: DataLimit instance
115        """
116        return cls(bytes=int(kb * _BYTES_IN_KB))
117
118    @classmethod
119    def from_megabytes(cls, mb: float) -> Self:
120        """Create DataLimit from megabytes.
121
122        :param mb: Size in megabytes
123        :return: DataLimit instance
124        """
125        return cls(bytes=int(mb * _BYTES_IN_MB))
126
127    @classmethod
128    def from_gigabytes(cls, gb: float) -> Self:
129        """Create DataLimit from gigabytes.
130
131        :param gb: Size in gigabytes
132        :return: DataLimit instance
133        """
134        return cls(bytes=int(gb * _BYTES_IN_GB))

Data transfer limit in bytes with unit conversions.

bytes: Annotated[int, FieldInfo(annotation=NoneType, required=True, description='Size in bytes', metadata=[Ge(ge=0)])] = PydanticUndefined

Size in bytes

@classmethod
def from_kilobytes(cls, kb: float) -> Self:
109    @classmethod
110    def from_kilobytes(cls, kb: float) -> Self:
111        """Create DataLimit from kilobytes.
112
113        :param kb: Size in kilobytes
114        :return: DataLimit instance
115        """
116        return cls(bytes=int(kb * _BYTES_IN_KB))

Create DataLimit from kilobytes.

Parameters
  • kb: Size in kilobytes
Returns

DataLimit instance

@classmethod
def from_megabytes(cls, mb: float) -> Self:
118    @classmethod
119    def from_megabytes(cls, mb: float) -> Self:
120        """Create DataLimit from megabytes.
121
122        :param mb: Size in megabytes
123        :return: DataLimit instance
124        """
125        return cls(bytes=int(mb * _BYTES_IN_MB))

Create DataLimit from megabytes.

Parameters
  • mb: Size in megabytes
Returns

DataLimit instance

@classmethod
def from_gigabytes(cls, gb: float) -> Self:
127    @classmethod
128    def from_gigabytes(cls, gb: float) -> Self:
129        """Create DataLimit from gigabytes.
130
131        :param gb: Size in gigabytes
132        :return: DataLimit instance
133        """
134        return cls(bytes=int(gb * _BYTES_IN_GB))

Create DataLimit from gigabytes.

Parameters
  • gb: Size in gigabytes
Returns

DataLimit instance

class DataLimitRequest(pyoutlineapi.common_types.BaseValidatedModel):
540class DataLimitRequest(BaseValidatedModel):
541    """Request model for setting data limit.
542
543    Note:
544        The API expects the DataLimit object directly.
545        Use to_payload() to produce the correct request body.
546    """
547
548    limit: DataLimit
549
550    def to_payload(self) -> dict[str, dict[str, int]]:
551        """Convert to API request payload.
552
553        :return: Payload dict with limit object
554        """
555        return {"limit": cast(dict[str, int], self.limit.model_dump(by_alias=True))}

Request model for setting data limit.

Note:

The API expects the DataLimit object directly. Use to_payload() to produce the correct request body.

limit: DataLimit = PydanticUndefined
def to_payload(self) -> dict[str, dict[str, int]]:
550    def to_payload(self) -> dict[str, dict[str, int]]:
551        """Convert to API request payload.
552
553        :return: Payload dict with limit object
554        """
555        return {"limit": cast(dict[str, int], self.limit.model_dump(by_alias=True))}

Convert to API request payload.

Returns

Payload dict with limit object

class DataTransferred(pyoutlineapi.common_types.BaseValidatedModel, pyoutlineapi.models.ByteConversionMixin):
372class DataTransferred(BaseValidatedModel, ByteConversionMixin):
373    """Data transfer metric with byte conversions.
374
375    SCHEMA: Based on experimental metrics dataTransferred object
376    """
377
378    bytes: Bytes

Data transfer metric with byte conversions.

SCHEMA: Based on experimental metrics dataTransferred object

bytes: Annotated[int, FieldInfo(annotation=NoneType, required=True, description='Size in bytes', metadata=[Ge(ge=0)])] = PydanticUndefined

Size in bytes

class DefaultAuditLogger:
273class DefaultAuditLogger:
274    """Async audit logger with batching and backpressure handling."""
275
276    __slots__ = (
277        "_batch_size",
278        "_batch_timeout",
279        "_lock",
280        "_queue",
281        "_queue_size",
282        "_shutdown_event",
283        "_task",
284    )
285
286    def __init__(
287        self,
288        *,
289        queue_size: int = 10000,
290        batch_size: int = 100,
291        batch_timeout: float = 1.0,
292    ) -> None:
293        """Initialize audit logger with batching support.
294
295        :param queue_size: Maximum queue size (backpressure protection)
296        :param batch_size: Maximum batch size for processing
297        :param batch_timeout: Maximum time to wait for batch completion (seconds)
298        """
299        self._queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue(maxsize=queue_size)
300        self._queue_size = queue_size
301        self._batch_size = batch_size
302        self._batch_timeout = batch_timeout
303        self._task: asyncio.Task[None] | None = None
304        self._shutdown_event = asyncio.Event()
305        self._lock = asyncio.Lock()
306
307    async def alog_action(
308        self,
309        action: str,
310        resource: str,
311        *,
312        user: str | None = None,
313        details: dict[str, Any] | None = None,
314        correlation_id: str | None = None,
315    ) -> None:
316        """Log auditable action asynchronously with automatic batching.
317
318        :param action: Action being performed
319        :param resource: Resource identifier
320        :param user: User performing the action (optional)
321        :param details: Additional structured details (optional)
322        :param correlation_id: Request correlation ID (optional)
323        """
324        if self._shutdown_event.is_set():
325            # Fallback to sync logging during shutdown
326            return self.log_action(
327                action,
328                resource,
329                user=user,
330                details=details,
331                correlation_id=correlation_id,
332            )
333
334        # Ensure background task is running
335        await self._ensure_task_running()
336
337        # Build log entry
338        entry = self._build_entry(action, resource, user, details, correlation_id)
339
340        # Try to enqueue, handle backpressure
341        try:
342            self._queue.put_nowait(entry)
343        except asyncio.QueueFull:
344            # Backpressure: log warning and use sync fallback
345            if logger.isEnabledFor(logging.WARNING):
346                logger.warning(
347                    "[AUDIT] Queue full (%d items), using sync fallback",
348                    self._queue_size,
349                )
350            self.log_action(
351                action,
352                resource,
353                user=user,
354                details=details,
355                correlation_id=correlation_id,
356            )
357
358    def log_action(
359        self,
360        action: str,
361        resource: str,
362        *,
363        user: str | None = None,
364        details: dict[str, Any] | None = None,
365        correlation_id: str | None = None,
366    ) -> None:
367        """Log auditable action synchronously (fallback method).
368
369        :param action: Action being performed
370        :param resource: Resource identifier
371        :param user: User performing the action (optional)
372        :param details: Additional structured details (optional)
373        :param correlation_id: Request correlation ID (optional)
374        """
375        entry = self._build_entry(action, resource, user, details, correlation_id)
376        self._write_log(entry)
377
378    async def _ensure_task_running(self) -> None:
379        """Ensure background processing task is running (lazy start with lock)."""
380        if self._task is not None and not self._task.done():
381            return
382
383        async with self._lock:
384            # Double-check after acquiring lock
385            if self._task is None or self._task.done():
386                self._task = asyncio.create_task(
387                    self._process_queue(), name="audit-logger"
388                )
389
390    async def _process_queue(self) -> None:
391        """Background task for processing audit logs in batches.
392
393        Uses batching for improved throughput and reduced I/O overhead.
394        """
395        batch: list[dict[str, Any]] = []
396
397        try:
398            while not self._shutdown_event.is_set():
399                try:
400                    # Wait for item with timeout for batch processing
401                    entry = await asyncio.wait_for(
402                        self._queue.get(), timeout=self._batch_timeout
403                    )
404                    batch.append(entry)
405
406                    # Process batch when size reached or queue empty
407                    if len(batch) >= self._batch_size or self._queue.empty():
408                        self._write_batch(batch)
409                        batch.clear()
410
411                    self._queue.task_done()
412
413                except asyncio.TimeoutError:
414                    # Timeout: flush partial batch if any
415                    if batch:
416                        self._write_batch(batch)
417                        batch.clear()
418
419        except asyncio.CancelledError:
420            # Flush remaining batch on cancellation
421            if batch:
422                self._write_batch(batch)
423            raise
424        finally:
425            if logger.isEnabledFor(logging.DEBUG):
426                logger.debug("[AUDIT] Queue processor stopped")
427
428    def _write_batch(self, batch: list[dict[str, Any]]) -> None:
429        """Write batch of log entries efficiently.
430
431        :param batch: Batch of log entries to write
432        """
433        for entry in batch:
434            self._write_log(entry)
435
436    def _write_log(self, entry: dict[str, Any]) -> None:
437        """Write single log entry to logger.
438
439        :param entry: Log entry to write
440        """
441        message = self._format_message(entry)
442        logger.info(message, extra=entry)
443
444    @staticmethod
445    def _build_entry(
446        action: str,
447        resource: str,
448        user: str | None,
449        details: dict[str, Any] | None,
450        correlation_id: str | None,
451    ) -> dict[str, Any]:
452        """Build structured log entry with sanitization.
453
454        :param action: Action being performed
455        :param resource: Resource identifier
456        :param user: User performing action
457        :param details: Additional details
458        :param correlation_id: Correlation ID
459        :return: Structured log entry
460        """
461        entry: dict[str, Any] = {
462            "action": action,
463            "resource": resource,
464            "timestamp": time.time(),
465            "is_audit": True,
466        }
467
468        if user is not None:
469            entry["user"] = user
470        if correlation_id is not None:
471            entry["correlation_id"] = correlation_id
472        if details is not None:
473            entry["details"] = _sanitize_details(details)
474
475        return entry
476
477    @staticmethod
478    def _format_message(entry: dict[str, Any]) -> str:
479        """Format audit log message for human readability.
480
481        :param entry: Log entry
482        :return: Formatted message
483        """
484        action = entry["action"]
485        resource = entry["resource"]
486        user = entry.get("user")
487        correlation_id = entry.get("correlation_id")
488
489        parts = ["[AUDIT]", action, "on", resource]
490
491        if user:
492            parts.extend(["by", user])
493        if correlation_id:
494            parts.append(f"[{correlation_id}]")
495
496        return " ".join(parts)
497
498    async def shutdown(self, *, timeout: float = 5.0) -> None:
499        """Gracefully shutdown audit logger with queue draining.
500
501        :param timeout: Maximum time to wait for queue to drain (seconds)
502        """
503        async with self._lock:
504            if self._shutdown_event.is_set():
505                return
506
507            self._shutdown_event.set()
508
509            if logger.isEnabledFor(logging.DEBUG):
510                logger.debug("[AUDIT] Shutting down, draining queue")
511
512            # Wait for queue to drain
513            try:
514                await asyncio.wait_for(self._queue.join(), timeout=timeout)
515            except asyncio.TimeoutError:
516                remaining = self._queue.qsize()
517                if logger.isEnabledFor(logging.WARNING):
518                    logger.warning(
519                        "[AUDIT] Queue did not drain within %ss, %d items remaining",
520                        timeout,
521                        remaining,
522                    )
523
524            # Cancel processing task
525            if self._task and not self._task.done():
526                self._task.cancel()
527                with suppress(asyncio.CancelledError):
528                    await self._task
529
530            if logger.isEnabledFor(logging.DEBUG):
531                logger.debug("[AUDIT] Shutdown complete")

Async audit logger with batching and backpressure handling.

DefaultAuditLogger( *, queue_size: int = 10000, batch_size: int = 100, batch_timeout: float = 1.0)
286    def __init__(
287        self,
288        *,
289        queue_size: int = 10000,
290        batch_size: int = 100,
291        batch_timeout: float = 1.0,
292    ) -> None:
293        """Initialize audit logger with batching support.
294
295        :param queue_size: Maximum queue size (backpressure protection)
296        :param batch_size: Maximum batch size for processing
297        :param batch_timeout: Maximum time to wait for batch completion (seconds)
298        """
299        self._queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue(maxsize=queue_size)
300        self._queue_size = queue_size
301        self._batch_size = batch_size
302        self._batch_timeout = batch_timeout
303        self._task: asyncio.Task[None] | None = None
304        self._shutdown_event = asyncio.Event()
305        self._lock = asyncio.Lock()

Initialize audit logger with batching support.

Parameters
  • queue_size: Maximum queue size (backpressure protection)
  • batch_size: Maximum batch size for processing
  • batch_timeout: Maximum time to wait for batch completion (seconds)
async def alog_action( self, action: str, resource: str, *, user: str | None = None, details: dict[str, typing.Any] | None = None, correlation_id: str | None = None) -> None:
307    async def alog_action(
308        self,
309        action: str,
310        resource: str,
311        *,
312        user: str | None = None,
313        details: dict[str, Any] | None = None,
314        correlation_id: str | None = None,
315    ) -> None:
316        """Log auditable action asynchronously with automatic batching.
317
318        :param action: Action being performed
319        :param resource: Resource identifier
320        :param user: User performing the action (optional)
321        :param details: Additional structured details (optional)
322        :param correlation_id: Request correlation ID (optional)
323        """
324        if self._shutdown_event.is_set():
325            # Fallback to sync logging during shutdown
326            return self.log_action(
327                action,
328                resource,
329                user=user,
330                details=details,
331                correlation_id=correlation_id,
332            )
333
334        # Ensure background task is running
335        await self._ensure_task_running()
336
337        # Build log entry
338        entry = self._build_entry(action, resource, user, details, correlation_id)
339
340        # Try to enqueue, handle backpressure
341        try:
342            self._queue.put_nowait(entry)
343        except asyncio.QueueFull:
344            # Backpressure: log warning and use sync fallback
345            if logger.isEnabledFor(logging.WARNING):
346                logger.warning(
347                    "[AUDIT] Queue full (%d items), using sync fallback",
348                    self._queue_size,
349                )
350            self.log_action(
351                action,
352                resource,
353                user=user,
354                details=details,
355                correlation_id=correlation_id,
356            )

Log auditable action asynchronously with automatic batching.

Parameters
  • action: Action being performed
  • resource: Resource identifier
  • user: User performing the action (optional)
  • details: Additional structured details (optional)
  • correlation_id: Request correlation ID (optional)
def log_action( self, action: str, resource: str, *, user: str | None = None, details: dict[str, typing.Any] | None = None, correlation_id: str | None = None) -> None:
358    def log_action(
359        self,
360        action: str,
361        resource: str,
362        *,
363        user: str | None = None,
364        details: dict[str, Any] | None = None,
365        correlation_id: str | None = None,
366    ) -> None:
367        """Log auditable action synchronously (fallback method).
368
369        :param action: Action being performed
370        :param resource: Resource identifier
371        :param user: User performing the action (optional)
372        :param details: Additional structured details (optional)
373        :param correlation_id: Request correlation ID (optional)
374        """
375        entry = self._build_entry(action, resource, user, details, correlation_id)
376        self._write_log(entry)

Log auditable action synchronously (fallback method).

Parameters
  • action: Action being performed
  • resource: Resource identifier
  • user: User performing the action (optional)
  • details: Additional structured details (optional)
  • correlation_id: Request correlation ID (optional)
async def shutdown(self, *, timeout: float = 5.0) -> None:
498    async def shutdown(self, *, timeout: float = 5.0) -> None:
499        """Gracefully shutdown audit logger with queue draining.
500
501        :param timeout: Maximum time to wait for queue to drain (seconds)
502        """
503        async with self._lock:
504            if self._shutdown_event.is_set():
505                return
506
507            self._shutdown_event.set()
508
509            if logger.isEnabledFor(logging.DEBUG):
510                logger.debug("[AUDIT] Shutting down, draining queue")
511
512            # Wait for queue to drain
513            try:
514                await asyncio.wait_for(self._queue.join(), timeout=timeout)
515            except asyncio.TimeoutError:
516                remaining = self._queue.qsize()
517                if logger.isEnabledFor(logging.WARNING):
518                    logger.warning(
519                        "[AUDIT] Queue did not drain within %ss, %d items remaining",
520                        timeout,
521                        remaining,
522                    )
523
524            # Cancel processing task
525            if self._task and not self._task.done():
526                self._task.cancel()
527                with suppress(asyncio.CancelledError):
528                    await self._task
529
530            if logger.isEnabledFor(logging.DEBUG):
531                logger.debug("[AUDIT] Shutdown complete")

Gracefully shutdown audit logger with queue draining.

Parameters
  • timeout: Maximum time to wait for queue to drain (seconds)
class DevelopmentConfig(pyoutlineapi.OutlineClientConfig):
463class DevelopmentConfig(OutlineClientConfig):
464    """Development configuration with relaxed security.
465
466    Optimized for local development and testing with:
467    - Extended timeouts for debugging
468    - Detailed logging enabled by default
469    - Circuit breaker disabled for easier testing
470    """
471
472    model_config = SettingsConfigDict(
473        env_prefix=_DEV_ENV_PREFIX,
474        env_file=".env.dev",
475        case_sensitive=False,
476        extra="forbid",
477    )
478
479    enable_logging: bool = True
480    enable_circuit_breaker: bool = False
481    timeout: int = 30

Development configuration with relaxed security.

Optimized for local development and testing with:

  • Extended timeouts for debugging
  • Detailed logging enabled by default
  • Circuit breaker disabled for easier testing
enable_logging: bool = True
enable_circuit_breaker: bool = False
timeout: int = 30
class ErrorResponse(pyoutlineapi.common_types.BaseValidatedModel):
580class ErrorResponse(BaseValidatedModel):
581    """Error response with optimized string formatting.
582
583    SCHEMA: Based on API error response format
584    """
585
586    code: str
587    message: str
588
589    def __str__(self) -> str:
590        """Format error as string (optimized f-string).
591
592        :return: Formatted error message
593        """
594        return f"{self.code}: {self.message}"

Error response with optimized string formatting.

SCHEMA: Based on API error response format

code: str = PydanticUndefined
message: str = PydanticUndefined
class ExperimentalMetrics(pyoutlineapi.common_types.BaseValidatedModel):
467class ExperimentalMetrics(BaseValidatedModel):
468    """Experimental metrics with optimized lookup.
469
470    SCHEMA: Based on GET /experimental/server/metrics response
471    """
472
473    server: ServerExperimentalMetric
474    access_keys: list[AccessKeyMetric] = Field(alias="accessKeys")
475
476    def get_key_metric(self, key_id: str) -> AccessKeyMetric | None:
477        """Get metrics for specific key with early return.
478
479        :param key_id: Access key ID
480        :return: Key metrics or None if not found
481        """
482        for metric in self.access_keys:
483            if metric.access_key_id == key_id:
484                return metric  # Early return
485        return None

Experimental metrics with optimized lookup.

SCHEMA: Based on GET /experimental/server/metrics response

server: ServerExperimentalMetric = PydanticUndefined
access_keys: list[AccessKeyMetric] = PydanticUndefined
def get_key_metric(self, key_id: str) -> AccessKeyMetric | None:
476    def get_key_metric(self, key_id: str) -> AccessKeyMetric | None:
477        """Get metrics for specific key with early return.
478
479        :param key_id: Access key ID
480        :return: Key metrics or None if not found
481        """
482        for metric in self.access_keys:
483            if metric.access_key_id == key_id:
484                return metric  # Early return
485        return None

Get metrics for specific key with early return.

Parameters
  • key_id: Access key ID
Returns

Key metrics or None if not found

class HealthCheckResult(pyoutlineapi.common_types.BaseValidatedModel):
600class HealthCheckResult(BaseValidatedModel):
601    """Health check result with optimized diagnostics."""
602
603    healthy: bool
604    timestamp: float
605    checks: ChecksDict
606
607    @cached_property
608    def failed_checks(self) -> list[str]:
609        """Get failed checks (cached for repeated access).
610
611        :return: List of failed check names
612        """
613        return [
614            name
615            for name, result in self.checks.items()
616            if result.get("status") != "healthy"
617        ]
618
619    @property
620    def success_rate(self) -> float:
621        """Calculate success rate (uses cached failed_checks).
622
623        :return: Success rate (0.0 to 1.0)
624        """
625        if not self.checks:
626            return 1.0  # Early return
627
628        total = len(self.checks)
629        passed = total - len(self.failed_checks)  # Uses cached property
630        return passed / total

Health check result with optimized diagnostics.

healthy: bool = PydanticUndefined
timestamp: float = PydanticUndefined
checks: dict[str, dict[str, typing.Any]] = PydanticUndefined
failed_checks: list[str]
607    @cached_property
608    def failed_checks(self) -> list[str]:
609        """Get failed checks (cached for repeated access).
610
611        :return: List of failed check names
612        """
613        return [
614            name
615            for name, result in self.checks.items()
616            if result.get("status") != "healthy"
617        ]

Get failed checks (cached for repeated access).

Returns

List of failed check names

success_rate: float
619    @property
620    def success_rate(self) -> float:
621        """Calculate success rate (uses cached failed_checks).
622
623        :return: Success rate (0.0 to 1.0)
624        """
625        if not self.checks:
626            return 1.0  # Early return
627
628        total = len(self.checks)
629        passed = total - len(self.failed_checks)  # Uses cached property
630        return passed / total

Calculate success rate (uses cached failed_checks).

Returns

Success rate (0.0 to 1.0)

class HostnameRequest(pyoutlineapi.common_types.BaseValidatedModel):
513class HostnameRequest(BaseValidatedModel):
514    """Request model for setting hostname.
515
516    SCHEMA: Based on PUT /server/hostname-for-access-keys request body
517    """
518
519    hostname: str = Field(min_length=1)

Request model for setting hostname.

SCHEMA: Based on PUT /server/hostname-for-access-keys request body

hostname: str = PydanticUndefined
JsonDict = dict[str, 'JsonValue']
JsonPayload = dict[str, 'JsonValue'] | list['JsonValue'] | None
class LocationMetric(pyoutlineapi.common_types.BaseValidatedModel):
410class LocationMetric(BaseValidatedModel):
411    """Location-based usage metric.
412
413    SCHEMA: Based on experimental metrics locations array item
414    """
415
416    location: str
417    asn: int | None = None
418    as_org: str | None = Field(None, alias="asOrg")
419    tunnel_time: TunnelTime = Field(alias="tunnelTime")
420    data_transferred: DataTransferred = Field(alias="dataTransferred")

Location-based usage metric.

SCHEMA: Based on experimental metrics locations array item

location: str = PydanticUndefined
asn: int | None = None
as_org: str | None = None
tunnel_time: TunnelTime = PydanticUndefined
data_transferred: DataTransferred = PydanticUndefined
class MetricsCollector(typing.Protocol):
70class MetricsCollector(Protocol):
71    """Protocol for metrics collection.
72
73    Allows dependency injection of custom metrics backends.
74    """
75
76    def increment(self, metric: str, *, tags: MetricsTags | None = None) -> None:
77        """Increment counter metric."""
78        ...  # pragma: no cover
79
80    def timing(
81        self, metric: str, value: float, *, tags: MetricsTags | None = None
82    ) -> None:
83        """Record timing metric."""
84        ...  # pragma: no cover
85
86    def gauge(
87        self, metric: str, value: float, *, tags: MetricsTags | None = None
88    ) -> None:
89        """Set gauge metric."""
90        ...  # pragma: no cover

Protocol for metrics collection.

Allows dependency injection of custom metrics backends.

MetricsCollector(*args, **kwargs)
1957def _no_init_or_replace_init(self, *args, **kwargs):
1958    cls = type(self)
1959
1960    if cls._is_protocol:
1961        raise TypeError('Protocols cannot be instantiated')
1962
1963    # Already using a custom `__init__`. No need to calculate correct
1964    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1965    if cls.__init__ is not _no_init_or_replace_init:
1966        return
1967
1968    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1969    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1970    # searches for a proper new `__init__` in the MRO. The new `__init__`
1971    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1972    # instantiation of the protocol subclass will thus use the new
1973    # `__init__` and no longer call `_no_init_or_replace_init`.
1974    for base in cls.__mro__:
1975        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1976        if init is not _no_init_or_replace_init:
1977            cls.__init__ = init
1978            break
1979    else:
1980        # should not happen
1981        cls.__init__ = object.__init__
1982
1983    cls.__init__(self, *args, **kwargs)
def increment(self, metric: str, *, tags: dict[str, str] | None = None) -> None:
76    def increment(self, metric: str, *, tags: MetricsTags | None = None) -> None:
77        """Increment counter metric."""
78        ...  # pragma: no cover

Increment counter metric.

def timing( self, metric: str, value: float, *, tags: dict[str, str] | None = None) -> None:
80    def timing(
81        self, metric: str, value: float, *, tags: MetricsTags | None = None
82    ) -> None:
83        """Record timing metric."""
84        ...  # pragma: no cover

Record timing metric.

def gauge( self, metric: str, value: float, *, tags: dict[str, str] | None = None) -> None:
86    def gauge(
87        self, metric: str, value: float, *, tags: MetricsTags | None = None
88    ) -> None:
89        """Set gauge metric."""
90        ...  # pragma: no cover

Set gauge metric.

class MetricsEnabledRequest(pyoutlineapi.common_types.BaseValidatedModel):
558class MetricsEnabledRequest(BaseValidatedModel):
559    """Request model for enabling/disabling metrics.
560
561    SCHEMA: Based on PUT /metrics/enabled request body
562    """
563
564    metrics_enabled: bool = Field(alias="metricsEnabled")

Request model for enabling/disabling metrics.

SCHEMA: Based on PUT /metrics/enabled request body

metrics_enabled: bool = PydanticUndefined
class MetricsStatusResponse(pyoutlineapi.common_types.BaseValidatedModel):
567class MetricsStatusResponse(BaseValidatedModel):
568    """Response model for metrics status.
569
570    Returns current metrics sharing status.
571    SCHEMA: Based on GET /metrics/enabled response
572    """
573
574    metrics_enabled: bool = Field(alias="metricsEnabled")

Response model for metrics status.

Returns current metrics sharing status. SCHEMA: Based on GET /metrics/enabled response

metrics_enabled: bool = PydanticUndefined
MetricsTags = dict[str, str]
class MultiServerManager:
553class MultiServerManager:
554    """High-performance manager for multiple Outline servers.
555
556    Features:
557    - Concurrent operations across all servers
558    - Health checking and automatic failover
559    - Aggregated metrics and status
560    - Graceful shutdown with cleanup
561    - Thread-safe operations
562
563    Limits:
564    - Maximum 50 servers (configurable via _MAX_SERVERS)
565    - Automatic cleanup with weak references
566    """
567
568    __slots__ = (
569        "_audit_logger",
570        "_clients",
571        "_configs",
572        "_default_timeout",
573        "_lock",
574        "_metrics",
575    )
576
577    def __init__(
578        self,
579        configs: Sequence[OutlineClientConfig],
580        *,
581        audit_logger: AuditLogger | None = None,
582        metrics: MetricsCollector | None = None,
583        default_timeout: float = _DEFAULT_SERVER_TIMEOUT,
584    ) -> None:
585        """Initialize multiserver manager.
586
587        :param configs: Sequence of server configurations
588        :param audit_logger: Shared audit logger for all servers
589        :param metrics: Shared metrics collector for all servers
590        :param default_timeout: Default timeout for operations (seconds)
591        :raises ConfigurationError: If too many servers or invalid configs
592        """
593        if len(configs) > _MAX_SERVERS:
594            raise ConfigurationError(
595                f"Too many servers: {len(configs)} (max: {_MAX_SERVERS})"
596            )
597
598        if not configs:
599            raise ConfigurationError("At least one server configuration required")
600
601        self._configs = list(configs)
602        self._clients: dict[str, AsyncOutlineClient] = {}
603        self._audit_logger = audit_logger
604        self._metrics = metrics
605        self._default_timeout = default_timeout
606        self._lock = asyncio.Lock()
607
608    @property
609    def server_count(self) -> int:
610        """Get total number of configured servers.
611
612        :return: Number of servers
613        """
614        return len(self._configs)
615
616    @property
617    def active_servers(self) -> int:
618        """Get number of active (connected) servers.
619
620        :return: Number of active servers
621        """
622        return sum(1 for client in self._clients.values() if client.is_connected)
623
624    def get_server_names(self) -> list[str]:
625        """Get list of sanitized server URLs.
626
627        URLs are sanitized to remove sensitive path information.
628
629        :return: List of safe server identifiers
630        """
631        return [
632            Validators.sanitize_url_for_logging(config.api_url)
633            for config in self._configs
634        ]
635
636    async def __aenter__(self) -> MultiServerManager:
637        """Async context manager entry.
638
639        :return: Self reference
640        :raises ConfigurationError: If NO servers can be initialized
641        """
642        async with self._lock:
643            # Create initialization tasks for concurrent execution
644            init_tasks = []
645            for config in self._configs:
646                client = AsyncOutlineClient(
647                    config=config,
648                    audit_logger=self._audit_logger,
649                    metrics=self._metrics,
650                )
651                init_tasks.append((config, client.__aenter__()))
652
653            results = await asyncio.gather(
654                *[task for _, task in init_tasks],
655                return_exceptions=True,
656            )
657
658            # Process results
659            errors: list[str] = []
660            for idx, ((config, _), result) in enumerate(
661                zip(init_tasks, results, strict=True)
662            ):
663                safe_url = Validators.sanitize_url_for_logging(config.api_url)
664
665                if isinstance(result, Exception):
666                    error_msg = f"Failed to initialize server {safe_url}: {result}"
667                    errors.append(error_msg)
668                    if logger.isEnabledFor(logging.WARNING):
669                        logger.warning(error_msg)
670                else:
671                    # Get the client that was initialized
672                    client = AsyncOutlineClient(
673                        config=config,
674                        audit_logger=self._audit_logger,
675                        metrics=self._metrics,
676                    )
677                    self._clients[safe_url] = client
678
679                    if logger.isEnabledFor(logging.INFO):
680                        logger.info(
681                            "Server %d/%d initialized: %s",
682                            idx + 1,
683                            len(self._configs),
684                            safe_url,
685                        )
686
687            if not self._clients:
688                raise ConfigurationError(
689                    f"Failed to initialize any servers. Errors: {'; '.join(errors)}"
690                )
691
692            if logger.isEnabledFor(logging.INFO):
693                logger.info(
694                    "MultiServerManager ready: %d/%d servers active",
695                    len(self._clients),
696                    len(self._configs),
697                )
698
699        return self
700
701    async def __aexit__(
702        self,
703        exc_type: type[BaseException] | None,
704        exc_val: BaseException | None,
705        exc_tb: object | None,
706    ) -> bool:
707        """Async context manager exit.
708
709        :param exc_type: Exception type
710        :param exc_val: Exception value
711        :param exc_tb: Exception traceback
712        :return: False to propagate exceptions
713        """
714        async with self._lock:
715            shutdown_tasks = [
716                client.__aexit__(None, None, None) for client in self._clients.values()
717            ]
718
719            results = await asyncio.gather(*shutdown_tasks, return_exceptions=True)
720
721            errors = [
722                f"{server_id}: {result}"
723                for (server_id, _), result in zip(
724                    self._clients.items(), results, strict=False
725                )
726                if isinstance(result, Exception)
727            ]
728
729            self._clients.clear()
730
731            if errors and logger.isEnabledFor(logging.WARNING):
732                logger.warning("Shutdown completed with %d error(s)", len(errors))
733
734        return False
735
736    def get_client(self, server_identifier: str | int) -> AsyncOutlineClient:
737        """Get client by server identifier or index.
738
739        :param server_identifier: Server URL (sanitized) or 0-based index
740        :return: Client instance
741        :raises KeyError: If server not found
742        :raises IndexError: If index out of range
743        """
744        # Try as index first (fast path for common case)
745        if isinstance(server_identifier, int):
746            if 0 <= server_identifier < len(self._configs):
747                config = self._configs[server_identifier]
748                safe_url = Validators.sanitize_url_for_logging(config.api_url)
749                return self._clients[safe_url]
750            raise IndexError(
751                f"Server index {server_identifier} out of range (0-{len(self._configs) - 1})"
752            )
753
754        # Try as server ID
755        if server_identifier in self._clients:
756            return self._clients[server_identifier]
757
758        raise KeyError(f"Server not found: {server_identifier}")
759
760    def get_all_clients(self) -> list[AsyncOutlineClient]:
761        """Get all active clients.
762
763        :return: List of client instances
764        """
765        return list(self._clients.values())
766
767    async def health_check_all(
768        self,
769        timeout: float | None = None,
770    ) -> dict[str, dict[str, Any]]:
771        """Perform health check on all servers concurrently.
772
773        :param timeout: Timeout for each health check
774        :return: Dictionary mapping server IDs to health check results
775        """
776        timeout = timeout or self._default_timeout
777
778        tasks = [
779            self._health_check_single(server_id, client, timeout)
780            for server_id, client in self._clients.items()
781        ]
782
783        # Execute concurrently
784        results_list = await asyncio.gather(*tasks, return_exceptions=True)
785
786        # Build result dictionary
787        results: dict[str, dict[str, Any]] = {}
788        for (server_id, _), result in zip(
789            self._clients.items(), results_list, strict=False
790        ):
791            if isinstance(result, BaseException):
792                results[server_id] = {
793                    "healthy": False,
794                    "error": str(result),
795                    "error_type": type(result).__name__,
796                }
797            else:
798                results[server_id] = result
799
800        return results
801
802    @staticmethod
803    async def _health_check_single(
804        server_id: str,
805        client: AsyncOutlineClient,
806        timeout: float,
807    ) -> dict[str, Any]:
808        """Perform health check on a single server with timeout.
809
810        :param server_id: Server identifier
811        :param client: Client instance
812        :param timeout: Timeout for operation
813        :return: Health check result
814        """
815        try:
816            result = await asyncio.wait_for(
817                client.health_check(),
818                timeout=timeout,
819            )
820            result["server_id"] = server_id
821            return result
822        except asyncio.TimeoutError:
823            return {
824                "server_id": server_id,
825                "healthy": False,
826                "error": f"Health check timeout after {timeout}s",
827                "error_type": "TimeoutError",
828            }
829        except Exception as e:
830            return {
831                "server_id": server_id,
832                "healthy": False,
833                "error": str(e),
834                "error_type": type(e).__name__,
835            }
836
837    async def get_healthy_servers(
838        self,
839        timeout: float | None = None,
840    ) -> list[AsyncOutlineClient]:
841        """Get list of healthy servers after health check.
842
843        :param timeout: Timeout for health checks
844        :return: List of healthy clients
845        """
846        health_results = await self.health_check_all(timeout=timeout)
847
848        healthy_clients: list[AsyncOutlineClient] = []
849        for server_id, result in health_results.items():
850            if result.get("healthy", False):
851                try:
852                    client = self.get_client(server_id)
853                    healthy_clients.append(client)
854                except (KeyError, IndexError):
855                    continue
856
857        return healthy_clients
858
859    def get_status_summary(self) -> dict[str, Any]:
860        """Get aggregated status summary for all servers.
861
862        Synchronous operation - no API calls made.
863
864        :return: Status summary dictionary
865        """
866        return {
867            "total_servers": len(self._configs),
868            "active_servers": self.active_servers,
869            "server_statuses": {
870                server_id: client.get_status()
871                for server_id, client in self._clients.items()
872            },
873        }
874
875    def __repr__(self) -> str:
876        """String representation.
877
878        :return: String representation
879        """
880        active = self.active_servers
881        total = self.server_count
882        return f"MultiServerManager(servers={active}/{total} active)"

High-performance manager for multiple Outline servers.

Features:

  • Concurrent operations across all servers
  • Health checking and automatic failover
  • Aggregated metrics and status
  • Graceful shutdown with cleanup
  • Thread-safe operations

Limits:

  • Maximum 50 servers (configurable via _MAX_SERVERS)
  • Automatic cleanup with weak references
MultiServerManager( configs: Sequence[OutlineClientConfig], *, audit_logger: AuditLogger | None = None, metrics: MetricsCollector | None = None, default_timeout: float = 5.0)
577    def __init__(
578        self,
579        configs: Sequence[OutlineClientConfig],
580        *,
581        audit_logger: AuditLogger | None = None,
582        metrics: MetricsCollector | None = None,
583        default_timeout: float = _DEFAULT_SERVER_TIMEOUT,
584    ) -> None:
585        """Initialize multiserver manager.
586
587        :param configs: Sequence of server configurations
588        :param audit_logger: Shared audit logger for all servers
589        :param metrics: Shared metrics collector for all servers
590        :param default_timeout: Default timeout for operations (seconds)
591        :raises ConfigurationError: If too many servers or invalid configs
592        """
593        if len(configs) > _MAX_SERVERS:
594            raise ConfigurationError(
595                f"Too many servers: {len(configs)} (max: {_MAX_SERVERS})"
596            )
597
598        if not configs:
599            raise ConfigurationError("At least one server configuration required")
600
601        self._configs = list(configs)
602        self._clients: dict[str, AsyncOutlineClient] = {}
603        self._audit_logger = audit_logger
604        self._metrics = metrics
605        self._default_timeout = default_timeout
606        self._lock = asyncio.Lock()

Initialize multiserver manager.

Parameters
  • configs: Sequence of server configurations
  • audit_logger: Shared audit logger for all servers
  • metrics: Shared metrics collector for all servers
  • default_timeout: Default timeout for operations (seconds)
Raises
  • ConfigurationError: If too many servers or invalid configs
server_count: int
608    @property
609    def server_count(self) -> int:
610        """Get total number of configured servers.
611
612        :return: Number of servers
613        """
614        return len(self._configs)

Get total number of configured servers.

Returns

Number of servers

active_servers: int
616    @property
617    def active_servers(self) -> int:
618        """Get number of active (connected) servers.
619
620        :return: Number of active servers
621        """
622        return sum(1 for client in self._clients.values() if client.is_connected)

Get number of active (connected) servers.

Returns

Number of active servers

def get_server_names(self) -> list[str]:
624    def get_server_names(self) -> list[str]:
625        """Get list of sanitized server URLs.
626
627        URLs are sanitized to remove sensitive path information.
628
629        :return: List of safe server identifiers
630        """
631        return [
632            Validators.sanitize_url_for_logging(config.api_url)
633            for config in self._configs
634        ]

Get list of sanitized server URLs.

URLs are sanitized to remove sensitive path information.

Returns

List of safe server identifiers

def get_client( self, server_identifier: str | int) -> AsyncOutlineClient:
736    def get_client(self, server_identifier: str | int) -> AsyncOutlineClient:
737        """Get client by server identifier or index.
738
739        :param server_identifier: Server URL (sanitized) or 0-based index
740        :return: Client instance
741        :raises KeyError: If server not found
742        :raises IndexError: If index out of range
743        """
744        # Try as index first (fast path for common case)
745        if isinstance(server_identifier, int):
746            if 0 <= server_identifier < len(self._configs):
747                config = self._configs[server_identifier]
748                safe_url = Validators.sanitize_url_for_logging(config.api_url)
749                return self._clients[safe_url]
750            raise IndexError(
751                f"Server index {server_identifier} out of range (0-{len(self._configs) - 1})"
752            )
753
754        # Try as server ID
755        if server_identifier in self._clients:
756            return self._clients[server_identifier]
757
758        raise KeyError(f"Server not found: {server_identifier}")

Get client by server identifier or index.

Parameters
  • server_identifier: Server URL (sanitized) or 0-based index
Returns

Client instance

Raises
  • KeyError: If server not found
  • IndexError: If index out of range
def get_all_clients(self) -> list[AsyncOutlineClient]:
760    def get_all_clients(self) -> list[AsyncOutlineClient]:
761        """Get all active clients.
762
763        :return: List of client instances
764        """
765        return list(self._clients.values())

Get all active clients.

Returns

List of client instances

async def health_check_all(self, timeout: float | None = None) -> dict[str, dict[str, typing.Any]]:
767    async def health_check_all(
768        self,
769        timeout: float | None = None,
770    ) -> dict[str, dict[str, Any]]:
771        """Perform health check on all servers concurrently.
772
773        :param timeout: Timeout for each health check
774        :return: Dictionary mapping server IDs to health check results
775        """
776        timeout = timeout or self._default_timeout
777
778        tasks = [
779            self._health_check_single(server_id, client, timeout)
780            for server_id, client in self._clients.items()
781        ]
782
783        # Execute concurrently
784        results_list = await asyncio.gather(*tasks, return_exceptions=True)
785
786        # Build result dictionary
787        results: dict[str, dict[str, Any]] = {}
788        for (server_id, _), result in zip(
789            self._clients.items(), results_list, strict=False
790        ):
791            if isinstance(result, BaseException):
792                results[server_id] = {
793                    "healthy": False,
794                    "error": str(result),
795                    "error_type": type(result).__name__,
796                }
797            else:
798                results[server_id] = result
799
800        return results

Perform health check on all servers concurrently.

Parameters
  • timeout: Timeout for each health check
Returns

Dictionary mapping server IDs to health check results

async def get_healthy_servers( self, timeout: float | None = None) -> list[AsyncOutlineClient]:
837    async def get_healthy_servers(
838        self,
839        timeout: float | None = None,
840    ) -> list[AsyncOutlineClient]:
841        """Get list of healthy servers after health check.
842
843        :param timeout: Timeout for health checks
844        :return: List of healthy clients
845        """
846        health_results = await self.health_check_all(timeout=timeout)
847
848        healthy_clients: list[AsyncOutlineClient] = []
849        for server_id, result in health_results.items():
850            if result.get("healthy", False):
851                try:
852                    client = self.get_client(server_id)
853                    healthy_clients.append(client)
854                except (KeyError, IndexError):
855                    continue
856
857        return healthy_clients

Get list of healthy servers after health check.

Parameters
  • timeout: Timeout for health checks
Returns

List of healthy clients

def get_status_summary(self) -> dict[str, typing.Any]:
859    def get_status_summary(self) -> dict[str, Any]:
860        """Get aggregated status summary for all servers.
861
862        Synchronous operation - no API calls made.
863
864        :return: Status summary dictionary
865        """
866        return {
867            "total_servers": len(self._configs),
868            "active_servers": self.active_servers,
869            "server_statuses": {
870                server_id: client.get_status()
871                for server_id, client in self._clients.items()
872            },
873        }

Get aggregated status summary for all servers.

Synchronous operation - no API calls made.

Returns

Status summary dictionary

class NoOpAuditLogger:
537class NoOpAuditLogger:
538    """Zero-overhead no-op audit logger.
539
540    Implements AuditLogger protocol but performs no operations.
541    Useful for disabling audit without code changes or performance impact.
542    """
543
544    __slots__ = ()
545
546    async def alog_action(
547        self,
548        action: str,
549        resource: str,
550        *,
551        user: str | None = None,
552        details: dict[str, Any] | None = None,
553        correlation_id: str | None = None,
554    ) -> None:
555        """No-op async log."""
556
557    def log_action(
558        self,
559        action: str,
560        resource: str,
561        *,
562        user: str | None = None,
563        details: dict[str, Any] | None = None,
564        correlation_id: str | None = None,
565    ) -> None:
566        """No-op sync log."""
567
568    async def shutdown(self) -> None:
569        """No-op shutdown."""

Zero-overhead no-op audit logger.

Implements AuditLogger protocol but performs no operations. Useful for disabling audit without code changes or performance impact.

async def alog_action( self, action: str, resource: str, *, user: str | None = None, details: dict[str, typing.Any] | None = None, correlation_id: str | None = None) -> None:
546    async def alog_action(
547        self,
548        action: str,
549        resource: str,
550        *,
551        user: str | None = None,
552        details: dict[str, Any] | None = None,
553        correlation_id: str | None = None,
554    ) -> None:
555        """No-op async log."""

No-op async log.

def log_action( self, action: str, resource: str, *, user: str | None = None, details: dict[str, typing.Any] | None = None, correlation_id: str | None = None) -> None:
557    def log_action(
558        self,
559        action: str,
560        resource: str,
561        *,
562        user: str | None = None,
563        details: dict[str, Any] | None = None,
564        correlation_id: str | None = None,
565    ) -> None:
566        """No-op sync log."""

No-op sync log.

async def shutdown(self) -> None:
568    async def shutdown(self) -> None:
569        """No-op shutdown."""

No-op shutdown.

class NoOpMetrics:
 93class NoOpMetrics:
 94    """No-op metrics collector (zero-overhead default).
 95
 96    Uses __slots__ to minimize memory footprint.
 97    """
 98
 99    __slots__ = ()
100
101    def increment(self, metric: str, *, tags: MetricsTags | None = None) -> None:
102        """No-op increment (zero overhead)."""
103
104    def timing(
105        self, metric: str, value: float, *, tags: MetricsTags | None = None
106    ) -> None:
107        """No-op timing (zero overhead)."""
108
109    def gauge(
110        self, metric: str, value: float, *, tags: MetricsTags | None = None
111    ) -> None:
112        """No-op gauge (zero overhead)."""

No-op metrics collector (zero-overhead default).

Uses __slots__ to minimize memory footprint.

def increment(self, metric: str, *, tags: dict[str, str] | None = None) -> None:
101    def increment(self, metric: str, *, tags: MetricsTags | None = None) -> None:
102        """No-op increment (zero overhead)."""

No-op increment (zero overhead).

def timing( self, metric: str, value: float, *, tags: dict[str, str] | None = None) -> None:
104    def timing(
105        self, metric: str, value: float, *, tags: MetricsTags | None = None
106    ) -> None:
107        """No-op timing (zero overhead)."""

No-op timing (zero overhead).

def gauge( self, metric: str, value: float, *, tags: dict[str, str] | None = None) -> None:
109    def gauge(
110        self, metric: str, value: float, *, tags: MetricsTags | None = None
111    ) -> None:
112        """No-op gauge (zero overhead)."""

No-op gauge (zero overhead).

class OutlineClientConfig(pydantic_settings.main.BaseSettings):
 74class OutlineClientConfig(BaseSettings):
 75    """Main configuration."""
 76
 77    model_config = SettingsConfigDict(
 78        env_prefix=_ENV_PREFIX,
 79        env_file=".env",
 80        env_file_encoding="utf-8",
 81        case_sensitive=False,
 82        extra="forbid",
 83        validate_assignment=True,
 84        validate_default=True,
 85        frozen=False,
 86    )
 87
 88    # ===== Core Settings (Required) =====
 89
 90    api_url: str = Field(..., description="Outline server API URL with secret path")
 91    cert_sha256: SecretStr = Field(..., description="SHA-256 certificate fingerprint")
 92
 93    # ===== Client Settings =====
 94
 95    timeout: int = Field(
 96        default=10,
 97        ge=_MIN_TIMEOUT,
 98        le=_MAX_TIMEOUT,
 99        description="Request timeout (seconds)",
100    )
101    retry_attempts: int = Field(
102        default=2,
103        ge=_MIN_RETRY,
104        le=_MAX_RETRY,
105        description="Number of retries",
106    )
107    max_connections: int = Field(
108        default=10,
109        ge=_MIN_CONNECTIONS,
110        le=_MAX_CONNECTIONS,
111        description="Connection pool size",
112    )
113    rate_limit: int = Field(
114        default=100,
115        ge=_MIN_RATE_LIMIT,
116        le=_MAX_RATE_LIMIT,
117        description="Max concurrent requests",
118    )
119    user_agent: str = Field(
120        default=Constants.DEFAULT_USER_AGENT,
121        min_length=1,
122        max_length=256,
123        description="Custom user agent string",
124    )
125
126    # ===== Optional Features =====
127
128    enable_circuit_breaker: bool = Field(
129        default=True,
130        description="Enable circuit breaker",
131    )
132    enable_logging: bool = Field(
133        default=False,
134        description="Enable debug logging",
135    )
136    json_format: bool = Field(
137        default=False,
138        description="Return raw JSON",
139    )
140    allow_private_networks: bool = Field(
141        default=True,
142        description="Allow private or local network addresses in api_url",
143    )
144    resolve_dns_for_ssrf: bool = Field(
145        default=False,
146        description="Resolve DNS for SSRF checks (strict mode)",
147    )
148
149    # ===== Circuit Breaker Settings =====
150
151    circuit_failure_threshold: int = Field(
152        default=5,
153        ge=1,
154        le=100,
155        description="Failures before opening",
156    )
157    circuit_recovery_timeout: float = Field(
158        default=60.0,
159        ge=1.0,
160        le=3600.0,
161        description="Recovery wait time (seconds)",
162    )
163    circuit_success_threshold: int = Field(
164        default=2,
165        ge=1,
166        le=10,
167        description="Successes needed to close",
168    )
169    circuit_call_timeout: float = Field(
170        default=10.0,
171        ge=0.1,
172        le=300.0,
173        description="Circuit call timeout (seconds)",
174    )
175
176    # ===== Validators =====
177
178    @field_validator("api_url")
179    @classmethod
180    def validate_api_url(cls, v: str) -> str:
181        """Validate and normalize API URL with optimized regex.
182
183        :param v: URL to validate
184        :return: Validated URL
185        :raises ValueError: If URL is invalid
186        """
187        return Validators.validate_url(v)
188
189    @field_validator("cert_sha256")
190    @classmethod
191    def validate_cert(cls, v: SecretStr) -> SecretStr:
192        """Validate certificate fingerprint with constant-time comparison.
193
194        :param v: Certificate fingerprint
195        :return: Validated fingerprint
196        :raises ValueError: If fingerprint is invalid
197        """
198        return Validators.validate_cert_fingerprint(v)
199
200    @field_validator("user_agent")
201    @classmethod
202    def validate_user_agent(cls, v: str) -> str:
203        """Validate user agent string with efficient control char check.
204
205        :param v: User agent to validate
206        :return: Validated user agent
207        :raises ValueError: If user agent is invalid
208        """
209        v = Validators.validate_string_not_empty(v, "User agent")
210
211        # Efficient control character check using generator
212        if any(ord(c) < 32 for c in v):
213            raise ValueError("User agent contains invalid control characters")
214
215        return v
216
217    @model_validator(mode="after")
218    def validate_config(self) -> Self:
219        """Additional validation after model creation with pattern matching.
220
221        :return: Validated configuration instance
222        """
223        # Security warning for HTTP using pattern matching
224        match (self.api_url, "localhost" in self.api_url):
225            case (url, False) if "http://" in url:
226                _log_if_enabled(
227                    logging.WARNING,
228                    "Using HTTP for non-localhost connection. "
229                    "This is insecure and should only be used for testing.",
230                )
231
232        # Optional SSRF protection for private networks (no DNS resolution)
233        Validators.validate_url(
234            self.api_url,
235            allow_private_networks=self.allow_private_networks,
236            resolve_dns=False,
237        )
238
239        # Circuit breaker timeout adjustment with caching
240        if self.enable_circuit_breaker:
241            max_request_time = self._get_max_request_time()
242
243            if self.circuit_call_timeout < max_request_time:
244                _log_if_enabled(
245                    logging.WARNING,
246                    f"Circuit timeout ({self.circuit_call_timeout}s) is less than "
247                    f"max request time ({max_request_time}s). "
248                    f"Auto-adjusting to {max_request_time}s.",
249                )
250                object.__setattr__(self, "circuit_call_timeout", max_request_time)
251
252        return self
253
254    def _get_max_request_time(self) -> float:
255        """Calculate worst-case request time with instance caching.
256
257        :return: Maximum request time in seconds
258        """
259        if not hasattr(self, "_cached_max_request_time"):
260            self._cached_max_request_time = (
261                self.timeout * (self.retry_attempts + 1) + _SAFETY_MARGIN
262            )
263        return self._cached_max_request_time
264
265    # ===== Custom __setattr__ for SecretStr Protection =====
266
267    def __setattr__(self, name: str, value: object) -> None:
268        """Prevent accidental string assignment to SecretStr fields.
269
270        :param name: Attribute name
271        :param value: Attribute value
272        :raises TypeError: If trying to assign str to SecretStr field
273        """
274        # Fast path: skip check for non-cert fields
275        if name != "cert_sha256":
276            super().__setattr__(name, value)
277            return
278
279        if isinstance(value, str):
280            raise TypeError(
281                "cert_sha256 must be SecretStr, not str. Use: SecretStr('your_cert')"
282            )
283
284        super().__setattr__(name, value)
285
286    # ===== Helper Methods =====
287
288    @cached_property
289    def get_sanitized_config(self) -> ConfigDict:
290        """Get configuration with sensitive data masked (cached).
291
292        Safe for logging, debugging, and display.
293
294        Performance: ~20x speedup with caching for repeated calls
295        Memory: Single cached result per instance
296
297        :return: Sanitized configuration dictionary
298        """
299        return {
300            "api_url": Validators.sanitize_url_for_logging(self.api_url),
301            "cert_sha256": "***MASKED***",
302            "timeout": self.timeout,
303            "retry_attempts": self.retry_attempts,
304            "max_connections": self.max_connections,
305            "rate_limit": self.rate_limit,
306            "user_agent": self.user_agent,
307            "enable_circuit_breaker": self.enable_circuit_breaker,
308            "enable_logging": self.enable_logging,
309            "json_format": self.json_format,
310            "allow_private_networks": self.allow_private_networks,
311            "circuit_failure_threshold": self.circuit_failure_threshold,
312            "circuit_recovery_timeout": self.circuit_recovery_timeout,
313            "circuit_success_threshold": self.circuit_success_threshold,
314            "circuit_call_timeout": self.circuit_call_timeout,
315        }
316
317    def model_copy_immutable(self, **overrides: ConfigValue) -> OutlineClientConfig:
318        """Create immutable copy with overrides (optimized validation).
319
320        :param overrides: Configuration parameters to override
321        :return: Deep copy of configuration with applied updates
322        :raises ValueError: If invalid override keys provided
323
324        Example:
325            >>> new_config = config.model_copy_immutable(timeout=20)
326        """
327        # Optimized: Use frozenset intersection for O(1) validation
328        valid_keys = frozenset(ConfigOverrides.__annotations__.keys())
329        provided_keys = frozenset(overrides.keys())
330        invalid = provided_keys - valid_keys
331
332        if invalid:
333            raise ValueError(
334                f"Invalid configuration keys: {', '.join(sorted(invalid))}. "
335                f"Valid keys: {', '.join(sorted(valid_keys))}"
336            )
337
338        # Pydantic's model_copy is already optimized
339        return cast(  # type: ignore[redundant-cast, unused-ignore]
340            OutlineClientConfig, self.model_copy(deep=True, update=overrides)
341        )
342
343    @property
344    def circuit_config(self) -> CircuitConfig | None:
345        """Get circuit breaker configuration if enabled.
346
347        Returns None if circuit breaker is disabled, otherwise CircuitConfig instance.
348        Cached as property for performance.
349
350        :return: Circuit config or None if disabled
351        """
352        if not self.enable_circuit_breaker:
353            return None
354
355        return CircuitConfig(
356            failure_threshold=self.circuit_failure_threshold,
357            recovery_timeout=self.circuit_recovery_timeout,
358            success_threshold=self.circuit_success_threshold,
359            call_timeout=self.circuit_call_timeout,
360        )
361
362    # ===== Factory Methods =====
363
364    @classmethod
365    def from_env(
366        cls,
367        env_file: str | Path | None = None,
368        **overrides: ConfigValue,
369    ) -> OutlineClientConfig:
370        """Load configuration from environment with overrides.
371
372        :param env_file: Path to .env file
373        :param overrides: Configuration parameters to override
374        :return: Configuration instance
375        :raises ConfigurationError: If environment configuration is invalid
376
377        Example:
378            >>> config = OutlineClientConfig.from_env(
379            ...     env_file=".env.prod",
380            ...     timeout=20,
381            ...     enable_logging=True
382            ... )
383        """
384        # Fast path: validate overrides early
385        valid_keys = frozenset(ConfigOverrides.__annotations__.keys())
386        filtered_overrides = cast(
387            ConfigOverrides,
388            {k: v for k, v in overrides.items() if k in valid_keys},
389        )
390
391        if not env_file:
392            return cls(  # type: ignore[call-arg, unused-ignore]
393                **filtered_overrides
394            )
395
396        match env_file:
397            case str():
398                env_path = Path(env_file)
399            case Path():
400                env_path = env_file
401            case _:
402                raise TypeError(
403                    f"env_file must be str or Path, got {type(env_file).__name__}"
404                )
405
406        if not env_path.exists():
407            raise ConfigurationError(
408                f"Environment file not found: {env_path}",
409                field="env_file",
410            )
411
412        return cls(  # type: ignore[call-arg, unused-ignore]
413            _env_file=str(env_path),
414            **filtered_overrides,
415        )
416
417    @classmethod
418    def create_minimal(
419        cls,
420        api_url: str,
421        cert_sha256: str | SecretStr,
422        **overrides: ConfigValue,
423    ) -> OutlineClientConfig:
424        """Create minimal configuration (optimized validation).
425
426        :param api_url: API URL
427        :param cert_sha256: Certificate fingerprint
428        :param overrides: Optional configuration parameters
429        :return: Configuration instance
430        :raises TypeError: If cert_sha256 is not str or SecretStr
431
432        Example:
433            >>> config = OutlineClientConfig.create_minimal(
434            ...     api_url="https://server.com/path",
435            ...     cert_sha256="a" * 64,
436            ...     timeout=20
437            ... )
438        """
439        match cert_sha256:
440            case str():
441                cert = SecretStr(cert_sha256)
442            case SecretStr():
443                cert = cert_sha256
444            case _:
445                raise TypeError(
446                    f"cert_sha256 must be str or SecretStr, "
447                    f"got {type(cert_sha256).__name__}"
448                )
449
450        valid_keys = frozenset(ConfigOverrides.__annotations__.keys())
451        filtered_overrides = cast(
452            ConfigOverrides,
453            {k: v for k, v in overrides.items() if k in valid_keys},
454        )
455
456        return cls(
457            api_url=api_url,
458            cert_sha256=cert,
459            **filtered_overrides,
460        )

Main configuration.

api_url: str = PydanticUndefined

Outline server API URL with secret path

cert_sha256: pydantic.types.SecretStr = PydanticUndefined

SHA-256 certificate fingerprint

timeout: int = 10

Request timeout (seconds)

retry_attempts: int = 2

Number of retries

max_connections: int = 10

Connection pool size

rate_limit: int = 100

Max concurrent requests

user_agent: str = 'PyOutlineAPI/0.4.0'

Custom user agent string

enable_circuit_breaker: bool = True

Enable circuit breaker

enable_logging: bool = False

Enable debug logging

json_format: bool = False

Return raw JSON

allow_private_networks: bool = True

Allow private or local network addresses in api_url

resolve_dns_for_ssrf: bool = False

Resolve DNS for SSRF checks (strict mode)

circuit_failure_threshold: int = 5

Failures before opening

circuit_recovery_timeout: float = 60.0

Recovery wait time (seconds)

circuit_success_threshold: int = 2

Successes needed to close

circuit_call_timeout: float = 10.0

Circuit call timeout (seconds)

@field_validator('api_url')
@classmethod
def validate_api_url(cls, v: str) -> str:
178    @field_validator("api_url")
179    @classmethod
180    def validate_api_url(cls, v: str) -> str:
181        """Validate and normalize API URL with optimized regex.
182
183        :param v: URL to validate
184        :return: Validated URL
185        :raises ValueError: If URL is invalid
186        """
187        return Validators.validate_url(v)

Validate and normalize API URL with optimized regex.

Parameters
  • v: URL to validate
Returns

Validated URL

Raises
  • ValueError: If URL is invalid
@field_validator('cert_sha256')
@classmethod
def validate_cert(cls, v: pydantic.types.SecretStr) -> pydantic.types.SecretStr:
189    @field_validator("cert_sha256")
190    @classmethod
191    def validate_cert(cls, v: SecretStr) -> SecretStr:
192        """Validate certificate fingerprint with constant-time comparison.
193
194        :param v: Certificate fingerprint
195        :return: Validated fingerprint
196        :raises ValueError: If fingerprint is invalid
197        """
198        return Validators.validate_cert_fingerprint(v)

Validate certificate fingerprint with constant-time comparison.

Parameters
  • v: Certificate fingerprint
Returns

Validated fingerprint

Raises
  • ValueError: If fingerprint is invalid
@field_validator('user_agent')
@classmethod
def validate_user_agent(cls, v: str) -> str:
200    @field_validator("user_agent")
201    @classmethod
202    def validate_user_agent(cls, v: str) -> str:
203        """Validate user agent string with efficient control char check.
204
205        :param v: User agent to validate
206        :return: Validated user agent
207        :raises ValueError: If user agent is invalid
208        """
209        v = Validators.validate_string_not_empty(v, "User agent")
210
211        # Efficient control character check using generator
212        if any(ord(c) < 32 for c in v):
213            raise ValueError("User agent contains invalid control characters")
214
215        return v

Validate user agent string with efficient control char check.

Parameters
  • v: User agent to validate
Returns

Validated user agent

Raises
  • ValueError: If user agent is invalid
@model_validator(mode='after')
def validate_config(self) -> Self:
217    @model_validator(mode="after")
218    def validate_config(self) -> Self:
219        """Additional validation after model creation with pattern matching.
220
221        :return: Validated configuration instance
222        """
223        # Security warning for HTTP using pattern matching
224        match (self.api_url, "localhost" in self.api_url):
225            case (url, False) if "http://" in url:
226                _log_if_enabled(
227                    logging.WARNING,
228                    "Using HTTP for non-localhost connection. "
229                    "This is insecure and should only be used for testing.",
230                )
231
232        # Optional SSRF protection for private networks (no DNS resolution)
233        Validators.validate_url(
234            self.api_url,
235            allow_private_networks=self.allow_private_networks,
236            resolve_dns=False,
237        )
238
239        # Circuit breaker timeout adjustment with caching
240        if self.enable_circuit_breaker:
241            max_request_time = self._get_max_request_time()
242
243            if self.circuit_call_timeout < max_request_time:
244                _log_if_enabled(
245                    logging.WARNING,
246                    f"Circuit timeout ({self.circuit_call_timeout}s) is less than "
247                    f"max request time ({max_request_time}s). "
248                    f"Auto-adjusting to {max_request_time}s.",
249                )
250                object.__setattr__(self, "circuit_call_timeout", max_request_time)
251
252        return self

Additional validation after model creation with pattern matching.

Returns

Validated configuration instance

get_sanitized_config: dict[str, int | str | bool | float]
288    @cached_property
289    def get_sanitized_config(self) -> ConfigDict:
290        """Get configuration with sensitive data masked (cached).
291
292        Safe for logging, debugging, and display.
293
294        Performance: ~20x speedup with caching for repeated calls
295        Memory: Single cached result per instance
296
297        :return: Sanitized configuration dictionary
298        """
299        return {
300            "api_url": Validators.sanitize_url_for_logging(self.api_url),
301            "cert_sha256": "***MASKED***",
302            "timeout": self.timeout,
303            "retry_attempts": self.retry_attempts,
304            "max_connections": self.max_connections,
305            "rate_limit": self.rate_limit,
306            "user_agent": self.user_agent,
307            "enable_circuit_breaker": self.enable_circuit_breaker,
308            "enable_logging": self.enable_logging,
309            "json_format": self.json_format,
310            "allow_private_networks": self.allow_private_networks,
311            "circuit_failure_threshold": self.circuit_failure_threshold,
312            "circuit_recovery_timeout": self.circuit_recovery_timeout,
313            "circuit_success_threshold": self.circuit_success_threshold,
314            "circuit_call_timeout": self.circuit_call_timeout,
315        }

Get configuration with sensitive data masked (cached).

Safe for logging, debugging, and display.

Performance: ~20x speedup with caching for repeated calls Memory: Single cached result per instance

Returns

Sanitized configuration dictionary

def model_copy_immutable( self, **overrides: int | str | bool | float) -> OutlineClientConfig:
317    def model_copy_immutable(self, **overrides: ConfigValue) -> OutlineClientConfig:
318        """Create immutable copy with overrides (optimized validation).
319
320        :param overrides: Configuration parameters to override
321        :return: Deep copy of configuration with applied updates
322        :raises ValueError: If invalid override keys provided
323
324        Example:
325            >>> new_config = config.model_copy_immutable(timeout=20)
326        """
327        # Optimized: Use frozenset intersection for O(1) validation
328        valid_keys = frozenset(ConfigOverrides.__annotations__.keys())
329        provided_keys = frozenset(overrides.keys())
330        invalid = provided_keys - valid_keys
331
332        if invalid:
333            raise ValueError(
334                f"Invalid configuration keys: {', '.join(sorted(invalid))}. "
335                f"Valid keys: {', '.join(sorted(valid_keys))}"
336            )
337
338        # Pydantic's model_copy is already optimized
339        return cast(  # type: ignore[redundant-cast, unused-ignore]
340            OutlineClientConfig, self.model_copy(deep=True, update=overrides)
341        )

Create immutable copy with overrides (optimized validation).

Parameters
  • overrides: Configuration parameters to override
Returns

Deep copy of configuration with applied updates

Raises
  • ValueError: If invalid override keys provided
Example:
>>> new_config = config.model_copy_immutable(timeout=20)
circuit_config: CircuitConfig | None
343    @property
344    def circuit_config(self) -> CircuitConfig | None:
345        """Get circuit breaker configuration if enabled.
346
347        Returns None if circuit breaker is disabled, otherwise CircuitConfig instance.
348        Cached as property for performance.
349
350        :return: Circuit config or None if disabled
351        """
352        if not self.enable_circuit_breaker:
353            return None
354
355        return CircuitConfig(
356            failure_threshold=self.circuit_failure_threshold,
357            recovery_timeout=self.circuit_recovery_timeout,
358            success_threshold=self.circuit_success_threshold,
359            call_timeout=self.circuit_call_timeout,
360        )

Get circuit breaker configuration if enabled.

Returns None if circuit breaker is disabled, otherwise CircuitConfig instance. Cached as property for performance.

Returns

Circuit config or None if disabled

@classmethod
def from_env( cls, env_file: str | pathlib._local.Path | None = None, **overrides: int | str | bool | float) -> OutlineClientConfig:
364    @classmethod
365    def from_env(
366        cls,
367        env_file: str | Path | None = None,
368        **overrides: ConfigValue,
369    ) -> OutlineClientConfig:
370        """Load configuration from environment with overrides.
371
372        :param env_file: Path to .env file
373        :param overrides: Configuration parameters to override
374        :return: Configuration instance
375        :raises ConfigurationError: If environment configuration is invalid
376
377        Example:
378            >>> config = OutlineClientConfig.from_env(
379            ...     env_file=".env.prod",
380            ...     timeout=20,
381            ...     enable_logging=True
382            ... )
383        """
384        # Fast path: validate overrides early
385        valid_keys = frozenset(ConfigOverrides.__annotations__.keys())
386        filtered_overrides = cast(
387            ConfigOverrides,
388            {k: v for k, v in overrides.items() if k in valid_keys},
389        )
390
391        if not env_file:
392            return cls(  # type: ignore[call-arg, unused-ignore]
393                **filtered_overrides
394            )
395
396        match env_file:
397            case str():
398                env_path = Path(env_file)
399            case Path():
400                env_path = env_file
401            case _:
402                raise TypeError(
403                    f"env_file must be str or Path, got {type(env_file).__name__}"
404                )
405
406        if not env_path.exists():
407            raise ConfigurationError(
408                f"Environment file not found: {env_path}",
409                field="env_file",
410            )
411
412        return cls(  # type: ignore[call-arg, unused-ignore]
413            _env_file=str(env_path),
414            **filtered_overrides,
415        )

Load configuration from environment with overrides.

Parameters
  • env_file: Path to .env file
  • overrides: Configuration parameters to override
Returns

Configuration instance

Raises
  • ConfigurationError: If environment configuration is invalid
Example:
>>> config = OutlineClientConfig.from_env(
...     env_file=".env.prod",
...     timeout=20,
...     enable_logging=True
... )
@classmethod
def create_minimal( cls, api_url: str, cert_sha256: str | pydantic.types.SecretStr, **overrides: int | str | bool | float) -> OutlineClientConfig:
417    @classmethod
418    def create_minimal(
419        cls,
420        api_url: str,
421        cert_sha256: str | SecretStr,
422        **overrides: ConfigValue,
423    ) -> OutlineClientConfig:
424        """Create minimal configuration (optimized validation).
425
426        :param api_url: API URL
427        :param cert_sha256: Certificate fingerprint
428        :param overrides: Optional configuration parameters
429        :return: Configuration instance
430        :raises TypeError: If cert_sha256 is not str or SecretStr
431
432        Example:
433            >>> config = OutlineClientConfig.create_minimal(
434            ...     api_url="https://server.com/path",
435            ...     cert_sha256="a" * 64,
436            ...     timeout=20
437            ... )
438        """
439        match cert_sha256:
440            case str():
441                cert = SecretStr(cert_sha256)
442            case SecretStr():
443                cert = cert_sha256
444            case _:
445                raise TypeError(
446                    f"cert_sha256 must be str or SecretStr, "
447                    f"got {type(cert_sha256).__name__}"
448                )
449
450        valid_keys = frozenset(ConfigOverrides.__annotations__.keys())
451        filtered_overrides = cast(
452            ConfigOverrides,
453            {k: v for k, v in overrides.items() if k in valid_keys},
454        )
455
456        return cls(
457            api_url=api_url,
458            cert_sha256=cert,
459            **filtered_overrides,
460        )

Create minimal configuration (optimized validation).

Parameters
  • api_url: API URL
  • cert_sha256: Certificate fingerprint
  • overrides: Optional configuration parameters
Returns

Configuration instance

Raises
  • TypeError: If cert_sha256 is not str or SecretStr
Example:
>>> config = OutlineClientConfig.create_minimal(
...     api_url="https://server.com/path",
...     cert_sha256="a" * 64,
...     timeout=20
... )
class OutlineConnectionError(pyoutlineapi.OutlineError):
407class OutlineConnectionError(OutlineError):
408    """Network connection failure.
409
410    Attributes:
411        host: Host that failed
412        port: Port that failed
413
414    Example:
415        >>> error = OutlineConnectionError(
416        ...     "Connection refused", host="server.com", port=443
417        ... )
418        >>> error.is_retryable  # True
419    """
420
421    __slots__ = ("host", "port")
422
423    _is_retryable: ClassVar[bool] = True
424    _default_retry_delay: ClassVar[float] = 2.0
425
426    def __init__(
427        self,
428        message: str,
429        *,
430        host: str | None = None,
431        port: int | None = None,
432    ) -> None:
433        """Initialize connection error.
434
435        Args:
436            message: Error message
437            host: Host that failed
438            port: Port that failed
439        """
440        safe_details: dict[str, Any] | None = None
441        if host or port is not None:
442            safe_details = {}
443            if host:
444                safe_details["host"] = host
445            if port is not None:
446                safe_details["port"] = port
447
448        super().__init__(message, safe_details=safe_details)
449
450        self.host = host
451        self.port = port

Network connection failure.

Attributes:
  • host: Host that failed
  • port: Port that failed
Example:
>>> error = OutlineConnectionError(
...     "Connection refused", host="server.com", port=443
... )
>>> error.is_retryable  # True
OutlineConnectionError(message: str, *, host: str | None = None, port: int | None = None)
426    def __init__(
427        self,
428        message: str,
429        *,
430        host: str | None = None,
431        port: int | None = None,
432    ) -> None:
433        """Initialize connection error.
434
435        Args:
436            message: Error message
437            host: Host that failed
438            port: Port that failed
439        """
440        safe_details: dict[str, Any] | None = None
441        if host or port is not None:
442            safe_details = {}
443            if host:
444                safe_details["host"] = host
445            if port is not None:
446                safe_details["port"] = port
447
448        super().__init__(message, safe_details=safe_details)
449
450        self.host = host
451        self.port = port

Initialize connection error.

Arguments:
  • message: Error message
  • host: Host that failed
  • port: Port that failed
host
port
class OutlineError(builtins.Exception):
 42class OutlineError(Exception):
 43    """Base exception for all PyOutlineAPI errors.
 44
 45    Provides rich error context, retry guidance, and safe serialization
 46    with automatic credential sanitization.
 47
 48    Attributes:
 49        is_retryable: Whether this error type should be retried
 50        default_retry_delay: Suggested delay before retry in seconds
 51
 52    Example:
 53        >>> try:
 54        ...     raise OutlineError("Connection failed", details={"host": "server"})
 55        ... except OutlineError as e:
 56        ...     print(e.safe_details)  # {'host': 'server'}
 57    """
 58
 59    __slots__ = ("_cached_str", "_details", "_message", "_safe_details")
 60
 61    _is_retryable: ClassVar[bool] = False
 62    _default_retry_delay: ClassVar[float] = 1.0
 63
 64    def __init__(
 65        self,
 66        message: object,
 67        *,
 68        details: dict[str, Any] | None = None,
 69        safe_details: dict[str, Any] | None = None,
 70    ) -> None:
 71        """Initialize exception with automatic credential sanitization.
 72
 73        Args:
 74            message: Error message (automatically sanitized)
 75            details: Internal details (may contain sensitive data)
 76            safe_details: Safe details for logging/display
 77
 78        Raises:
 79            ValueError: If message exceeds maximum length after sanitization
 80        """
 81        # Validate and sanitize message
 82        if not isinstance(message, str):
 83            message = str(message)
 84
 85        # Sanitize credentials from message
 86        sanitized_message = CredentialSanitizer.sanitize(message)
 87
 88        # Truncate if too long
 89        if len(sanitized_message) > _MAX_MESSAGE_LENGTH:
 90            sanitized_message = sanitized_message[:_MAX_MESSAGE_LENGTH] + "..."
 91
 92        self._message = sanitized_message
 93        super().__init__(sanitized_message)
 94
 95        self._details: dict[str, Any] | MappingProxyType[str, Any] = (
 96            dict(details) if details else _EMPTY_DICT
 97        )
 98        self._safe_details: dict[str, Any] | MappingProxyType[str, Any] = (
 99            dict(safe_details) if safe_details else _EMPTY_DICT
100        )
101
102        self._cached_str: str | None = None
103
104    @property
105    def details(self) -> dict[str, Any]:
106        """Get internal error details (may contain sensitive data).
107
108        Warning:
109            Use with caution - may contain credentials or sensitive information.
110            For logging, use ``safe_details`` instead.
111
112        Returns:
113            Copy of internal details dictionary
114        """
115        if self._details is _EMPTY_DICT:
116            return {}
117        return self._details.copy()
118
119    @property
120    def safe_details(self) -> dict[str, Any]:
121        """Get sanitized error details safe for logging.
122
123        Returns:
124            Copy of safe details dictionary
125        """
126        if self._safe_details is _EMPTY_DICT:
127            return {}
128        return self._safe_details.copy()
129
130    def _format_details(self) -> str:
131        """Format safe details for string representation.
132
133        :return: Formatted details string
134        """
135        if not self._safe_details:
136            return ""
137
138        parts = [f"{k}={v}" for k, v in self._safe_details.items()]
139        return f" ({', '.join(parts)})"
140
141    def __str__(self) -> str:
142        """Safe string representation using safe_details.
143
144        Cached for performance on repeated access.
145
146        :return: String representation
147        """
148        if self._cached_str is None:
149            self._cached_str = f"{self._message}{self._format_details()}"
150        return self._cached_str
151
152    def __repr__(self) -> str:
153        """Safe repr without sensitive data.
154
155        :return: String representation
156        """
157        class_name = self.__class__.__name__
158        return f"{class_name}({self._message!r})"
159
160    @property
161    def is_retryable(self) -> bool:
162        """Return whether this error type should be retried."""
163        return self._is_retryable
164
165    @property
166    def default_retry_delay(self) -> float:
167        """Return suggested delay before retry in seconds."""
168        return self._default_retry_delay

Base exception for all PyOutlineAPI errors.

Provides rich error context, retry guidance, and safe serialization with automatic credential sanitization.

Attributes:
  • is_retryable: Whether this error type should be retried
  • default_retry_delay: Suggested delay before retry in seconds
Example:
>>> try:
...     raise OutlineError("Connection failed", details={"host": "server"})
... except OutlineError as e:
...     print(e.safe_details)  # {'host': 'server'}
OutlineError( message: object, *, details: dict[str, typing.Any] | None = None, safe_details: dict[str, typing.Any] | None = None)
 64    def __init__(
 65        self,
 66        message: object,
 67        *,
 68        details: dict[str, Any] | None = None,
 69        safe_details: dict[str, Any] | None = None,
 70    ) -> None:
 71        """Initialize exception with automatic credential sanitization.
 72
 73        Args:
 74            message: Error message (automatically sanitized)
 75            details: Internal details (may contain sensitive data)
 76            safe_details: Safe details for logging/display
 77
 78        Raises:
 79            ValueError: If message exceeds maximum length after sanitization
 80        """
 81        # Validate and sanitize message
 82        if not isinstance(message, str):
 83            message = str(message)
 84
 85        # Sanitize credentials from message
 86        sanitized_message = CredentialSanitizer.sanitize(message)
 87
 88        # Truncate if too long
 89        if len(sanitized_message) > _MAX_MESSAGE_LENGTH:
 90            sanitized_message = sanitized_message[:_MAX_MESSAGE_LENGTH] + "..."
 91
 92        self._message = sanitized_message
 93        super().__init__(sanitized_message)
 94
 95        self._details: dict[str, Any] | MappingProxyType[str, Any] = (
 96            dict(details) if details else _EMPTY_DICT
 97        )
 98        self._safe_details: dict[str, Any] | MappingProxyType[str, Any] = (
 99            dict(safe_details) if safe_details else _EMPTY_DICT
100        )
101
102        self._cached_str: str | None = None

Initialize exception with automatic credential sanitization.

Arguments:
  • message: Error message (automatically sanitized)
  • details: Internal details (may contain sensitive data)
  • safe_details: Safe details for logging/display
Raises:
  • ValueError: If message exceeds maximum length after sanitization
details: dict[str, typing.Any]
104    @property
105    def details(self) -> dict[str, Any]:
106        """Get internal error details (may contain sensitive data).
107
108        Warning:
109            Use with caution - may contain credentials or sensitive information.
110            For logging, use ``safe_details`` instead.
111
112        Returns:
113            Copy of internal details dictionary
114        """
115        if self._details is _EMPTY_DICT:
116            return {}
117        return self._details.copy()

Get internal error details (may contain sensitive data).

Warning:

Use with caution - may contain credentials or sensitive information. For logging, use safe_details instead.

Returns:

Copy of internal details dictionary

safe_details: dict[str, typing.Any]
119    @property
120    def safe_details(self) -> dict[str, Any]:
121        """Get sanitized error details safe for logging.
122
123        Returns:
124            Copy of safe details dictionary
125        """
126        if self._safe_details is _EMPTY_DICT:
127            return {}
128        return self._safe_details.copy()

Get sanitized error details safe for logging.

Returns:

Copy of safe details dictionary

is_retryable: bool
160    @property
161    def is_retryable(self) -> bool:
162        """Return whether this error type should be retried."""
163        return self._is_retryable

Return whether this error type should be retried.

default_retry_delay: float
165    @property
166    def default_retry_delay(self) -> float:
167        """Return suggested delay before retry in seconds."""
168        return self._default_retry_delay

Return suggested delay before retry in seconds.

class OutlineTimeoutError(pyoutlineapi.OutlineError):
454class OutlineTimeoutError(OutlineError):
455    """Operation timeout.
456
457    Attributes:
458        timeout: Timeout value in seconds
459        operation: Operation that timed out
460
461    Example:
462        >>> error = OutlineTimeoutError(
463        ...     "Request timeout", timeout=30.0, operation="get_server_info"
464        ... )
465        >>> error.is_retryable  # True
466    """
467
468    __slots__ = ("operation", "timeout")
469
470    _is_retryable: ClassVar[bool] = True
471    _default_retry_delay: ClassVar[float] = 2.0
472
473    def __init__(
474        self,
475        message: str,
476        *,
477        timeout: float | None = None,
478        operation: str | None = None,
479    ) -> None:
480        """Initialize timeout error.
481
482        Args:
483            message: Error message
484            timeout: Timeout value in seconds
485            operation: Operation that timed out
486        """
487        safe_details: dict[str, Any] | None = None
488        if timeout is not None or operation:
489            safe_details = {}
490            if timeout is not None:
491                safe_details["timeout"] = round(timeout, 2)
492            if operation:
493                safe_details["operation"] = operation
494
495        super().__init__(message, safe_details=safe_details)
496
497        self.timeout = timeout
498        self.operation = operation

Operation timeout.

Attributes:
  • timeout: Timeout value in seconds
  • operation: Operation that timed out
Example:
>>> error = OutlineTimeoutError(
...     "Request timeout", timeout=30.0, operation="get_server_info"
... )
>>> error.is_retryable  # True
OutlineTimeoutError( message: str, *, timeout: float | None = None, operation: str | None = None)
473    def __init__(
474        self,
475        message: str,
476        *,
477        timeout: float | None = None,
478        operation: str | None = None,
479    ) -> None:
480        """Initialize timeout error.
481
482        Args:
483            message: Error message
484            timeout: Timeout value in seconds
485            operation: Operation that timed out
486        """
487        safe_details: dict[str, Any] | None = None
488        if timeout is not None or operation:
489            safe_details = {}
490            if timeout is not None:
491                safe_details["timeout"] = round(timeout, 2)
492            if operation:
493                safe_details["operation"] = operation
494
495        super().__init__(message, safe_details=safe_details)
496
497        self.timeout = timeout
498        self.operation = operation

Initialize timeout error.

Arguments:
  • message: Error message
  • timeout: Timeout value in seconds
  • operation: Operation that timed out
timeout
operation
class PeakDeviceCount(pyoutlineapi.common_types.BaseValidatedModel):
423class PeakDeviceCount(BaseValidatedModel):
424    """Peak device count with timestamp.
425
426    SCHEMA: Based on experimental metrics connection peakDeviceCount object
427    """
428
429    data: int
430    timestamp: TimestampSec

Peak device count with timestamp.

SCHEMA: Based on experimental metrics connection peakDeviceCount object

data: int = PydanticUndefined
timestamp: Annotated[int, FieldInfo(annotation=NoneType, required=True, description='Unix timestamp in seconds', metadata=[Ge(ge=0)])] = PydanticUndefined

Unix timestamp in seconds

class PortRequest(pyoutlineapi.common_types.BaseValidatedModel):
522class PortRequest(BaseValidatedModel):
523    """Request model for setting default port.
524
525    SCHEMA: Based on PUT /server/port-for-new-access-keys request body
526    """
527
528    port: Port

Request model for setting default port.

SCHEMA: Based on PUT /server/port-for-new-access-keys request body

port: Annotated[int, FieldInfo(annotation=NoneType, required=True, description='Port number (1-65535)', metadata=[Ge(ge=1), Le(le=65535)])] = PydanticUndefined

Port number (1-65535)

class ProductionConfig(pyoutlineapi.OutlineClientConfig):
484class ProductionConfig(OutlineClientConfig):
485    """Production configuration with strict security.
486
487    Enforces HTTPS and enables all safety features:
488    - Circuit breaker enabled
489    - Logging disabled (performance)
490    - HTTPS enforcement
491    - Strict validation
492    """
493
494    model_config = SettingsConfigDict(
495        env_prefix=_PROD_ENV_PREFIX,
496        env_file=".env.prod",
497        case_sensitive=False,
498        extra="forbid",
499    )
500
501    enable_circuit_breaker: bool = True
502    enable_logging: bool = False
503    allow_private_networks: bool = False
504    resolve_dns_for_ssrf: bool = True
505
506    @model_validator(mode="after")
507    def enforce_security(self) -> Self:
508        """Enforce production security with optimized checks.
509
510        :return: Validated configuration
511        :raises ConfigurationError: If HTTP is used in production
512        """
513        match self.api_url:
514            case url if "http://" in url:
515                raise ConfigurationError(
516                    "Production environment must use HTTPS",
517                    field="api_url",
518                    security_issue=True,
519                )
520
521        if not self.enable_circuit_breaker:
522            _log_if_enabled(
523                logging.WARNING,
524                "Circuit breaker disabled in production. Not recommended.",
525            )
526
527        return self

Production configuration with strict security.

Enforces HTTPS and enables all safety features:

  • Circuit breaker enabled
  • Logging disabled (performance)
  • HTTPS enforcement
  • Strict validation
enable_circuit_breaker: bool = True
enable_logging: bool = False
allow_private_networks: bool = False
resolve_dns_for_ssrf: bool = True
@model_validator(mode='after')
def enforce_security(self) -> Self:
506    @model_validator(mode="after")
507    def enforce_security(self) -> Self:
508        """Enforce production security with optimized checks.
509
510        :return: Validated configuration
511        :raises ConfigurationError: If HTTP is used in production
512        """
513        match self.api_url:
514            case url if "http://" in url:
515                raise ConfigurationError(
516                    "Production environment must use HTTPS",
517                    field="api_url",
518                    security_issue=True,
519                )
520
521        if not self.enable_circuit_breaker:
522            _log_if_enabled(
523                logging.WARNING,
524                "Circuit breaker disabled in production. Not recommended.",
525            )
526
527        return self

Enforce production security with optimized checks.

Returns

Validated configuration

Raises
  • ConfigurationError: If HTTP is used in production
QueryParams = dict[str, str | int | float | bool]
ResponseData = dict[str, 'JsonValue']
class ResponseParser:
 38class ResponseParser:
 39    """High-performance utility class for parsing and validating API responses."""
 40
 41    __slots__ = ()  # Stateless class - zero memory overhead
 42
 43    @staticmethod
 44    @overload
 45    def parse(
 46        data: dict[str, JsonValue],
 47        model: type[T],
 48        *,
 49        as_json: Literal[True] = True,
 50    ) -> JsonDict: ...  # pragma: no cover
 51
 52    @staticmethod
 53    @overload
 54    def parse(
 55        data: dict[str, JsonValue],
 56        model: type[T],
 57        *,
 58        as_json: Literal[False] = False,
 59    ) -> T: ...  # pragma: no cover
 60
 61    @staticmethod
 62    @overload
 63    def parse(
 64        data: dict[str, JsonValue],
 65        model: type[T],
 66        *,
 67        as_json: bool,
 68    ) -> T | JsonDict: ...  # pragma: no cover
 69
 70    @staticmethod
 71    def parse(
 72        data: dict[str, JsonValue],
 73        model: type[T],
 74        *,
 75        as_json: bool = False,
 76    ) -> T | JsonDict:
 77        """Parse and validate response data with comprehensive error handling.
 78
 79        Type-safe overloads ensure correct return type based on as_json parameter.
 80
 81        :param data: Raw response data from API
 82        :param model: Pydantic model class for validation
 83        :param as_json: Return raw JSON dict instead of model instance
 84        :return: Validated model instance or JSON dict
 85        :raises ValidationError: If validation fails with detailed error info
 86
 87        Example:
 88            >>> data = {"name": "test", "id": 123}
 89            >>> # Type-safe: returns MyModel instance
 90            >>> result = ResponseParser.parse(data, MyModel, as_json=False)
 91            >>> # Type-safe: returns dict
 92            >>> json_result = ResponseParser.parse(data, MyModel, as_json=True)
 93        """
 94        if not isinstance(data, dict):
 95            raise OutlineValidationError(
 96                f"Expected dict, got {type(data).__name__}",
 97                model=model.__name__,
 98            )
 99
100        if not data and logger.isEnabledFor(Constants.LOG_LEVEL_DEBUG):
101            logger.debug("Parsing empty dict for model %s", model.__name__)
102
103        try:
104            validated = model.model_validate(data)
105
106            if as_json:
107                return cast(  # type: ignore[redundant-cast, unused-ignore]
108                    JsonDict, validated.model_dump(by_alias=True)
109                )
110            return cast(T, validated)  # type: ignore[redundant-cast, unused-ignore]
111
112        except ValidationError as e:
113            errors = e.errors()
114
115            if not errors:
116                raise OutlineValidationError(
117                    "Validation failed with no error details",
118                    model=model.__name__,
119                ) from e
120
121            first_error = errors[0]
122            field = ".".join(str(loc) for loc in first_error.get("loc", ()))
123            message = first_error.get("msg", "Validation failed")
124
125            error_count = len(errors)
126            if error_count > 1:
127                if logger.isEnabledFor(Constants.LOG_LEVEL_WARNING):
128                    logger.warning(
129                        "Multiple validation errors for %s: %d error(s)",
130                        model.__name__,
131                        error_count,
132                    )
133
134                if logger.isEnabledFor(Constants.LOG_LEVEL_DEBUG):
135                    logger.debug("Validation error details:")
136                    logged_count = min(error_count, _MAX_LOGGED_ERRORS)
137
138                    for i, error in enumerate(errors[:logged_count], 1):
139                        error_field = ".".join(str(loc) for loc in error.get("loc", ()))
140                        error_msg = error.get("msg", "Unknown error")
141                        logger.debug("  %d. %s: %s", i, error_field, error_msg)
142
143                    if error_count > _MAX_LOGGED_ERRORS:
144                        remaining = error_count - _MAX_LOGGED_ERRORS
145                        logger.debug("  ... and %d more error(s)", remaining)
146
147            raise OutlineValidationError(
148                message,
149                field=field,
150                model=model.__name__,
151            ) from e
152
153        except Exception as e:
154            # Catch any other unexpected errors during validation
155            if logger.isEnabledFor(Constants.LOG_LEVEL_ERROR):
156                logger.error(
157                    "Unexpected error during validation: %s",
158                    e,
159                    exc_info=True,
160                )
161            raise OutlineValidationError(
162                f"Unexpected error during validation: {e}",
163                model=model.__name__,
164            ) from e
165
166    @staticmethod
167    def parse_simple(data: Mapping[str, JsonValue] | object) -> bool:
168        """Parse simple success/error responses efficiently.
169
170        Handles various response formats with minimal overhead:
171        - {"success": true/false}
172        - {"error": "..."}  → False
173        - {"message": "..."}  → False
174        - Empty dict  → True (assumed success)
175
176        :param data: Response data
177        :return: True if successful, False otherwise
178
179        Example:
180            >>> ResponseParser.parse_simple({"success": True})
181            True
182            >>> ResponseParser.parse_simple({"error": "Something failed"})
183            False
184            >>> ResponseParser.parse_simple({})
185            True
186        """
187        if not isinstance(data, dict):
188            if logger.isEnabledFor(Constants.LOG_LEVEL_WARNING):
189                logger.warning(
190                    "Expected dict in parse_simple, got %s",
191                    type(data).__name__,
192                )
193            return False
194
195        if "success" in data:
196            success = data["success"]
197            if not isinstance(success, bool):
198                if logger.isEnabledFor(Constants.LOG_LEVEL_WARNING):
199                    logger.warning(
200                        "success field is not bool: %s, coercing to bool",
201                        type(success).__name__,
202                    )
203                return bool(success)
204            return success
205
206        return "error" not in data and "message" not in data
207
208    @staticmethod
209    def validate_response_structure(
210        data: Mapping[str, JsonValue] | object,
211        required_fields: Sequence[str] | None = None,
212    ) -> bool:
213        """Validate response structure without full parsing.
214
215        Lightweight validation before expensive Pydantic validation.
216        Useful for early rejection of malformed responses.
217
218        :param data: Response data to validate
219        :param required_fields: Sequence of required field names
220        :return: True if structure is valid
221
222        Example:
223            >>> data = {"id": 1, "name": "test"}
224            >>> ResponseParser.validate_response_structure(data, ["id", "name"])
225            True
226            >>> ResponseParser.validate_response_structure(data, ["id", "missing"])
227            False
228        """
229        if not isinstance(data, dict):
230            return False
231
232        if not data and not required_fields:
233            return True
234
235        if not required_fields:
236            return True
237
238        return all(field in data for field in required_fields)
239
240    @staticmethod
241    def extract_error_message(data: Mapping[str, JsonValue] | object) -> str | None:
242        """Extract error message from response data efficiently.
243
244        Checks common error field names in order of preference.
245        Uses pre-computed tuple for fast iteration.
246
247        :param data: Response data
248        :return: Error message or None if not found
249
250        Example:
251            >>> ResponseParser.extract_error_message({"error": "Not found"})
252            'Not found'
253            >>> ResponseParser.extract_error_message({"message": "Failed"})
254            'Failed'
255            >>> ResponseParser.extract_error_message({"success": True})
256            None
257        """
258        if not isinstance(data, dict):
259            return None
260
261        for field in _ERROR_FIELDS:
262            if field in data:
263                value = data[field]
264                # Fast path: already a string
265                if isinstance(value, str):
266                    return value
267                # Convert non-string to string (None → None)
268                return str(value) if value is not None else None
269
270        return None
271
272    @staticmethod
273    def is_error_response(data: Mapping[str, object] | object) -> bool:
274        """Check if response indicates an error efficiently.
275
276        Fast boolean check for error indicators in response.
277
278        :param data: Response data
279        :return: True if response indicates an error
280
281        Example:
282            >>> ResponseParser.is_error_response({"error": "Failed"})
283            True
284            >>> ResponseParser.is_error_response({"success": False})
285            True
286            >>> ResponseParser.is_error_response({"success": True})
287            False
288            >>> ResponseParser.is_error_response({})
289            False
290        """
291        if not isinstance(data, dict):
292            return False
293
294        if "error" in data or "error_message" in data:
295            return True
296
297        if "success" in data:
298            success = data["success"]
299            return success is False
300
301        # No error indicators found
302        return False

High-performance utility class for parsing and validating API responses.

@staticmethod
def parse( data: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None | dict[str, ForwardRef('JsonValue')] | list[ForwardRef('JsonValue')]] | list[str | int | float | bool | None | dict[str, ForwardRef('JsonValue')] | list[ForwardRef('JsonValue')]]], model: type[~T], *, as_json: bool = False) -> Union[~T, dict[str, str | int | float | bool | None | dict[str, ForwardRef('JsonValue')] | list[ForwardRef('JsonValue')]]]:
 70    @staticmethod
 71    def parse(
 72        data: dict[str, JsonValue],
 73        model: type[T],
 74        *,
 75        as_json: bool = False,
 76    ) -> T | JsonDict:
 77        """Parse and validate response data with comprehensive error handling.
 78
 79        Type-safe overloads ensure correct return type based on as_json parameter.
 80
 81        :param data: Raw response data from API
 82        :param model: Pydantic model class for validation
 83        :param as_json: Return raw JSON dict instead of model instance
 84        :return: Validated model instance or JSON dict
 85        :raises ValidationError: If validation fails with detailed error info
 86
 87        Example:
 88            >>> data = {"name": "test", "id": 123}
 89            >>> # Type-safe: returns MyModel instance
 90            >>> result = ResponseParser.parse(data, MyModel, as_json=False)
 91            >>> # Type-safe: returns dict
 92            >>> json_result = ResponseParser.parse(data, MyModel, as_json=True)
 93        """
 94        if not isinstance(data, dict):
 95            raise OutlineValidationError(
 96                f"Expected dict, got {type(data).__name__}",
 97                model=model.__name__,
 98            )
 99
100        if not data and logger.isEnabledFor(Constants.LOG_LEVEL_DEBUG):
101            logger.debug("Parsing empty dict for model %s", model.__name__)
102
103        try:
104            validated = model.model_validate(data)
105
106            if as_json:
107                return cast(  # type: ignore[redundant-cast, unused-ignore]
108                    JsonDict, validated.model_dump(by_alias=True)
109                )
110            return cast(T, validated)  # type: ignore[redundant-cast, unused-ignore]
111
112        except ValidationError as e:
113            errors = e.errors()
114
115            if not errors:
116                raise OutlineValidationError(
117                    "Validation failed with no error details",
118                    model=model.__name__,
119                ) from e
120
121            first_error = errors[0]
122            field = ".".join(str(loc) for loc in first_error.get("loc", ()))
123            message = first_error.get("msg", "Validation failed")
124
125            error_count = len(errors)
126            if error_count > 1:
127                if logger.isEnabledFor(Constants.LOG_LEVEL_WARNING):
128                    logger.warning(
129                        "Multiple validation errors for %s: %d error(s)",
130                        model.__name__,
131                        error_count,
132                    )
133
134                if logger.isEnabledFor(Constants.LOG_LEVEL_DEBUG):
135                    logger.debug("Validation error details:")
136                    logged_count = min(error_count, _MAX_LOGGED_ERRORS)
137
138                    for i, error in enumerate(errors[:logged_count], 1):
139                        error_field = ".".join(str(loc) for loc in error.get("loc", ()))
140                        error_msg = error.get("msg", "Unknown error")
141                        logger.debug("  %d. %s: %s", i, error_field, error_msg)
142
143                    if error_count > _MAX_LOGGED_ERRORS:
144                        remaining = error_count - _MAX_LOGGED_ERRORS
145                        logger.debug("  ... and %d more error(s)", remaining)
146
147            raise OutlineValidationError(
148                message,
149                field=field,
150                model=model.__name__,
151            ) from e
152
153        except Exception as e:
154            # Catch any other unexpected errors during validation
155            if logger.isEnabledFor(Constants.LOG_LEVEL_ERROR):
156                logger.error(
157                    "Unexpected error during validation: %s",
158                    e,
159                    exc_info=True,
160                )
161            raise OutlineValidationError(
162                f"Unexpected error during validation: {e}",
163                model=model.__name__,
164            ) from e

Parse and validate response data with comprehensive error handling.

Type-safe overloads ensure correct return type based on as_json parameter.

Parameters
  • data: Raw response data from API
  • model: Pydantic model class for validation
  • as_json: Return raw JSON dict instead of model instance
Returns

Validated model instance or JSON dict

Raises
  • ValidationError: If validation fails with detailed error info
Example:
>>> data = {"name": "test", "id": 123}
>>> # Type-safe: returns MyModel instance
>>> result = ResponseParser.parse(data, MyModel, as_json=False)
>>> # Type-safe: returns dict
>>> json_result = ResponseParser.parse(data, MyModel, as_json=True)
@staticmethod
def parse_simple( data: Mapping[str, str | int | float | bool | None | dict[str, str | int | float | bool | None | dict[str, ForwardRef('JsonValue')] | list[ForwardRef('JsonValue')]] | list[str | int | float | bool | None | dict[str, ForwardRef('JsonValue')] | list[ForwardRef('JsonValue')]]] | object) -> bool:
166    @staticmethod
167    def parse_simple(data: Mapping[str, JsonValue] | object) -> bool:
168        """Parse simple success/error responses efficiently.
169
170        Handles various response formats with minimal overhead:
171        - {"success": true/false}
172        - {"error": "..."}  → False
173        - {"message": "..."}  → False
174        - Empty dict  → True (assumed success)
175
176        :param data: Response data
177        :return: True if successful, False otherwise
178
179        Example:
180            >>> ResponseParser.parse_simple({"success": True})
181            True
182            >>> ResponseParser.parse_simple({"error": "Something failed"})
183            False
184            >>> ResponseParser.parse_simple({})
185            True
186        """
187        if not isinstance(data, dict):
188            if logger.isEnabledFor(Constants.LOG_LEVEL_WARNING):
189                logger.warning(
190                    "Expected dict in parse_simple, got %s",
191                    type(data).__name__,
192                )
193            return False
194
195        if "success" in data:
196            success = data["success"]
197            if not isinstance(success, bool):
198                if logger.isEnabledFor(Constants.LOG_LEVEL_WARNING):
199                    logger.warning(
200                        "success field is not bool: %s, coercing to bool",
201                        type(success).__name__,
202                    )
203                return bool(success)
204            return success
205
206        return "error" not in data and "message" not in data

Parse simple success/error responses efficiently.

Handles various response formats with minimal overhead:

  • {"success": true/false}
  • {"error": "..."} → False
  • {"message": "..."} → False
  • Empty dict → True (assumed success)
Parameters
  • data: Response data
Returns

True if successful, False otherwise

Example:
>>> ResponseParser.parse_simple({"success": True})
True
>>> ResponseParser.parse_simple({"error": "Something failed"})
False
>>> ResponseParser.parse_simple({})
True
@staticmethod
def validate_response_structure( data: Mapping[str, str | int | float | bool | None | dict[str, str | int | float | bool | None | dict[str, ForwardRef('JsonValue')] | list[ForwardRef('JsonValue')]] | list[str | int | float | bool | None | dict[str, ForwardRef('JsonValue')] | list[ForwardRef('JsonValue')]]] | object, required_fields: Sequence[str] | None = None) -> bool:
208    @staticmethod
209    def validate_response_structure(
210        data: Mapping[str, JsonValue] | object,
211        required_fields: Sequence[str] | None = None,
212    ) -> bool:
213        """Validate response structure without full parsing.
214
215        Lightweight validation before expensive Pydantic validation.
216        Useful for early rejection of malformed responses.
217
218        :param data: Response data to validate
219        :param required_fields: Sequence of required field names
220        :return: True if structure is valid
221
222        Example:
223            >>> data = {"id": 1, "name": "test"}
224            >>> ResponseParser.validate_response_structure(data, ["id", "name"])
225            True
226            >>> ResponseParser.validate_response_structure(data, ["id", "missing"])
227            False
228        """
229        if not isinstance(data, dict):
230            return False
231
232        if not data and not required_fields:
233            return True
234
235        if not required_fields:
236            return True
237
238        return all(field in data for field in required_fields)

Validate response structure without full parsing.

Lightweight validation before expensive Pydantic validation. Useful for early rejection of malformed responses.

Parameters
  • data: Response data to validate
  • required_fields: Sequence of required field names
Returns

True if structure is valid

Example:
>>> data = {"id": 1, "name": "test"}
>>> ResponseParser.validate_response_structure(data, ["id", "name"])
True
>>> ResponseParser.validate_response_structure(data, ["id", "missing"])
False
@staticmethod
def extract_error_message( data: Mapping[str, str | int | float | bool | None | dict[str, str | int | float | bool | None | dict[str, ForwardRef('JsonValue')] | list[ForwardRef('JsonValue')]] | list[str | int | float | bool | None | dict[str, ForwardRef('JsonValue')] | list[ForwardRef('JsonValue')]]] | object) -> str | None:
240    @staticmethod
241    def extract_error_message(data: Mapping[str, JsonValue] | object) -> str | None:
242        """Extract error message from response data efficiently.
243
244        Checks common error field names in order of preference.
245        Uses pre-computed tuple for fast iteration.
246
247        :param data: Response data
248        :return: Error message or None if not found
249
250        Example:
251            >>> ResponseParser.extract_error_message({"error": "Not found"})
252            'Not found'
253            >>> ResponseParser.extract_error_message({"message": "Failed"})
254            'Failed'
255            >>> ResponseParser.extract_error_message({"success": True})
256            None
257        """
258        if not isinstance(data, dict):
259            return None
260
261        for field in _ERROR_FIELDS:
262            if field in data:
263                value = data[field]
264                # Fast path: already a string
265                if isinstance(value, str):
266                    return value
267                # Convert non-string to string (None → None)
268                return str(value) if value is not None else None
269
270        return None

Extract error message from response data efficiently.

Checks common error field names in order of preference. Uses pre-computed tuple for fast iteration.

Parameters
  • data: Response data
Returns

Error message or None if not found

Example:
>>> ResponseParser.extract_error_message({"error": "Not found"})
'Not found'
>>> ResponseParser.extract_error_message({"message": "Failed"})
'Failed'
>>> ResponseParser.extract_error_message({"success": True})
None
@staticmethod
def is_error_response(data: Mapping[str, object] | object) -> bool:
272    @staticmethod
273    def is_error_response(data: Mapping[str, object] | object) -> bool:
274        """Check if response indicates an error efficiently.
275
276        Fast boolean check for error indicators in response.
277
278        :param data: Response data
279        :return: True if response indicates an error
280
281        Example:
282            >>> ResponseParser.is_error_response({"error": "Failed"})
283            True
284            >>> ResponseParser.is_error_response({"success": False})
285            True
286            >>> ResponseParser.is_error_response({"success": True})
287            False
288            >>> ResponseParser.is_error_response({})
289            False
290        """
291        if not isinstance(data, dict):
292            return False
293
294        if "error" in data or "error_message" in data:
295            return True
296
297        if "success" in data:
298            success = data["success"]
299            return success is False
300
301        # No error indicators found
302        return False

Check if response indicates an error efficiently.

Fast boolean check for error indicators in response.

Parameters
  • data: Response data
Returns

True if response indicates an error

Example:
>>> ResponseParser.is_error_response({"error": "Failed"})
True
>>> ResponseParser.is_error_response({"success": False})
True
>>> ResponseParser.is_error_response({"success": True})
False
>>> ResponseParser.is_error_response({})
False
class SecureIDGenerator:
339class SecureIDGenerator:
340    """Cryptographically secure ID generation."""
341
342    __slots__ = ()
343
344    @staticmethod
345    def generate_correlation_id() -> str:
346        """Generate secure correlation ID with 128 bits entropy.
347
348        Format: {timestamp_us}-{random_hex}
349
350        :return: Correlation ID string
351        """
352        # 16 bytes = 128 bits of entropy
353        random_part = secrets.token_hex(16)
354
355        # Microsecond timestamp for uniqueness and ordering
356        timestamp = int(time.time() * 1_000_000)
357
358        return f"{timestamp}-{random_part}"
359
360    @staticmethod
361    def generate_request_id() -> str:
362        """Generate secure request ID.
363
364        Alias for correlation ID for API compatibility.
365
366        :return: Request ID string
367        """
368        return SecureIDGenerator.generate_correlation_id()

Cryptographically secure ID generation.

@staticmethod
def generate_correlation_id() -> str:
344    @staticmethod
345    def generate_correlation_id() -> str:
346        """Generate secure correlation ID with 128 bits entropy.
347
348        Format: {timestamp_us}-{random_hex}
349
350        :return: Correlation ID string
351        """
352        # 16 bytes = 128 bits of entropy
353        random_part = secrets.token_hex(16)
354
355        # Microsecond timestamp for uniqueness and ordering
356        timestamp = int(time.time() * 1_000_000)
357
358        return f"{timestamp}-{random_part}"

Generate secure correlation ID with 128 bits entropy.

Format: {timestamp_us}-{random_hex}

Returns

Correlation ID string

@staticmethod
def generate_request_id() -> str:
360    @staticmethod
361    def generate_request_id() -> str:
362        """Generate secure request ID.
363
364        Alias for correlation ID for API compatibility.
365
366        :return: Request ID string
367        """
368        return SecureIDGenerator.generate_correlation_id()

Generate secure request ID.

Alias for correlation ID for API compatibility.

Returns

Request ID string

class Server(pyoutlineapi.common_types.BaseValidatedModel):
257class Server(BaseValidatedModel):
258    """Server information model with optimized properties.
259
260    SCHEMA: Based on GET /server response
261    """
262
263    name: str | None = None
264    server_id: str = Field(alias="serverId")
265    metrics_enabled: bool = Field(alias="metricsEnabled")
266    created_timestamp_ms: TimestampMs = Field(alias="createdTimestampMs")
267    port_for_new_access_keys: Port = Field(alias="portForNewAccessKeys")
268    hostname_for_access_keys: str | None = Field(None, alias="hostnameForAccessKeys")
269    access_key_data_limit: DataLimit | None = Field(None, alias="accessKeyDataLimit")
270    version: str | None = None
271
272    @field_validator("name", mode="before")
273    @classmethod
274    def validate_name(cls, v: str) -> str:
275        """Validate server name.
276
277        :param v: Server name
278        :return: Validated name
279        :raises ValueError: If name is empty
280        """
281        validated = Validators.validate_name(v)
282        if validated is None:
283            raise ValueError("Server name cannot be empty")
284        return validated
285
286    @property
287    def has_global_limit(self) -> bool:
288        """Check if server has global data limit (optimized).
289
290        :return: True if global limit exists
291        """
292        return self.access_key_data_limit is not None
293
294    @cached_property
295    def created_timestamp_seconds(self) -> float:
296        """Get creation timestamp in seconds (cached).
297
298        NOTE: Cached because timestamp is immutable
299
300        :return: Timestamp in seconds
301        """
302        return self.created_timestamp_ms / _MS_IN_SEC

Server information model with optimized properties.

SCHEMA: Based on GET /server response

name: str | None = None
server_id: str = PydanticUndefined
metrics_enabled: bool = PydanticUndefined
created_timestamp_ms: Annotated[int, FieldInfo(annotation=NoneType, required=True, description='Unix timestamp in milliseconds', metadata=[Ge(ge=0)])] = PydanticUndefined

Unix timestamp in milliseconds

port_for_new_access_keys: Annotated[int, FieldInfo(annotation=NoneType, required=True, description='Port number (1-65535)', metadata=[Ge(ge=1), Le(le=65535)])] = PydanticUndefined

Port number (1-65535)

hostname_for_access_keys: str | None = None
access_key_data_limit: DataLimit | None = None
version: str | None = None
@field_validator('name', mode='before')
@classmethod
def validate_name(cls, v: str) -> str:
272    @field_validator("name", mode="before")
273    @classmethod
274    def validate_name(cls, v: str) -> str:
275        """Validate server name.
276
277        :param v: Server name
278        :return: Validated name
279        :raises ValueError: If name is empty
280        """
281        validated = Validators.validate_name(v)
282        if validated is None:
283            raise ValueError("Server name cannot be empty")
284        return validated

Validate server name.

Parameters
  • v: Server name
Returns

Validated name

Raises
  • ValueError: If name is empty
has_global_limit: bool
286    @property
287    def has_global_limit(self) -> bool:
288        """Check if server has global data limit (optimized).
289
290        :return: True if global limit exists
291        """
292        return self.access_key_data_limit is not None

Check if server has global data limit (optimized).

Returns

True if global limit exists

created_timestamp_seconds: float
294    @cached_property
295    def created_timestamp_seconds(self) -> float:
296        """Get creation timestamp in seconds (cached).
297
298        NOTE: Cached because timestamp is immutable
299
300        :return: Timestamp in seconds
301        """
302        return self.created_timestamp_ms / _MS_IN_SEC

Get creation timestamp in seconds (cached).

NOTE: Cached because timestamp is immutable

Returns

Timestamp in seconds

class ServerExperimentalMetric(pyoutlineapi.common_types.BaseValidatedModel):
455class ServerExperimentalMetric(BaseValidatedModel):
456    """Server-level experimental metrics.
457
458    SCHEMA: Based on experimental metrics server object
459    """
460
461    tunnel_time: TunnelTime = Field(alias="tunnelTime")
462    data_transferred: DataTransferred = Field(alias="dataTransferred")
463    bandwidth: BandwidthInfo
464    locations: list[LocationMetric]

Server-level experimental metrics.

SCHEMA: Based on experimental metrics server object

tunnel_time: TunnelTime = PydanticUndefined
data_transferred: DataTransferred = PydanticUndefined
bandwidth: BandwidthInfo = PydanticUndefined
locations: list[LocationMetric] = PydanticUndefined
class ServerMetrics(pyoutlineapi.common_types.BaseValidatedModel):
308class ServerMetrics(BaseValidatedModel):
309    """Transfer metrics with optimized aggregations.
310
311    SCHEMA: Based on GET /metrics/transfer response
312    """
313
314    bytes_transferred_by_user_id: BytesPerUserDict = Field(
315        alias="bytesTransferredByUserId"
316    )
317
318    @cached_property
319    def total_bytes(self) -> int:
320        """Calculate total bytes with caching.
321
322        :return: Total bytes transferred
323        """
324        return sum(self.bytes_transferred_by_user_id.values())
325
326    @cached_property
327    def total_gigabytes(self) -> float:
328        """Get total in gigabytes (uses cached total_bytes).
329
330        :return: Total GB transferred
331        """
332        return self.total_bytes / _BYTES_IN_GB
333
334    @cached_property
335    def user_count(self) -> int:
336        """Get number of users (cached).
337
338        :return: Number of users
339        """
340        return len(self.bytes_transferred_by_user_id)
341
342    def get_user_bytes(self, user_id: str) -> int:
343        """Get bytes for specific user (O(1) dict lookup).
344
345        :param user_id: User/key ID
346        :return: Bytes transferred or 0 if not found
347        """
348        return self.bytes_transferred_by_user_id.get(user_id, 0)
349
350    def top_users(self, limit: int = 10) -> list[tuple[str, int]]:
351        """Get top users by bytes transferred (optimized sorting).
352
353        :param limit: Number of top users to return
354        :return: List of (user_id, bytes) tuples
355        """
356        return sorted(
357            self.bytes_transferred_by_user_id.items(),
358            key=lambda x: x[1],
359            reverse=True,
360        )[:limit]

Transfer metrics with optimized aggregations.

SCHEMA: Based on GET /metrics/transfer response

bytes_transferred_by_user_id: dict[str, int] = PydanticUndefined
total_bytes: int
318    @cached_property
319    def total_bytes(self) -> int:
320        """Calculate total bytes with caching.
321
322        :return: Total bytes transferred
323        """
324        return sum(self.bytes_transferred_by_user_id.values())

Calculate total bytes with caching.

Returns

Total bytes transferred

total_gigabytes: float
326    @cached_property
327    def total_gigabytes(self) -> float:
328        """Get total in gigabytes (uses cached total_bytes).
329
330        :return: Total GB transferred
331        """
332        return self.total_bytes / _BYTES_IN_GB

Get total in gigabytes (uses cached total_bytes).

Returns

Total GB transferred

user_count: int
334    @cached_property
335    def user_count(self) -> int:
336        """Get number of users (cached).
337
338        :return: Number of users
339        """
340        return len(self.bytes_transferred_by_user_id)

Get number of users (cached).

Returns

Number of users

def get_user_bytes(self, user_id: str) -> int:
342    def get_user_bytes(self, user_id: str) -> int:
343        """Get bytes for specific user (O(1) dict lookup).
344
345        :param user_id: User/key ID
346        :return: Bytes transferred or 0 if not found
347        """
348        return self.bytes_transferred_by_user_id.get(user_id, 0)

Get bytes for specific user (O(1) dict lookup).

Parameters
  • user_id: User/key ID
Returns

Bytes transferred or 0 if not found

def top_users(self, limit: int = 10) -> list[tuple[str, int]]:
350    def top_users(self, limit: int = 10) -> list[tuple[str, int]]:
351        """Get top users by bytes transferred (optimized sorting).
352
353        :param limit: Number of top users to return
354        :return: List of (user_id, bytes) tuples
355        """
356        return sorted(
357            self.bytes_transferred_by_user_id.items(),
358            key=lambda x: x[1],
359            reverse=True,
360        )[:limit]

Get top users by bytes transferred (optimized sorting).

Parameters
  • limit: Number of top users to return
Returns

List of (user_id, bytes) tuples

class ServerNameRequest(pyoutlineapi.common_types.BaseValidatedModel):
504class ServerNameRequest(BaseValidatedModel):
505    """Request model for renaming server.
506
507    SCHEMA: Based on PUT /name request body
508    """
509
510    name: str = Field(min_length=1, max_length=255)

Request model for renaming server.

SCHEMA: Based on PUT /name request body

name: str = PydanticUndefined
class ServerSummary(pyoutlineapi.common_types.BaseValidatedModel):
633class ServerSummary(BaseValidatedModel):
634    """Server summary with optimized aggregations."""
635
636    server: dict[str, Any]
637    access_keys_count: int
638    healthy: bool
639    transfer_metrics: BytesPerUserDict | None = None
640    experimental_metrics: dict[str, Any] | None = None
641    error: str | None = None
642
643    @property
644    def total_bytes_transferred(self) -> int:
645        """Get total bytes with early return optimization.
646
647        :return: Total bytes or 0 if no metrics
648        """
649        if not self.transfer_metrics:
650            return 0  # Early return
651        return sum(self.transfer_metrics.values())
652
653    @property
654    def total_gigabytes_transferred(self) -> float:
655        """Get total GB (uses total_bytes_transferred).
656
657        :return: Total GB or 0.0 if no metrics
658        """
659        return self.total_bytes_transferred / _BYTES_IN_GB
660
661    @property
662    def has_errors(self) -> bool:
663        """Check if summary has errors (optimized None check).
664
665        :return: True if errors present
666        """
667        return self.error is not None

Server summary with optimized aggregations.

server: dict[str, typing.Any] = PydanticUndefined
access_keys_count: int = PydanticUndefined
healthy: bool = PydanticUndefined
transfer_metrics: dict[str, int] | None = None
experimental_metrics: dict[str, typing.Any] | None = None
error: str | None = None
total_bytes_transferred: int
643    @property
644    def total_bytes_transferred(self) -> int:
645        """Get total bytes with early return optimization.
646
647        :return: Total bytes or 0 if no metrics
648        """
649        if not self.transfer_metrics:
650            return 0  # Early return
651        return sum(self.transfer_metrics.values())

Get total bytes with early return optimization.

Returns

Total bytes or 0 if no metrics

total_gigabytes_transferred: float
653    @property
654    def total_gigabytes_transferred(self) -> float:
655        """Get total GB (uses total_bytes_transferred).
656
657        :return: Total GB or 0.0 if no metrics
658        """
659        return self.total_bytes_transferred / _BYTES_IN_GB

Get total GB (uses total_bytes_transferred).

Returns

Total GB or 0.0 if no metrics

has_errors: bool
661    @property
662    def has_errors(self) -> bool:
663        """Check if summary has errors (optimized None check).
664
665        :return: True if errors present
666        """
667        return self.error is not None

Check if summary has errors (optimized None check).

Returns

True if errors present

TimestampMs = typing.Annotated[int, FieldInfo(annotation=NoneType, required=True, description='Unix timestamp in milliseconds', metadata=[Ge(ge=0)])]
TimestampSec = typing.Annotated[int, FieldInfo(annotation=NoneType, required=True, description='Unix timestamp in seconds', metadata=[Ge(ge=0)])]
class TunnelTime(pyoutlineapi.common_types.BaseValidatedModel, pyoutlineapi.models.TimeConversionMixin):
363class TunnelTime(BaseValidatedModel, TimeConversionMixin):
364    """Tunnel time metric with time conversions.
365
366    SCHEMA: Based on experimental metrics tunnelTime object
367    """
368
369    seconds: int = Field(ge=0)

Tunnel time metric with time conversions.

SCHEMA: Based on experimental metrics tunnelTime object

seconds: int = PydanticUndefined
class ValidationError(pyoutlineapi.OutlineError):
362class ValidationError(OutlineError):
363    """Data validation failure.
364
365    Raised when data fails validation against expected schema.
366
367    Attributes:
368        field: Field name that failed validation
369        model: Model name
370
371    Example:
372        >>> error = ValidationError(
373        ...     "Invalid port number", field="port", model="ServerConfig"
374        ... )
375    """
376
377    __slots__ = ("field", "model")
378
379    def __init__(
380        self,
381        message: str,
382        *,
383        field: str | None = None,
384        model: str | None = None,
385    ) -> None:
386        """Initialize validation error.
387
388        Args:
389            message: Error message
390            field: Field name that failed validation
391            model: Model name
392        """
393        safe_details: dict[str, Any] | None = None
394        if field or model:
395            safe_details = {}
396            if field:
397                safe_details["field"] = field
398            if model:
399                safe_details["model"] = model
400
401        super().__init__(message, safe_details=safe_details)
402
403        self.field = field
404        self.model = model

Data validation failure.

Raised when data fails validation against expected schema.

Attributes:
  • field: Field name that failed validation
  • model: Model name
Example:
>>> error = ValidationError(
...     "Invalid port number", field="port", model="ServerConfig"
... )
ValidationError(message: str, *, field: str | None = None, model: str | None = None)
379    def __init__(
380        self,
381        message: str,
382        *,
383        field: str | None = None,
384        model: str | None = None,
385    ) -> None:
386        """Initialize validation error.
387
388        Args:
389            message: Error message
390            field: Field name that failed validation
391            model: Model name
392        """
393        safe_details: dict[str, Any] | None = None
394        if field or model:
395            safe_details = {}
396            if field:
397                safe_details["field"] = field
398            if model:
399                safe_details["model"] = model
400
401        super().__init__(message, safe_details=safe_details)
402
403        self.field = field
404        self.model = model

Initialize validation error.

Arguments:
  • message: Error message
  • field: Field name that failed validation
  • model: Model name
field
model
class Validators:
433class Validators:
434    """Input validation utilities with security hardening."""
435
436    __slots__ = ()
437
438    @staticmethod
439    @lru_cache(maxsize=64)
440    def validate_cert_fingerprint(fingerprint: SecretStr) -> SecretStr:
441        """Validate and normalize certificate fingerprint.
442
443        :param fingerprint: SHA-256 fingerprint
444        :return: Normalized fingerprint (lowercase, no separators)
445        :raises ValueError: If format is invalid
446        """
447        if not fingerprint:
448            raise ValueError("Certificate fingerprint cannot be empty")
449
450        # Remove common separators
451        cleaned = fingerprint.get_secret_value().lower()
452
453        # Validate hex format
454        if not re.match(r"^[a-f0-9]{64}$", cleaned):
455            raise ValueError(
456                f"Invalid certificate fingerprint format. "
457                f"Expected 64 hex characters, got: {len(cleaned)}"
458            )
459
460        return SecretStr(cleaned)
461
462    @staticmethod
463    def validate_port(port: int) -> int:
464        """Validate port number.
465
466        :param port: Port number
467        :return: Validated port
468        :raises ValueError: If port is out of range
469        """
470        if not is_valid_port(port):
471            raise ValueError(
472                f"Port must be between {Constants.MIN_PORT} and {Constants.MAX_PORT}"
473            )
474        return port
475
476    @staticmethod
477    def validate_name(name: str) -> str:
478        """Validate name field.
479
480        :param name: Name to validate
481        :return: Validated name
482        :raises ValueError: If name is invalid
483        """
484        if not name or not name.strip():
485            raise ValueError("Name cannot be empty")
486
487        name = name.strip()
488        if len(name) > Constants.MAX_NAME_LENGTH:
489            raise ValueError(
490                f"Name too long: {len(name)} (max {Constants.MAX_NAME_LENGTH})"
491            )
492
493        return name
494
495    @staticmethod
496    def validate_url(
497        url: str,
498        *,
499        allow_private_networks: bool = True,
500        resolve_dns: bool = False,
501    ) -> str:
502        """Validate and sanitize URL.
503
504        :param url: URL to validate
505        :param allow_private_networks: Allow private/local network addresses
506        :param resolve_dns: Resolve hostname and block private/reserved IPs
507        :return: Validated URL
508        :raises ValueError: If URL is invalid
509        """
510        if not url or not url.strip():
511            raise ValueError("URL cannot be empty")
512
513        url = url.strip()
514
515        if len(url) > Constants.MAX_URL_LENGTH:
516            raise ValueError(
517                f"URL too long: {len(url)} (max {Constants.MAX_URL_LENGTH})"
518            )
519
520        # Check for null bytes
521        if "\x00" in url:
522            raise ValueError("URL contains null bytes")
523
524        # Parse URL
525        try:
526            parsed = urlparse(url)
527            if not parsed.scheme or not parsed.netloc:
528                raise ValueError("Invalid URL format")
529        except Exception as e:
530            raise ValueError(f"Invalid URL: {e}") from e
531
532        # SSRF protection for raw IPs in hostname (does not resolve DNS)
533        if (
534            not allow_private_networks
535            and parsed.hostname
536            and SSRFProtection.is_blocked_ip(parsed.hostname)
537        ):
538            raise ValueError(
539                f"Access to {parsed.hostname} is blocked (SSRF protection)"
540            )
541
542        # Explicitly block localhost when private networks are disallowed
543        if (
544            not allow_private_networks
545            and parsed.hostname in SSRFProtection.ALLOWED_LOCALHOST
546        ):
547            raise ValueError(
548                f"Access to {parsed.hostname} is blocked (SSRF protection)"
549            )
550
551        # Strict SSRF protection with DNS resolution (guards against rebinding)
552        if (
553            resolve_dns
554            and not allow_private_networks
555            and parsed.hostname
556            and not SSRFProtection.is_blocked_ip(parsed.hostname)
557            and SSRFProtection.is_blocked_hostname(parsed.hostname)
558        ):
559            raise ValueError(
560                f"Access to {parsed.hostname} is blocked (SSRF protection)"
561            )
562
563        return url
564
565    @staticmethod
566    def validate_string_not_empty(value: str, field_name: str) -> str:
567        """Validate string is not empty.
568
569        :param value: String value
570        :param field_name: Field name for error messages
571        :return: Stripped string
572        :raises ValueError: If string is empty
573        """
574        if not value or not value.strip():
575            raise ValueError(f"{field_name} cannot be empty")
576        return value.strip()
577
578    @staticmethod
579    def _validate_length(value: str, max_length: int, name: str) -> None:
580        """Validate string length.
581
582        :param value: String value
583        :param max_length: Maximum allowed length
584        :param name: Field name for error messages
585        :raises ValueError: If string is too long
586        """
587        if len(value) > max_length:
588            raise ValueError(f"{name} too long: {len(value)} (max {max_length})")
589
590    @staticmethod
591    def _validate_no_null_bytes(value: str, name: str) -> None:
592        """Validate string contains no null bytes.
593
594        :param value: String value
595        :param name: Field name for error messages
596        :raises ValueError: If string contains null bytes
597        """
598        if "\x00" in value:
599            raise ValueError(f"{name} contains null bytes")
600
601    @staticmethod
602    def validate_non_negative(value: DataLimit | int, name: str) -> int:
603        """Validate integer is non-negative.
604
605        :param value: Integer value
606        :param name: Field name for error messages
607        :return: Validated value
608        :raises ValueError: If value is negative
609        """
610        from .models import DataLimit
611
612        raw_value = value.bytes if isinstance(value, DataLimit) else value
613        if raw_value < 0:
614            raise ValueError(f"{name} must be non-negative, got {raw_value}")
615        return raw_value
616
617    @staticmethod
618    def validate_since(value: str) -> str:
619        """Validate experimental metrics 'since' parameter.
620
621        Accepts:
622        - Relative durations: 24h, 7d, 30m, 15s
623        - ISO-8601 timestamps (e.g., 2024-01-01T00:00:00Z)
624
625        :param value: Since parameter
626        :return: Sanitized since value
627        :raises ValueError: If value is invalid
628        """
629        if not value or not value.strip():
630            raise ValueError("'since' parameter cannot be empty")
631
632        sanitized = value.strip()
633
634        # Relative format (number + suffix)
635        if len(sanitized) >= 2 and sanitized[-1] in {"h", "d", "m", "s"}:
636            number = sanitized[:-1]
637            if number.isdigit():
638                return sanitized
639
640        # ISO-8601 timestamp (allow trailing Z)
641        iso_value = sanitized.replace("Z", "+00:00")
642        try:
643            datetime.fromisoformat(iso_value)
644            return sanitized
645        except ValueError:
646            raise ValueError(
647                "'since' must be a relative duration (e.g., '24h', '7d') "
648                "or ISO-8601 timestamp"
649            ) from None
650
651    @classmethod
652    @lru_cache(maxsize=256)
653    def validate_key_id(cls, key_id: str) -> str:
654        """Enhanced key_id validation.
655
656        :param key_id: Key ID to validate
657        :return: Validated key ID
658        :raises ValueError: If key ID is invalid
659        """
660        clean_id = cls.validate_string_not_empty(key_id, "key_id")
661        cls._validate_length(clean_id, Constants.MAX_KEY_ID_LENGTH, "key_id")
662        cls._validate_no_null_bytes(clean_id, "key_id")
663
664        try:
665            decoded = urllib.parse.unquote(clean_id)
666            double_decoded = urllib.parse.unquote(decoded)
667
668            # Check all variants for malicious characters
669            for variant in [clean_id, decoded, double_decoded]:
670                if any(c in variant for c in {".", "/", "\\", "%", "\x00"}):
671                    raise ValueError(
672                        "key_id contains invalid characters (., /, \\, %, null)"
673                    )
674        except Exception as e:
675            raise ValueError(f"Invalid key_id encoding: {e}") from e
676
677        # Strict whitelist approach
678        allowed_chars = frozenset(
679            "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-"
680        )
681        if not all(c in allowed_chars for c in clean_id):
682            raise ValueError("key_id must be alphanumeric, dashes, underscores only")
683
684        return clean_id
685
686    @staticmethod
687    @lru_cache(maxsize=256)
688    def sanitize_url_for_logging(url: str) -> str:
689        """Remove secret path from URL for safe logging.
690
691        :param url: URL to sanitize
692        :return: Sanitized URL
693        """
694        try:
695            parsed = urlparse(url)
696            return f"{parsed.scheme}://{parsed.netloc}/***"
697        except Exception:
698            return "***INVALID_URL***"
699
700    @staticmethod
701    @lru_cache(maxsize=512)
702    def sanitize_endpoint_for_logging(endpoint: str) -> str:
703        """Sanitize endpoint for safe logging.
704
705        :param endpoint: Endpoint to sanitize
706        :return: Sanitized endpoint
707        """
708        if not endpoint:
709            return "***EMPTY***"
710
711        parts = endpoint.split("/")
712        sanitized = [part if len(part) <= 20 else "***" for part in parts]
713        return "/".join(sanitized)

Input validation utilities with security hardening.

@staticmethod
@lru_cache(maxsize=64)
def validate_cert_fingerprint(fingerprint: pydantic.types.SecretStr) -> pydantic.types.SecretStr:
438    @staticmethod
439    @lru_cache(maxsize=64)
440    def validate_cert_fingerprint(fingerprint: SecretStr) -> SecretStr:
441        """Validate and normalize certificate fingerprint.
442
443        :param fingerprint: SHA-256 fingerprint
444        :return: Normalized fingerprint (lowercase, no separators)
445        :raises ValueError: If format is invalid
446        """
447        if not fingerprint:
448            raise ValueError("Certificate fingerprint cannot be empty")
449
450        # Remove common separators
451        cleaned = fingerprint.get_secret_value().lower()
452
453        # Validate hex format
454        if not re.match(r"^[a-f0-9]{64}$", cleaned):
455            raise ValueError(
456                f"Invalid certificate fingerprint format. "
457                f"Expected 64 hex characters, got: {len(cleaned)}"
458            )
459
460        return SecretStr(cleaned)

Validate and normalize certificate fingerprint.

Parameters
  • fingerprint: SHA-256 fingerprint
Returns

Normalized fingerprint (lowercase, no separators)

Raises
  • ValueError: If format is invalid
@staticmethod
def validate_port(port: int) -> int:
462    @staticmethod
463    def validate_port(port: int) -> int:
464        """Validate port number.
465
466        :param port: Port number
467        :return: Validated port
468        :raises ValueError: If port is out of range
469        """
470        if not is_valid_port(port):
471            raise ValueError(
472                f"Port must be between {Constants.MIN_PORT} and {Constants.MAX_PORT}"
473            )
474        return port

Validate port number.

Parameters
  • port: Port number
Returns

Validated port

Raises
  • ValueError: If port is out of range
@staticmethod
def validate_name(name: str) -> str:
476    @staticmethod
477    def validate_name(name: str) -> str:
478        """Validate name field.
479
480        :param name: Name to validate
481        :return: Validated name
482        :raises ValueError: If name is invalid
483        """
484        if not name or not name.strip():
485            raise ValueError("Name cannot be empty")
486
487        name = name.strip()
488        if len(name) > Constants.MAX_NAME_LENGTH:
489            raise ValueError(
490                f"Name too long: {len(name)} (max {Constants.MAX_NAME_LENGTH})"
491            )
492
493        return name

Validate name field.

Parameters
  • name: Name to validate
Returns

Validated name

Raises
  • ValueError: If name is invalid
@staticmethod
def validate_url( url: str, *, allow_private_networks: bool = True, resolve_dns: bool = False) -> str:
495    @staticmethod
496    def validate_url(
497        url: str,
498        *,
499        allow_private_networks: bool = True,
500        resolve_dns: bool = False,
501    ) -> str:
502        """Validate and sanitize URL.
503
504        :param url: URL to validate
505        :param allow_private_networks: Allow private/local network addresses
506        :param resolve_dns: Resolve hostname and block private/reserved IPs
507        :return: Validated URL
508        :raises ValueError: If URL is invalid
509        """
510        if not url or not url.strip():
511            raise ValueError("URL cannot be empty")
512
513        url = url.strip()
514
515        if len(url) > Constants.MAX_URL_LENGTH:
516            raise ValueError(
517                f"URL too long: {len(url)} (max {Constants.MAX_URL_LENGTH})"
518            )
519
520        # Check for null bytes
521        if "\x00" in url:
522            raise ValueError("URL contains null bytes")
523
524        # Parse URL
525        try:
526            parsed = urlparse(url)
527            if not parsed.scheme or not parsed.netloc:
528                raise ValueError("Invalid URL format")
529        except Exception as e:
530            raise ValueError(f"Invalid URL: {e}") from e
531
532        # SSRF protection for raw IPs in hostname (does not resolve DNS)
533        if (
534            not allow_private_networks
535            and parsed.hostname
536            and SSRFProtection.is_blocked_ip(parsed.hostname)
537        ):
538            raise ValueError(
539                f"Access to {parsed.hostname} is blocked (SSRF protection)"
540            )
541
542        # Explicitly block localhost when private networks are disallowed
543        if (
544            not allow_private_networks
545            and parsed.hostname in SSRFProtection.ALLOWED_LOCALHOST
546        ):
547            raise ValueError(
548                f"Access to {parsed.hostname} is blocked (SSRF protection)"
549            )
550
551        # Strict SSRF protection with DNS resolution (guards against rebinding)
552        if (
553            resolve_dns
554            and not allow_private_networks
555            and parsed.hostname
556            and not SSRFProtection.is_blocked_ip(parsed.hostname)
557            and SSRFProtection.is_blocked_hostname(parsed.hostname)
558        ):
559            raise ValueError(
560                f"Access to {parsed.hostname} is blocked (SSRF protection)"
561            )
562
563        return url

Validate and sanitize URL.

Parameters
  • url: URL to validate
  • allow_private_networks: Allow private/local network addresses
  • resolve_dns: Resolve hostname and block private/reserved IPs
Returns

Validated URL

Raises
  • ValueError: If URL is invalid
@staticmethod
def validate_string_not_empty(value: str, field_name: str) -> str:
565    @staticmethod
566    def validate_string_not_empty(value: str, field_name: str) -> str:
567        """Validate string is not empty.
568
569        :param value: String value
570        :param field_name: Field name for error messages
571        :return: Stripped string
572        :raises ValueError: If string is empty
573        """
574        if not value or not value.strip():
575            raise ValueError(f"{field_name} cannot be empty")
576        return value.strip()

Validate string is not empty.

Parameters
  • value: String value
  • field_name: Field name for error messages
Returns

Stripped string

Raises
  • ValueError: If string is empty
@staticmethod
def validate_non_negative(value: DataLimit | int, name: str) -> int:
601    @staticmethod
602    def validate_non_negative(value: DataLimit | int, name: str) -> int:
603        """Validate integer is non-negative.
604
605        :param value: Integer value
606        :param name: Field name for error messages
607        :return: Validated value
608        :raises ValueError: If value is negative
609        """
610        from .models import DataLimit
611
612        raw_value = value.bytes if isinstance(value, DataLimit) else value
613        if raw_value < 0:
614            raise ValueError(f"{name} must be non-negative, got {raw_value}")
615        return raw_value

Validate integer is non-negative.

Parameters
  • value: Integer value
  • name: Field name for error messages
Returns

Validated value

Raises
  • ValueError: If value is negative
@staticmethod
def validate_since(value: str) -> str:
617    @staticmethod
618    def validate_since(value: str) -> str:
619        """Validate experimental metrics 'since' parameter.
620
621        Accepts:
622        - Relative durations: 24h, 7d, 30m, 15s
623        - ISO-8601 timestamps (e.g., 2024-01-01T00:00:00Z)
624
625        :param value: Since parameter
626        :return: Sanitized since value
627        :raises ValueError: If value is invalid
628        """
629        if not value or not value.strip():
630            raise ValueError("'since' parameter cannot be empty")
631
632        sanitized = value.strip()
633
634        # Relative format (number + suffix)
635        if len(sanitized) >= 2 and sanitized[-1] in {"h", "d", "m", "s"}:
636            number = sanitized[:-1]
637            if number.isdigit():
638                return sanitized
639
640        # ISO-8601 timestamp (allow trailing Z)
641        iso_value = sanitized.replace("Z", "+00:00")
642        try:
643            datetime.fromisoformat(iso_value)
644            return sanitized
645        except ValueError:
646            raise ValueError(
647                "'since' must be a relative duration (e.g., '24h', '7d') "
648                "or ISO-8601 timestamp"
649            ) from None

Validate experimental metrics 'since' parameter.

Accepts:

  • Relative durations: 24h, 7d, 30m, 15s
  • ISO-8601 timestamps (e.g., 2024-01-01T00:00:00Z)
Parameters
  • value: Since parameter
Returns

Sanitized since value

Raises
  • ValueError: If value is invalid
@classmethod
@lru_cache(maxsize=256)
def validate_key_id(cls, key_id: str) -> str:
651    @classmethod
652    @lru_cache(maxsize=256)
653    def validate_key_id(cls, key_id: str) -> str:
654        """Enhanced key_id validation.
655
656        :param key_id: Key ID to validate
657        :return: Validated key ID
658        :raises ValueError: If key ID is invalid
659        """
660        clean_id = cls.validate_string_not_empty(key_id, "key_id")
661        cls._validate_length(clean_id, Constants.MAX_KEY_ID_LENGTH, "key_id")
662        cls._validate_no_null_bytes(clean_id, "key_id")
663
664        try:
665            decoded = urllib.parse.unquote(clean_id)
666            double_decoded = urllib.parse.unquote(decoded)
667
668            # Check all variants for malicious characters
669            for variant in [clean_id, decoded, double_decoded]:
670                if any(c in variant for c in {".", "/", "\\", "%", "\x00"}):
671                    raise ValueError(
672                        "key_id contains invalid characters (., /, \\, %, null)"
673                    )
674        except Exception as e:
675            raise ValueError(f"Invalid key_id encoding: {e}") from e
676
677        # Strict whitelist approach
678        allowed_chars = frozenset(
679            "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-"
680        )
681        if not all(c in allowed_chars for c in clean_id):
682            raise ValueError("key_id must be alphanumeric, dashes, underscores only")
683
684        return clean_id

Enhanced key_id validation.

Parameters
  • key_id: Key ID to validate
Returns

Validated key ID

Raises
  • ValueError: If key ID is invalid
@staticmethod
@lru_cache(maxsize=256)
def sanitize_url_for_logging(url: str) -> str:
686    @staticmethod
687    @lru_cache(maxsize=256)
688    def sanitize_url_for_logging(url: str) -> str:
689        """Remove secret path from URL for safe logging.
690
691        :param url: URL to sanitize
692        :return: Sanitized URL
693        """
694        try:
695            parsed = urlparse(url)
696            return f"{parsed.scheme}://{parsed.netloc}/***"
697        except Exception:
698            return "***INVALID_URL***"

Remove secret path from URL for safe logging.

Parameters
  • url: URL to sanitize
Returns

Sanitized URL

@staticmethod
@lru_cache(maxsize=512)
def sanitize_endpoint_for_logging(endpoint: str) -> str:
700    @staticmethod
701    @lru_cache(maxsize=512)
702    def sanitize_endpoint_for_logging(endpoint: str) -> str:
703        """Sanitize endpoint for safe logging.
704
705        :param endpoint: Endpoint to sanitize
706        :return: Sanitized endpoint
707        """
708        if not endpoint:
709            return "***EMPTY***"
710
711        parts = endpoint.split("/")
712        sanitized = [part if len(part) <= 20 else "***" for part in parts]
713        return "/".join(sanitized)

Sanitize endpoint for safe logging.

Parameters
  • endpoint: Endpoint to sanitize
Returns

Sanitized endpoint

__author__: Final[str] = 'Denis Rozhnovskiy'
__email__: Final[str] = 'pytelemonbot@mail.ru'
__license__: Final[str] = 'MIT'
__version__: str = '0.4.0'
def audited( *, log_success: bool = True, log_failure: bool = True) -> Callable[[Callable[~P, ~R]], Callable[~P, ~R]]:
575def audited(
576    *,
577    log_success: bool = True,
578    log_failure: bool = True,
579) -> Callable[[Callable[P, R]], Callable[P, R]]:
580    """Audit logging decorator with zero-config smart extraction.
581
582    Automatically extracts ALL information from function signature and execution:
583    - Action name: from function name
584    - Resource: from result.id, first parameter, or function analysis
585    - Details: from function signature (excluding None and defaults)
586    - Correlation ID: from instance._correlation_id if available
587    - Success/failure: from exception handling
588
589    Usage:
590        @audited()
591        async def create_access_key(self, name: str, port: int = 8080) -> AccessKey:
592            # action: "create_access_key"
593            # resource: result.id
594            # details: {"name": "...", "port": 8080} (if not default)
595            ...
596
597        @audited(log_success=False)
598        async def critical_operation(self, resource_id: str) -> bool:
599            # Only logs failures for alerting
600            ...
601
602    :param log_success: Log successful operations (default: True)
603    :param log_failure: Log failed operations (default: True)
604    :return: Decorated function with automatic audit logging
605    """
606
607    def decorator(func: Callable[P, R]) -> Callable[P, R]:
608        # Determine if function is async at decoration time
609        is_async = inspect.iscoroutinefunction(func)
610
611        if is_async:
612            async_func = cast("Callable[P, Awaitable[object]]", func)
613
614            @wraps(func)
615            async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> object:
616                # Check for audit logger on instance
617                instance = args[0] if args else None
618                audit_logger = getattr(instance, "_audit_logger", None)
619
620                # No logger? Execute without audit
621                if audit_logger is None:
622                    return await async_func(*args, **kwargs)
623
624                result: object | None = None
625
626                try:
627                    result = await async_func(*args, **kwargs)
628                except Exception as e:
629                    if log_failure:
630                        ctx = AuditContext.from_call(
631                            func=func,
632                            instance=instance,
633                            args=args,
634                            kwargs=kwargs,
635                            result=result,
636                            exception=e,
637                        )
638                        task = asyncio.create_task(
639                            audit_logger.alog_action(
640                                action=ctx.action,
641                                resource=ctx.resource,
642                                details=ctx.details,
643                                correlation_id=ctx.correlation_id,
644                            )
645                        )
646                        task.add_done_callback(lambda t: t.exception())
647                    raise
648                else:
649                    if log_success:
650                        ctx = AuditContext.from_call(
651                            func=func,
652                            instance=instance,
653                            args=args,
654                            kwargs=kwargs,
655                            result=result,
656                            exception=None,
657                        )
658                        task = asyncio.create_task(
659                            audit_logger.alog_action(
660                                action=ctx.action,
661                                resource=ctx.resource,
662                                details=ctx.details,
663                                correlation_id=ctx.correlation_id,
664                            )
665                        )
666                        task.add_done_callback(lambda t: t.exception())
667                    return result
668
669            return cast("Callable[P, R]", async_wrapper)
670
671        else:
672            sync_func = cast("Callable[P, object]", func)
673
674            @wraps(func)
675            def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> object:
676                # Check for audit logger on instance
677                instance = args[0] if args else None
678                audit_logger = getattr(instance, "_audit_logger", None)
679
680                # No logger? Execute without audit
681                if audit_logger is None:
682                    return sync_func(*args, **kwargs)
683
684                result: object | None = None
685
686                try:
687                    result = sync_func(*args, **kwargs)
688                except Exception as e:
689                    if log_failure:
690                        ctx = AuditContext.from_call(
691                            func=func,
692                            instance=instance,
693                            args=args,
694                            kwargs=kwargs,
695                            result=result,
696                            exception=e,
697                        )
698                        audit_logger.log_action(
699                            action=ctx.action,
700                            resource=ctx.resource,
701                            details=ctx.details,
702                            correlation_id=ctx.correlation_id,
703                        )
704                    raise
705                else:
706                    if log_success:
707                        ctx = AuditContext.from_call(
708                            func=func,
709                            instance=instance,
710                            args=args,
711                            kwargs=kwargs,
712                            result=result,
713                            exception=None,
714                        )
715                        audit_logger.log_action(
716                            action=ctx.action,
717                            resource=ctx.resource,
718                            details=ctx.details,
719                            correlation_id=ctx.correlation_id,
720                        )
721                    return result
722
723            return cast("Callable[P, R]", sync_wrapper)
724
725    return decorator

Audit logging decorator with zero-config smart extraction.

Automatically extracts ALL information from function signature and execution:

  • Action name: from function name
  • Resource: from result.id, first parameter, or function analysis
  • Details: from function signature (excluding None and defaults)
  • Correlation ID: from instance._correlation_id if available
  • Success/failure: from exception handling
Usage:

@audited() async def create_access_key(self, name: str, port: int = 8080) -> AccessKey: # action: "create_access_key" # resource: result.id # details: {"name": "...", "port": 8080} (if not default) ...

@audited(log_success=False) async def critical_operation(self, resource_id: str) -> bool: # Only logs failures for alerting ...

Parameters
  • log_success: Log successful operations (default: True)
  • log_failure: Log failed operations (default: True)
Returns

Decorated function with automatic audit logging

def build_config_overrides( **kwargs: int | str | bool | float | None) -> dict[str, int | str | bool | float | None]:
772def build_config_overrides(
773    **kwargs: int | str | bool | float | None,
774) -> dict[str, int | str | bool | float | None]:
775    """Build configuration overrides dictionary from kwargs.
776
777    DRY implementation - single source of truth for config building.
778
779    :param kwargs: Configuration parameters
780    :return: Dictionary containing only non-None values
781
782    Example:
783        >>> overrides = build_config_overrides(timeout=20, enable_logging=True)
784        >>> # Returns: {'timeout': 20, 'enable_logging': True}
785    """
786    valid_keys = ConfigOverrides.__annotations__.keys()
787    return {k: v for k, v in kwargs.items() if k in valid_keys and v is not None}

Build configuration overrides dictionary from kwargs.

DRY implementation - single source of truth for config building.

Parameters
  • kwargs: Configuration parameters
Returns

Dictionary containing only non-None values

Example:
>>> overrides = build_config_overrides(timeout=20, enable_logging=True)
>>> # Returns: {'timeout': 20, 'enable_logging': True}
correlation_id = <ContextVar name='correlation_id' default=''>
def create_client( api_url: str, cert_sha256: str, *, audit_logger: AuditLogger | None = None, metrics: MetricsCollector | None = None, **overrides: Unpack[ConfigOverrides]) -> AsyncOutlineClient:
888def create_client(
889    api_url: str,
890    cert_sha256: str,
891    *,
892    audit_logger: AuditLogger | None = None,
893    metrics: MetricsCollector | None = None,
894    **overrides: Unpack[ConfigOverrides],
895) -> AsyncOutlineClient:
896    """Create client with minimal parameters.
897
898    Convenience function for quick client creation without
899    explicit configuration object. Uses modern **overrides approach.
900
901    :param api_url: API URL with secret path
902    :param cert_sha256: SHA-256 certificate fingerprint
903    :param audit_logger: Custom audit logger (optional)
904    :param metrics: Custom metrics collector (optional)
905    :param overrides: Configuration overrides (timeout, retry_attempts, etc.)
906    :return: Configured client instance (use with async context manager)
907    :raises ConfigurationError: If parameters are invalid
908
909    Example (advanced, prefer from_env for production):
910        >>> async with AsyncOutlineClient.from_env() as client:
911        ...     info = await client.get_server_info()
912    """
913    return AsyncOutlineClient(
914        api_url=api_url,
915        cert_sha256=cert_sha256,
916        audit_logger=audit_logger,
917        metrics=metrics,
918        **overrides,
919    )

Create client with minimal parameters.

Convenience function for quick client creation without explicit configuration object. Uses modern **overrides approach.

Parameters
  • api_url: API URL with secret path
  • cert_sha256: SHA-256 certificate fingerprint
  • audit_logger: Custom audit logger (optional)
  • metrics: Custom metrics collector (optional)
  • overrides: Configuration overrides (timeout, retry_attempts, etc.)
Returns

Configured client instance (use with async context manager)

Raises
  • ConfigurationError: If parameters are invalid

Example (advanced, prefer from_env for production):

async with AsyncOutlineClient.from_env() as client: ... info = await client.get_server_info()

def create_env_template(path: str | pathlib._local.Path = '.env.example') -> None:
593def create_env_template(path: str | Path = ".env.example") -> None:
594    """Create .env template file (optimized I/O).
595
596    Performance: Uses cached template and efficient Path operations
597
598    :param path: Path to template file
599    """
600    # Pattern matching for path handling
601    match path:
602        case str():
603            target_path = Path(path)
604        case Path():
605            target_path = path
606        case _:
607            raise TypeError(f"path must be str or Path, got {type(path).__name__}")
608
609    # Use cached template
610    template = _get_env_template()
611    target_path.write_text(template, encoding="utf-8")
612
613    _log_if_enabled(
614        logging.INFO,
615        f"Created configuration template: {target_path}",
616    )

Create .env template file (optimized I/O).

Performance: Uses cached template and efficient Path operations

Parameters
  • path: Path to template file
def create_multi_server_manager( configs: Sequence[OutlineClientConfig], *, audit_logger: AuditLogger | None = None, metrics: MetricsCollector | None = None, default_timeout: float = 5.0) -> MultiServerManager:
922def create_multi_server_manager(
923    configs: Sequence[OutlineClientConfig],
924    *,
925    audit_logger: AuditLogger | None = None,
926    metrics: MetricsCollector | None = None,
927    default_timeout: float = _DEFAULT_SERVER_TIMEOUT,
928) -> MultiServerManager:
929    """Create multiserver manager with configurations.
930
931    Convenience function for creating a manager for multiple servers.
932
933    :param configs: Sequence of server configurations
934    :param audit_logger: Shared audit logger
935    :param metrics: Shared metrics collector
936    :param default_timeout: Default operation timeout
937    :return: MultiServerManager instance (use with async context manager)
938    :raises ConfigurationError: If configurations are invalid
939
940    Example:
941        >>> configs = [
942        ...     OutlineClientConfig.create_minimal("https://s1.com/path", "a" * 64),
943        ...     OutlineClientConfig.create_minimal("https://s2.com/path", "b" * 64),
944        ... ]
945        >>> async with create_multi_server_manager(configs) as manager:
946        ...     health = await manager.health_check_all()
947    """
948    return MultiServerManager(
949        configs=configs,
950        audit_logger=audit_logger,
951        metrics=metrics,
952        default_timeout=default_timeout,
953    )

Create multiserver manager with configurations.

Convenience function for creating a manager for multiple servers.

Parameters
  • configs: Sequence of server configurations
  • audit_logger: Shared audit logger
  • metrics: Shared metrics collector
  • default_timeout: Default operation timeout
Returns

MultiServerManager instance (use with async context manager)

Raises
  • ConfigurationError: If configurations are invalid
Example:
>>> configs = [
...     OutlineClientConfig.create_minimal("https://s1.com/path", "a" * 64),
...     OutlineClientConfig.create_minimal("https://s2.com/path", "b" * 64),
... ]
>>> async with create_multi_server_manager(configs) as manager:
...     health = await manager.health_check_all()
def format_error_chain(error: Exception) -> list[dict[str, typing.Any]]:
606def format_error_chain(error: Exception) -> list[dict[str, Any]]:
607    """Format exception chain for structured logging.
608
609    Args:
610        error: Exception to format
611
612    Returns:
613        List of error dictionaries ordered from root to leaf
614
615    Example:
616        >>> try:
617        ...     raise ValueError("Inner") from KeyError("Outer")
618        ... except Exception as e:
619        ...     chain = format_error_chain(e)
620        ...     len(chain)  # 2
621    """
622    # Pre-allocate with reasonable size hint (most chains are 1-3 errors)
623    chain: list[dict[str, Any]] = []
624    current: BaseException | None = error
625
626    while current is not None:
627        chain.append(get_safe_error_dict(current))
628        current = current.__cause__ or current.__context__
629
630    return chain

Format exception chain for structured logging.

Arguments:
  • error: Exception to format
Returns:

List of error dictionaries ordered from root to leaf

Example:
>>> try:
...     raise ValueError("Inner") from KeyError("Outer")
... except Exception as e:
...     chain = format_error_chain(e)
...     len(chain)  # 2
def get_audit_logger() -> AuditLogger | None:
778def get_audit_logger() -> AuditLogger | None:
779    """Get audit logger from current context.
780
781    :return: Audit logger instance or None
782    """
783    return _audit_logger_context.get()

Get audit logger from current context.

Returns

Audit logger instance or None

def get_or_create_audit_logger(instance_id: int | None = None) -> AuditLogger:
786def get_or_create_audit_logger(instance_id: int | None = None) -> AuditLogger:
787    """Get or create audit logger with weak reference caching.
788
789    :param instance_id: Instance ID for caching (optional)
790    :return: Audit logger instance
791    """
792    # Try context first
793    ctx_logger = _audit_logger_context.get()
794    if ctx_logger is not None:
795        return ctx_logger
796
797    # Try cache if instance_id provided
798    if instance_id is not None:
799        cached = _logger_cache.get(instance_id)
800        if cached is not None:
801            return cached
802
803    # Create new logger
804    logger_instance = DefaultAuditLogger()
805
806    # Cache if instance_id provided
807    if instance_id is not None:
808        _logger_cache[instance_id] = cast(AuditLogger, logger_instance)
809
810    return cast(AuditLogger, logger_instance)

Get or create audit logger with weak reference caching.

Parameters
  • instance_id: Instance ID for caching (optional)
Returns

Audit logger instance

def get_retry_delay(error: Exception) -> float | None:
504def get_retry_delay(error: Exception) -> float | None:
505    """Get suggested retry delay for an error.
506
507    Args:
508        error: Exception to check
509
510    Returns:
511        Retry delay in seconds, or None if not retryable
512
513    Example:
514        >>> error = OutlineTimeoutError("Timeout")
515        >>> get_retry_delay(error)  # 2.0
516    """
517    if not isinstance(error, OutlineError):
518        return None
519    if not error.is_retryable:
520        return None
521    return error.default_retry_delay

Get suggested retry delay for an error.

Arguments:
  • error: Exception to check
Returns:

Retry delay in seconds, or None if not retryable

Example:
>>> error = OutlineTimeoutError("Timeout")
>>> get_retry_delay(error)  # 2.0
def get_safe_error_dict(error: BaseException) -> dict[str, typing.Any]:
542def get_safe_error_dict(error: BaseException) -> dict[str, Any]:
543    """Extract safe error information for logging.
544
545    Returns only safe information without sensitive data.
546
547    Args:
548        error: Exception to convert
549
550    Returns:
551        Safe error dictionary suitable for logging
552
553    Example:
554        >>> error = APIError("Not found", status_code=404)
555        >>> get_safe_error_dict(error)
556        {'type': 'APIError', 'message': 'Not found', 'status_code': 404, ...}
557    """
558    result: dict[str, Any] = {
559        "type": type(error).__name__,
560        "message": str(error),
561    }
562
563    if not isinstance(error, OutlineError):
564        return result
565
566    result.update(
567        {
568            "retryable": error.is_retryable,
569            "retry_delay": error.default_retry_delay,
570            "safe_details": error.safe_details,
571        }
572    )
573
574    match error:
575        case APIError():
576            result["status_code"] = error.status_code
577            # Only compute these if status_code is not None
578            if error.status_code is not None:
579                result["is_client_error"] = error.is_client_error
580                result["is_server_error"] = error.is_server_error
581        case CircuitOpenError():
582            result["retry_after"] = error.retry_after
583        case ConfigurationError():
584            if error.field is not None:
585                result["field"] = error.field
586            result["security_issue"] = error.security_issue
587        case ValidationError():
588            if error.field is not None:
589                result["field"] = error.field
590            if error.model is not None:
591                result["model"] = error.model
592        case OutlineConnectionError():
593            if error.host is not None:
594                result["host"] = error.host
595            if error.port is not None:
596                result["port"] = error.port
597        case OutlineTimeoutError():
598            if error.timeout is not None:
599                result["timeout"] = error.timeout
600            if error.operation is not None:
601                result["operation"] = error.operation
602
603    return result

Extract safe error information for logging.

Returns only safe information without sensitive data.

Arguments:
  • error: Exception to convert
Returns:

Safe error dictionary suitable for logging

Example:
>>> error = APIError("Not found", status_code=404)
>>> get_safe_error_dict(error)
{'type': 'APIError', 'message': 'Not found', 'status_code': 404, ...}
def get_version() -> str:
258def get_version() -> str:
259    """Get package version string.
260
261    :return: Package version
262    """
263    return __version__

Get package version string.

Returns

Package version

def is_json_serializable( value: object) -> TypeGuard[str | int | float | bool | None | dict[str, str | int | float | bool | None | dict[str, ForwardRef('JsonValue')] | list[ForwardRef('JsonValue')]] | list[str | int | float | bool | None | dict[str, ForwardRef('JsonValue')] | list[ForwardRef('JsonValue')]]]:
413def is_json_serializable(value: object) -> TypeGuard[JsonValue]:
414    """Type guard for JSON-serializable values.
415
416    :param value: Value to check
417    :return: True if value is JSON-serializable
418    """
419    if value is None or isinstance(value, str | int | float | bool):
420        return True
421    if isinstance(value, dict):
422        return all(
423            isinstance(k, str) and is_json_serializable(v) for k, v in value.items()
424        )
425    if isinstance(value, list):
426        return all(is_json_serializable(item) for item in value)
427    return False

Type guard for JSON-serializable values.

Parameters
  • value: Value to check
Returns

True if value is JSON-serializable

def is_retryable(error: Exception) -> bool:
524def is_retryable(error: Exception) -> bool:
525    """Check if error should be retried.
526
527    Args:
528        error: Exception to check
529
530    Returns:
531        True if error is retryable
532
533    Example:
534        >>> error = APIError("Server error", status_code=503)
535        >>> is_retryable(error)  # True
536    """
537    if isinstance(error, OutlineError):
538        return error.is_retryable
539    return False

Check if error should be retried.

Arguments:
  • error: Exception to check
Returns:

True if error is retryable

Example:
>>> error = APIError("Server error", status_code=503)
>>> is_retryable(error)  # True
def is_valid_bytes(value: object) -> TypeGuard[int]:
404def is_valid_bytes(value: object) -> TypeGuard[int]:
405    """Type guard for valid byte counts.
406
407    :param value: Value to check
408    :return: True if value is valid bytes
409    """
410    return isinstance(value, int) and value >= 0

Type guard for valid byte counts.

Parameters
  • value: Value to check
Returns

True if value is valid bytes

def is_valid_port(value: object) -> TypeGuard[int]:
395def is_valid_port(value: object) -> TypeGuard[int]:
396    """Type guard for valid port numbers.
397
398    :param value: Value to check
399    :return: True if value is valid port
400    """
401    return isinstance(value, int) and Constants.MIN_PORT <= value <= Constants.MAX_PORT

Type guard for valid port numbers.

Parameters
  • value: Value to check
Returns

True if value is valid port

def load_config( environment: str = 'custom', **overrides: int | str | bool | float) -> OutlineClientConfig:
619def load_config(
620    environment: str = "custom",
621    **overrides: ConfigValue,
622) -> OutlineClientConfig:
623    """Load configuration for environment (optimized lookup).
624
625    :param environment: Environment name (development, production, custom)
626    :param overrides: Configuration parameters to override
627    :return: Configuration instance
628    :raises ValueError: If environment name is invalid
629
630    Example:
631        >>> config = load_config("production", timeout=20)
632    """
633    env_lower = environment.lower()
634
635    # Fast validation with frozenset
636    if env_lower not in _VALID_ENVIRONMENTS:
637        valid_envs = ", ".join(sorted(_VALID_ENVIRONMENTS))
638        raise ValueError(f"Invalid environment '{environment}'. Valid: {valid_envs}")
639
640    # Pattern matching for config selection (Python 3.10+)
641    config_class: type[OutlineClientConfig]
642    match env_lower:
643        case "development" | "dev":
644            config_class = DevelopmentConfig
645        case "production" | "prod":
646            config_class = ProductionConfig
647        case "custom":
648            config_class = OutlineClientConfig
649        case _:  # Should never reach due to validation above
650            config_class = OutlineClientConfig
651
652    # Optimized override filtering
653    valid_keys = frozenset(ConfigOverrides.__annotations__.keys())
654    filtered_overrides = cast(
655        ConfigOverrides,
656        {k: v for k, v in overrides.items() if k in valid_keys},
657    )
658
659    return config_class(  # type: ignore[call-arg, unused-ignore]
660        **filtered_overrides
661    )

Load configuration for environment (optimized lookup).

Parameters
  • environment: Environment name (development, production, custom)
  • overrides: Configuration parameters to override
Returns

Configuration instance

Raises
  • ValueError: If environment name is invalid
Example:
>>> config = load_config("production", timeout=20)
def mask_sensitive_data( data: Mapping[str, typing.Any], *, sensitive_keys: frozenset[str] | None = None, _depth: int = 0) -> dict[str, typing.Any]:
806def mask_sensitive_data(
807    data: Mapping[str, Any],
808    *,
809    sensitive_keys: frozenset[str] | None = None,
810    _depth: int = 0,
811) -> dict[str, Any]:
812    """Sensitive data masking with lazy copying and optimized recursion.
813
814    Uses lazy copying - only creates new dict when needed.
815    Includes recursion depth protection.
816
817    :param data: Data dictionary to mask
818    :param sensitive_keys: Set of sensitive key names (case-insensitive matching)
819    :param _depth: Current recursion depth (internal)
820    :return: Masked data dictionary (may be same object if no sensitive data found)
821    """
822    # Guard against infinite recursion
823    if _depth > Constants.MAX_RECURSION_DEPTH:
824        return {"_error": "Max recursion depth exceeded"}
825
826    keys_to_mask = sensitive_keys or DEFAULT_SENSITIVE_KEYS
827    keys_lower = {k.lower() for k in keys_to_mask}
828
829    masked: dict[str, Any] | None = None
830
831    for key, value in data.items():
832        # Check if key is sensitive
833        if key.lower() in keys_lower:
834            if masked is None:
835                masked = dict(data)
836            masked[key] = "***MASKED***"
837            continue
838
839        # Recursively handle nested dicts
840        if isinstance(value, dict):
841            nested = mask_sensitive_data(
842                value, sensitive_keys=keys_to_mask, _depth=_depth + 1
843            )
844            if nested is not value:
845                if masked is None:
846                    masked = dict(data)
847                masked[key] = nested
848
849        # Handle lists containing dicts
850        elif isinstance(value, list):
851            new_list: list[Any] = []
852            list_modified = False
853
854            for item in value:
855                if isinstance(item, dict):
856                    masked_item = mask_sensitive_data(
857                        item, sensitive_keys=keys_to_mask, _depth=_depth + 1
858                    )
859                    if masked_item is not item:
860                        list_modified = True
861                    new_list.append(masked_item)
862                else:
863                    new_list.append(item)
864
865            if list_modified:
866                if masked is None:
867                    masked = dict(data)
868                masked[key] = new_list
869
870    return masked if masked is not None else dict(data)

Sensitive data masking with lazy copying and optimized recursion.

Uses lazy copying - only creates new dict when needed. Includes recursion depth protection.

Parameters
  • data: Data dictionary to mask
  • sensitive_keys: Set of sensitive key names (case-insensitive matching)
  • _depth: Current recursion depth (internal)
Returns

Masked data dictionary (may be same object if no sensitive data found)

def quick_setup() -> None:
266def quick_setup() -> None:
267    """Create configuration template file for quick setup.
268
269    Creates `.env.example` file with all available configuration options.
270    """
271    create_env_template()
272    print("✅ Created .env.example")
273    print("📝 Edit the file with your server details")
274    print("🚀 Then use: AsyncOutlineClient.from_env()")

Create configuration template file for quick setup.

Creates .env.example file with all available configuration options.

def set_audit_logger(logger_instance: AuditLogger) -> None:
767def set_audit_logger(logger_instance: AuditLogger) -> None:
768    """Set audit logger for current async context.
769
770    Thread-safe and async-safe using contextvars.
771    Preferred over global state for high-load applications.
772
773    :param logger_instance: Audit logger instance
774    """
775    _audit_logger_context.set(logger_instance)

Set audit logger for current async context.

Thread-safe and async-safe using contextvars. Preferred over global state for high-load applications.

Parameters
  • logger_instance: Audit logger instance