pyoutlineapi
PyOutlineAPI: A modern, async-first Python client for the Outline VPN Server API.
Copyright (c) 2025 Denis Rozhnovskiy <pytelemonbot@mail.ru>. All rights reserved.
This software is licensed under the MIT License.
You can find the full license text at: https://opensource.org/licenses/MIT
Source code repository: https://github.com/orenlab/pyoutlineapi
Quick Start:
from pyoutlineapi import AsyncOutlineClient
# From environment variables
async with AsyncOutlineClient.from_env() as client:
server = await client.get_server_info()
print(f"Server: {server.name}")
# Prefer from_env for production usage
async with AsyncOutlineClient.from_env() as client:
keys = await client.get_access_keys()
Advanced Usage - Type Hints:
from pyoutlineapi import (
AsyncOutlineClient,
AuditLogger,
AuditDetails,
MetricsCollector,
MetricsTags,
)
class CustomAuditLogger:
def log_action(
self,
action: str,
resource: str,
*,
user: str | None = None,
details: AuditDetails | None = None,
correlation_id: str | None = None,
) -> None:
print(f"[AUDIT] {action} on {resource}")
async with AsyncOutlineClient.from_env(
audit_logger=CustomAuditLogger(),
) as client:
await client.create_access_key(name="test")
"""PyOutlineAPI: A modern, async-first Python client for the Outline VPN Server API.

Copyright (c) 2025 Denis Rozhnovskiy <pytelemonbot@mail.ru>
All rights reserved.

This software is licensed under the MIT License.
You can find the full license text at:
    https://opensource.org/licenses/MIT

Source code repository:
    https://github.com/orenlab/pyoutlineapi

Quick Start:

```python
from pyoutlineapi import AsyncOutlineClient

# From environment variables
async with AsyncOutlineClient.from_env() as client:
    server = await client.get_server_info()
    print(f"Server: {server.name}")

# Prefer from_env for production usage
async with AsyncOutlineClient.from_env() as client:
    keys = await client.get_access_keys()
```

Advanced Usage - Type Hints:

```python
from pyoutlineapi import (
    AsyncOutlineClient,
    AuditLogger,
    AuditDetails,
    MetricsCollector,
    MetricsTags,
)

class CustomAuditLogger:
    def log_action(
        self,
        action: str,
        resource: str,
        *,
        user: str | None = None,
        details: AuditDetails | None = None,
        correlation_id: str | None = None,
    ) -> None:
        print(f"[AUDIT] {action} on {resource}")

async with AsyncOutlineClient.from_env(
    audit_logger=CustomAuditLogger(),
) as client:
    await client.create_access_key(name="test")
```
"""

from __future__ import annotations

from importlib import metadata
from typing import TYPE_CHECKING, Final, NoReturn

# Core imports
from .audit import (
    AuditContext,
    AuditLogger,
    DefaultAuditLogger,
    NoOpAuditLogger,
    audited,
    get_audit_logger,
    get_or_create_audit_logger,
    set_audit_logger,
)
from .base_client import MetricsCollector, NoOpMetrics, correlation_id
from .circuit_breaker import CircuitConfig, CircuitMetrics, CircuitState
from .client import (
    AsyncOutlineClient,
    MultiServerManager,
    create_client,
    create_multi_server_manager,
)
from .common_types import (
    DEFAULT_SENSITIVE_KEYS,
    AuditDetails,
    ConfigOverrides,
    Constants,
    CredentialSanitizer,
    JsonPayload,
    MetricsTags,
    QueryParams,
    ResponseData,
    SecureIDGenerator,
    TimestampMs,
    TimestampSec,
    Validators,
    build_config_overrides,
    is_json_serializable,
    is_valid_bytes,
    is_valid_port,
    mask_sensitive_data,
)
from .config import (
    DevelopmentConfig,
    OutlineClientConfig,
    ProductionConfig,
    create_env_template,
    load_config,
)
from .exceptions import (
    APIError,
    CircuitOpenError,
    ConfigurationError,
    OutlineConnectionError,
    OutlineError,
    OutlineTimeoutError,
    ValidationError,
    format_error_chain,
    get_retry_delay,
    get_safe_error_dict,
    is_retryable,
)
from .models import (
    AccessKey,
    AccessKeyCreateRequest,
    AccessKeyList,
    AccessKeyMetric,
    AccessKeyNameRequest,
    BandwidthData,
    BandwidthDataValue,
    BandwidthInfo,
    ConnectionInfo,
    DataLimit,
    DataLimitRequest,
    DataTransferred,
    ErrorResponse,
    ExperimentalMetrics,
    HealthCheckResult,
    HostnameRequest,
    LocationMetric,
    MetricsEnabledRequest,
    MetricsStatusResponse,
    PeakDeviceCount,
    PortRequest,
    Server,
    ServerExperimentalMetric,
    ServerMetrics,
    ServerNameRequest,
    ServerSummary,
    TunnelTime,
)
from .response_parser import JsonDict, ResponseParser

# Package metadata: prefer the installed distribution's version, fall back
# to a dev marker when running from a source checkout.
try:
    __version__: str = metadata.version("pyoutlineapi")
except metadata.PackageNotFoundError:
    __version__ = "0.4.0-dev"

__author__: Final[str] = "Denis Rozhnovskiy"
__email__: Final[str] = "pytelemonbot@mail.ru"
__license__: Final[str] = "MIT"

# Public API
# FIX: "AuditDetails" and "ConnectionInfo" are imported above (and
# AuditDetails is advertised in the module docstring) but were missing
# from __all__, so `from pyoutlineapi import *` and __all__-driven tools
# could not see them. Both added in sorted position.
__all__: Final[list[str]] = [
    "DEFAULT_SENSITIVE_KEYS",
    "APIError",
    "AccessKey",
    "AccessKeyCreateRequest",
    "AccessKeyList",
    "AccessKeyMetric",
    "AccessKeyNameRequest",
    "AsyncOutlineClient",
    "AuditContext",
    "AuditDetails",
    "AuditLogger",
    "BandwidthData",
    "BandwidthDataValue",
    "BandwidthInfo",
    "CircuitConfig",
    "CircuitMetrics",
    "CircuitOpenError",
    "CircuitState",
    "ConfigOverrides",
    "ConfigurationError",
    "ConnectionInfo",
    "Constants",
    "CredentialSanitizer",
    "DataLimit",
    "DataLimitRequest",
    "DataTransferred",
    "DefaultAuditLogger",
    "DevelopmentConfig",
    "ErrorResponse",
    "ExperimentalMetrics",
    "HealthCheckResult",
    "HostnameRequest",
    "JsonDict",
    "JsonPayload",
    "LocationMetric",
    "MetricsCollector",
    "MetricsEnabledRequest",
    "MetricsStatusResponse",
    "MetricsTags",
    "MultiServerManager",
    "NoOpAuditLogger",
    "NoOpMetrics",
    "OutlineClientConfig",
    "OutlineConnectionError",
    "OutlineError",
    "OutlineTimeoutError",
    "PeakDeviceCount",
    "PortRequest",
    "ProductionConfig",
    "QueryParams",
    "ResponseData",
    "ResponseParser",
    "SecureIDGenerator",
    "Server",
    "ServerExperimentalMetric",
    "ServerMetrics",
    "ServerNameRequest",
    "ServerSummary",
    "TimestampMs",
    "TimestampSec",
    "TunnelTime",
    "ValidationError",
    "Validators",
    "__author__",
    "__email__",
    "__license__",
    "__version__",
    "audited",
    "build_config_overrides",
    "correlation_id",
    "create_client",
    "create_env_template",
    "create_multi_server_manager",
    "format_error_chain",
    "get_audit_logger",
    "get_or_create_audit_logger",
    "get_retry_delay",
    "get_safe_error_dict",
    "get_version",
    "is_json_serializable",
    "is_retryable",
    "is_valid_bytes",
    "is_valid_port",
    "load_config",
    "mask_sensitive_data",
    "print_type_info",
    "quick_setup",
    "set_audit_logger",
]


# ===== Convenience Functions =====


def get_version() -> str:
    """Get package version string.

    :return: Package version
    """
    return __version__


def quick_setup() -> None:
    """Create configuration template file for quick setup.

    Creates `.env.example` file with all available configuration options.
    """
    create_env_template()
    print("✅ Created .env.example")
    print("📝 Edit the file with your server details")
    print("🚀 Then use: AsyncOutlineClient.from_env()")


def print_type_info() -> None:
    """Print information about available type aliases for advanced usage."""
    info = """
🎯 PyOutlineAPI Type Aliases for Advanced Usage
===============================================

For creating custom AuditLogger:
    from pyoutlineapi import AuditLogger, AuditDetails

    class MyAuditLogger:
        def log_action(
            self,
            action: str,
            resource: str,
            *,
            details: AuditDetails | None = None,
            ...
        ) -> None: ...

        async def alog_action(
            self,
            action: str,
            resource: str,
            *,
            details: AuditDetails | None = None,
            ...
        ) -> None: ...

For creating custom MetricsCollector:
    from pyoutlineapi import MetricsCollector, MetricsTags

    class MyMetrics:
        def increment(
            self,
            metric: str,
            *,
            tags: MetricsTags | None = None
        ) -> None: ...

Available Type Aliases:
    - TimestampMs, TimestampSec  # Unix timestamps
    - JsonPayload, ResponseData  # JSON data types
    - QueryParams                # URL query parameters
    - AuditDetails               # Audit log details
    - MetricsTags                # Metrics tags

Constants and Validators:
    from pyoutlineapi import Constants, Validators

    # Access constants
    Constants.RETRY_STATUS_CODES
    Constants.MIN_PORT, Constants.MAX_PORT

    # Use validators
    Validators.validate_port(8080)
    Validators.validate_key_id("my-key")

Utility Classes:
    from pyoutlineapi import (
        CredentialSanitizer,
        SecureIDGenerator,
        ResponseParser,
    )

    # Sanitize sensitive data
    safe_url = CredentialSanitizer.sanitize(url)

    # Generate secure IDs
    secure_id = SecureIDGenerator.generate()

    # Parse API responses
    parsed = ResponseParser.parse(data, Model)

📖 Documentation: https://github.com/orenlab/pyoutlineapi
    """
    print(info)


# ===== Better Error Messages =====


def __getattr__(name: str) -> NoReturn:
    """Provide helpful error messages for common mistakes.

    :param name: Attribute name
    :raises AttributeError: If attribute not found
    """
    mistakes = {
        "OutlineClient": "Use 'AsyncOutlineClient' instead",
        "OutlineSettings": "Use 'OutlineClientConfig' instead",
        "create_resilient_client": (
            "Use 'AsyncOutlineClient.from_env()' with 'enable_circuit_breaker=True'"
        ),
    }

    if name in mistakes:
        raise AttributeError(f"{name} not available. {mistakes[name]}")

    raise AttributeError(f"module '{__name__}' has no attribute '{name}'")


# ===== Interactive Help =====

# FIX: this banner was previously guarded by `if TYPE_CHECKING:`, which is
# False at runtime — the block was dead code and the help never printed.
# `if not TYPE_CHECKING:` keeps type checkers out while running for users.
if not TYPE_CHECKING:
    import sys

    if hasattr(sys, "ps1"):
        # Show help in interactive mode
        print(f"🚀 PyOutlineAPI v{__version__}")
        print("💡 Quick start: pyoutlineapi.quick_setup()")
        print("🎯 Type hints: pyoutlineapi.print_type_info()")
        print("📚 Help: help(pyoutlineapi.AsyncOutlineClient)")
class APIError(OutlineError):
    """HTTP API request failure.

    Automatically determines retry eligibility based on HTTP status code.

    Attributes:
        status_code: HTTP status code (if available)
        endpoint: API endpoint that failed
        response_data: Raw response data (may contain sensitive info)

    Example:
        >>> error = APIError("Not found", status_code=404, endpoint="/server")
        >>> error.is_client_error  # True
        >>> error.is_retryable  # False
    """

    __slots__ = ("endpoint", "response_data", "status_code")

    def __init__(
        self,
        message: str,
        *,
        status_code: int | None = None,
        endpoint: str | None = None,
        response_data: dict[str, Any] | None = None,
    ) -> None:
        """Initialize API error with sanitized endpoint.

        Args:
            message: Error message
            status_code: HTTP status code
            endpoint: API endpoint (will be sanitized)
            response_data: Response data (may contain sensitive info)
        """
        from .common_types import Validators

        # Sanitize endpoint for safe logging
        safe_endpoint = (
            Validators.sanitize_endpoint_for_logging(endpoint) if endpoint else None
        )

        # Build safe details (optimization: avoid dict creation if all None)
        safe_details: dict[str, Any] | None = None
        if status_code is not None or safe_endpoint is not None:
            safe_details = {}
            if status_code is not None:
                safe_details["status_code"] = status_code
            if safe_endpoint is not None:
                safe_details["endpoint"] = safe_endpoint

        # Build internal details (optimization: avoid dict creation if all None)
        details: dict[str, Any] | None = None
        if status_code is not None or endpoint is not None:
            details = {}
            if status_code is not None:
                details["status_code"] = status_code
            if endpoint is not None:
                details["endpoint"] = endpoint

        super().__init__(message, details=details, safe_details=safe_details)

        # Store attributes directly (faster access than dict lookups)
        self.status_code = status_code
        self.endpoint = endpoint
        self.response_data = response_data

    @property
    def is_retryable(self) -> bool:
        """Check if error is retryable based on status code.

        FIX: explicit `is not None` check instead of truthiness, consistent
        with the other status-code properties on this class.

        Returns:
            True if status code is in Constants.RETRY_STATUS_CODES
        """
        return (
            self.status_code is not None
            and self.status_code in Constants.RETRY_STATUS_CODES
        )

    @property
    def is_client_error(self) -> bool:
        """Check if error is a client error (4xx status).

        Returns:
            True if status code is 400-499
        """
        return self.status_code is not None and 400 <= self.status_code < 500

    @property
    def is_server_error(self) -> bool:
        """Check if error is a server error (5xx status).

        Returns:
            True if status code is 500-599
        """
        return self.status_code is not None and 500 <= self.status_code < 600

    @property
    def is_rate_limit_error(self) -> bool:
        """Check if error is a rate limit error (429 status).

        Returns:
            True if status code is 429
        """
        return self.status_code == 429
HTTP API request failure.
Automatically determines retry eligibility based on HTTP status code.
Attributes:
- status_code: HTTP status code (if available)
- endpoint: API endpoint that failed
- response_data: Raw response data (may contain sensitive info)
Example:
>>> error = APIError("Not found", status_code=404, endpoint="/server")
>>> error.is_client_error  # True
>>> error.is_retryable  # False
189 def __init__( 190 self, 191 message: str, 192 *, 193 status_code: int | None = None, 194 endpoint: str | None = None, 195 response_data: dict[str, Any] | None = None, 196 ) -> None: 197 """Initialize API error with sanitized endpoint. 198 199 Args: 200 message: Error message 201 status_code: HTTP status code 202 endpoint: API endpoint (will be sanitized) 203 response_data: Response data (may contain sensitive info) 204 """ 205 from .common_types import Validators 206 207 # Sanitize endpoint for safe logging 208 safe_endpoint = ( 209 Validators.sanitize_endpoint_for_logging(endpoint) if endpoint else None 210 ) 211 212 # Build safe details (optimization: avoid dict creation if all None) 213 safe_details: dict[str, Any] | None = None 214 if status_code is not None or safe_endpoint is not None: 215 safe_details = {} 216 if status_code is not None: 217 safe_details["status_code"] = status_code 218 if safe_endpoint is not None: 219 safe_details["endpoint"] = safe_endpoint 220 221 # Build internal details (optimization: avoid dict creation if all None) 222 details: dict[str, Any] | None = None 223 if status_code is not None or endpoint is not None: 224 details = {} 225 if status_code is not None: 226 details["status_code"] = status_code 227 if endpoint is not None: 228 details["endpoint"] = endpoint 229 230 super().__init__(message, details=details, safe_details=safe_details) 231 232 # Store attributes directly (faster access than dict lookups) 233 self.status_code = status_code 234 self.endpoint = endpoint 235 self.response_data = response_data
Initialize API error with sanitized endpoint.
Arguments:
- message: Error message
- status_code: HTTP status code
- endpoint: API endpoint (will be sanitized)
- response_data: Response data (may contain sensitive info)
237 @property 238 def is_retryable(self) -> bool: 239 """Check if error is retryable based on status code.""" 240 return ( 241 self.status_code in Constants.RETRY_STATUS_CODES 242 if self.status_code 243 else False 244 )
Check if error is retryable based on status code.
246 @property 247 def is_client_error(self) -> bool: 248 """Check if error is a client error (4xx status). 249 250 Returns: 251 True if status code is 400-499 252 """ 253 return self.status_code is not None and 400 <= self.status_code < 500
Check if error is a client error (4xx status).
Returns:
True if status code is 400-499
255 @property 256 def is_server_error(self) -> bool: 257 """Check if error is a server error (5xx status). 258 259 Returns: 260 True if status code is 500-599 261 """ 262 return self.status_code is not None and 500 <= self.status_code < 600
Check if error is a server error (5xx status).
Returns:
True if status code is 500-599
264 @property 265 def is_rate_limit_error(self) -> bool: 266 """Check if error is a rate limit error (429 status). 267 268 Returns: 269 True if status code is 429 270 """ 271 return self.status_code == 429
Check if error is a rate limit error (429 status).
Returns:
True if status code is 429
class AccessKey(BaseValidatedModel):
    """Access key model matching API schema with optimized properties.

    SCHEMA: Based on OpenAPI /access-keys endpoint
    """

    id: str
    name: str | None = None
    password: str
    port: Port
    method: str
    access_url: str = Field(alias="accessUrl")
    data_limit: DataLimit | None = Field(None, alias="dataLimit")

    @field_validator("name", mode="before")
    @classmethod
    def validate_name(cls, v: str | None) -> str | None:
        """Validate and normalize name from API.

        :param v: Raw name value from API (mode="before": may be any type)
        :return: Stripped name, or None for missing/blank names
        :raises ValueError: If name is not a string or exceeds max length
        """
        if v is None:
            return None

        # FIX: the original fell off the end of the function for non-string
        # input (implicitly returning None), silently coercing e.g. ints to
        # a missing name. Reject non-strings explicitly instead.
        if not isinstance(v, str):
            raise ValueError(f"Name must be a string, got {type(v).__name__}")

        stripped = v.strip()
        if not stripped:
            # Blank names are normalized to "no name".
            return None

        if len(stripped) > Constants.MAX_NAME_LENGTH:
            raise ValueError(
                f"Name too long: {len(stripped)} (max {Constants.MAX_NAME_LENGTH})"
            )
        return stripped

    @field_validator("id")
    @classmethod
    def validate_id(cls, v: str) -> str:
        """Validate key ID.

        :param v: Key ID
        :return: Validated key ID
        :raises ValueError: If ID is invalid
        """
        return Validators.validate_key_id(v)

    @property
    def has_data_limit(self) -> bool:
        """Check if key has data limit (optimized None check).

        :return: True if data limit exists
        """
        return self.data_limit is not None

    @property
    def display_name(self) -> str:
        """Get display name with optimized conditional.

        :return: Display name
        """
        return self.name if self.name else f"Key-{self.id}"
Access key model matching API schema with optimized properties.
SCHEMA: Based on OpenAPI /access-keys endpoint
Port number (1-65535)
151 @field_validator("name", mode="before") 152 @classmethod 153 def validate_name(cls, v: str | None) -> str | None: 154 """Validate and normalize name from API.""" 155 if v is None: 156 return None 157 158 if isinstance(v, str): 159 stripped = v.strip() 160 if not stripped: 161 return None 162 163 if len(stripped) > Constants.MAX_NAME_LENGTH: 164 raise ValueError( 165 f"Name too long: {len(stripped)} (max {Constants.MAX_NAME_LENGTH})" 166 ) 167 return stripped
Validate and normalize name from API.
169 @field_validator("id") 170 @classmethod 171 def validate_id(cls, v: str) -> str: 172 """Validate key ID. 173 174 :param v: Key ID 175 :return: Validated key ID 176 :raises ValueError: If ID is invalid 177 """ 178 return Validators.validate_key_id(v)
Validate key ID.
Parameters
- v: Key ID
Returns
Validated key ID
Raises
- ValueError: If ID is invalid
180 @property 181 def has_data_limit(self) -> bool: 182 """Check if key has data limit (optimized None check). 183 184 :return: True if data limit exists 185 """ 186 return self.data_limit is not None
Check if key has data limit (optimized None check).
Returns
True if data limit exists
class AccessKeyCreateRequest(BaseValidatedModel):
    """Payload for creating a new access key; every field is optional.

    SCHEMA: Based on POST /access-keys request body
    """

    name: str | None = Field(default=None, min_length=1, max_length=255)
    method: str | None = None
    password: str | None = None
    port: Port | None = None
    limit: DataLimit | None = None
Request model for creating access keys.
SCHEMA: Based on POST /access-keys request body
class AccessKeyList(BaseValidatedModel):
    """Collection of access keys with convenience lookups and filters.

    SCHEMA: Based on GET /access-keys response
    """

    access_keys: list[AccessKey] = Field(alias="accessKeys")

    @cached_property
    def count(self) -> int:
        """Number of keys in the list.

        NOTE: cached — the underlying list is immutable after creation.

        :return: Key count
        """
        return len(self.access_keys)

    @property
    def is_empty(self) -> bool:
        """True when the list holds no keys (reuses the cached count).

        :return: True if no keys
        """
        return not self.count

    def get_by_id(self, key_id: str) -> AccessKey | None:
        """Find the key with the given ID, stopping at the first match.

        :param key_id: Access key ID
        :return: Access key or None if not found
        """
        return next((key for key in self.access_keys if key.id == key_id), None)

    def get_by_name(self, name: str) -> list[AccessKey]:
        """Collect every key carrying the given name (names are not unique).

        :param name: Key name
        :return: List of matching keys (may be multiple)
        """
        return [key for key in self.access_keys if key.name == name]

    def filter_with_limits(self) -> list[AccessKey]:
        """Select only the keys that carry a data limit.

        :return: List of keys with limits
        """
        return [key for key in self.access_keys if key.has_data_limit]

    def filter_without_limits(self) -> list[AccessKey]:
        """Select only the keys that carry no data limit.

        :return: List of keys without limits
        """
        return [key for key in self.access_keys if not key.has_data_limit]
List of access keys with optimized utility methods.
SCHEMA: Based on GET /access-keys response
205 @cached_property 206 def count(self) -> int: 207 """Get number of access keys (cached). 208 209 NOTE: Cached because list is immutable after creation 210 211 :return: Key count 212 """ 213 return len(self.access_keys)
Get number of access keys (cached).
NOTE: Cached because list is immutable after creation
Returns
Key count
215 @property 216 def is_empty(self) -> bool: 217 """Check if list is empty (uses cached count). 218 219 :return: True if no keys 220 """ 221 return self.count == 0
Check if list is empty (uses cached count).
Returns
True if no keys
223 def get_by_id(self, key_id: str) -> AccessKey | None: 224 """Get key by ID with early return optimization. 225 226 :param key_id: Access key ID 227 :return: Access key or None if not found 228 """ 229 for key in self.access_keys: 230 if key.id == key_id: 231 return key 232 return None
Get key by ID with early return optimization.
Parameters
- key_id: Access key ID
Returns
Access key or None if not found
234 def get_by_name(self, name: str) -> list[AccessKey]: 235 """Get keys by name with optimized list comprehension. 236 237 :param name: Key name 238 :return: List of matching keys (may be multiple) 239 """ 240 return [key for key in self.access_keys if key.name == name]
Get keys by name with optimized list comprehension.
Parameters
- name: Key name
Returns
List of matching keys (may be multiple)
242 def filter_with_limits(self) -> list[AccessKey]: 243 """Get keys with data limits (optimized comprehension). 244 245 :return: List of keys with limits 246 """ 247 return [key for key in self.access_keys if key.has_data_limit]
Get keys with data limits (optimized comprehension).
Returns
List of keys with limits
249 def filter_without_limits(self) -> list[AccessKey]: 250 """Get keys without data limits (optimized comprehension). 251 252 :return: List of keys without limits 253 """ 254 return [key for key in self.access_keys if not key.has_data_limit]
Get keys without data limits (optimized comprehension).
Returns
List of keys without limits
class AccessKeyMetric(BaseValidatedModel):
    """Experimental usage metrics reported for a single access key.

    SCHEMA: Based on experimental metrics accessKeys array item
    """

    access_key_id: str = Field(alias="accessKeyId")
    tunnel_time: TunnelTime = Field(alias="tunnelTime")
    data_transferred: DataTransferred = Field(alias="dataTransferred")
    connection: ConnectionInfo
Per-key experimental metrics.
SCHEMA: Based on experimental metrics accessKeys array item
class AccessKeyNameRequest(BaseValidatedModel):
    """Payload for renaming an existing access key.

    SCHEMA: Based on PUT /access-keys/{id}/name request body
    """

    name: str = Field(min_length=1, max_length=255)
Request model for renaming access key.
SCHEMA: Based on PUT /access-keys/{id}/name request body
46class AsyncOutlineClient( 47 BaseHTTPClient, 48 ServerMixin, 49 AccessKeyMixin, 50 DataLimitMixin, 51 MetricsMixin, 52): 53 """High-performance async client for Outline VPN Server API.""" 54 55 __slots__ = ( 56 "_audit_logger_instance", 57 "_config", 58 "_default_json_format", 59 ) 60 61 def __init__( 62 self, 63 config: OutlineClientConfig | None = None, 64 *, 65 api_url: str | None = None, 66 cert_sha256: str | None = None, 67 audit_logger: AuditLogger | None = None, 68 metrics: MetricsCollector | None = None, 69 **overrides: Unpack[ConfigOverrides], 70 ) -> None: 71 """Initialize Outline client with modern configuration approach. 72 73 Uses structural pattern matching for configuration resolution. 74 75 :param config: Client configuration object 76 :param api_url: API URL (alternative to config) 77 :param cert_sha256: Certificate fingerprint (alternative to config) 78 :param audit_logger: Custom audit logger 79 :param metrics: Custom metrics collector 80 :param overrides: Configuration overrides (timeout, retry_attempts, etc.) 81 :raises ConfigurationError: If configuration is invalid 82 83 Example: 84 >>> async with AsyncOutlineClient.from_env() as client: 85 ... 
info = await client.get_server_info() 86 """ 87 # Build config_kwargs using utility function (DRY) 88 config_kwargs = build_config_overrides(**overrides) 89 90 # Validate configuration using pattern matching 91 resolved_config = self._resolve_configuration( 92 config, api_url, cert_sha256, config_kwargs 93 ) 94 95 self._config = resolved_config 96 self._audit_logger_instance = audit_logger 97 self._default_json_format = resolved_config.json_format 98 99 # Initialize base HTTP client 100 super().__init__( 101 api_url=resolved_config.api_url, 102 cert_sha256=resolved_config.cert_sha256, 103 timeout=resolved_config.timeout, 104 retry_attempts=resolved_config.retry_attempts, 105 max_connections=resolved_config.max_connections, 106 user_agent=resolved_config.user_agent, 107 enable_logging=resolved_config.enable_logging, 108 circuit_config=resolved_config.circuit_config, 109 rate_limit=resolved_config.rate_limit, 110 allow_private_networks=resolved_config.allow_private_networks, 111 resolve_dns_for_ssrf=resolved_config.resolve_dns_for_ssrf, 112 audit_logger=audit_logger, 113 metrics=metrics, 114 ) 115 116 # Cache instance for weak reference tracking (automatic cleanup) 117 _client_cache[id(self)] = self 118 119 if resolved_config.enable_logging and logger.isEnabledFor(logging.INFO): 120 safe_url = Validators.sanitize_url_for_logging(self.api_url) 121 logger.info("Client initialized for %s", safe_url) 122 123 @staticmethod 124 def _resolve_configuration( 125 config: OutlineClientConfig | None, 126 api_url: str | None, 127 cert_sha256: str | None, 128 kwargs: dict[str, Any], 129 ) -> OutlineClientConfig: 130 """Resolve and validate configuration using pattern matching. 
131 132 :param config: Configuration object 133 :param api_url: Direct API URL 134 :param cert_sha256: Direct certificate 135 :param kwargs: Additional kwargs 136 :return: Resolved configuration 137 :raises ConfigurationError: If configuration is invalid 138 """ 139 match config, api_url, cert_sha256: 140 # Pattern 1: Direct parameters provided (most common case) 141 case None, str(url), str(cert) if url and cert: 142 return OutlineClientConfig.create_minimal(url, cert, **kwargs) 143 144 # Pattern 2: Config object provided 145 case OutlineClientConfig() as cfg, None, None: 146 return cfg 147 148 # Pattern 3: Missing required parameters 149 case None, None, _: 150 raise ConfigurationError( 151 "Missing required 'api_url'", 152 field="api_url", 153 security_issue=False, 154 ) 155 case None, _, None: 156 raise ConfigurationError( 157 "Missing required 'cert_sha256'", 158 field="cert_sha256", 159 security_issue=True, 160 ) 161 162 # Pattern 4: Conflicting parameters 163 case OutlineClientConfig(), str() | None, str() | None: 164 raise ConfigurationError( 165 "Cannot specify both 'config' and direct parameters" 166 ) 167 168 # Pattern 5: Invalid combination (catch-all) 169 case _: 170 raise ConfigurationError("Invalid parameter combination") 171 172 @property 173 def config(self) -> OutlineClientConfig: 174 """Get immutable copy of configuration. 175 176 :return: Deep copy of configuration 177 """ 178 return self._config.model_copy_immutable() 179 180 @property 181 def get_sanitized_config(self) -> dict[str, Any]: 182 """Delegate to config's sanitized representation. 183 184 See: OutlineClientConfig.get_sanitized_config(). 185 186 :return: Sanitized configuration from underlying config object 187 """ 188 return self._config.get_sanitized_config 189 190 @property 191 def json_format(self) -> bool: 192 """Get JSON format preference. 
193 194 :return: True if raw JSON format is preferred 195 """ 196 return self._default_json_format 197 198 # ===== Factory Methods ===== 199 200 @classmethod 201 @asynccontextmanager 202 async def create( 203 cls, 204 api_url: str | None = None, 205 cert_sha256: str | None = None, 206 *, 207 config: OutlineClientConfig | None = None, 208 audit_logger: AuditLogger | None = None, 209 metrics: MetricsCollector | None = None, 210 **overrides: Unpack[ConfigOverrides], 211 ) -> AsyncGenerator[AsyncOutlineClient, None]: 212 """Create and initialize client as async context manager. 213 214 Automatically handles initialization and cleanup. 215 Recommended way to create clients in async contexts. 216 217 :param api_url: API URL 218 :param cert_sha256: Certificate fingerprint 219 :param config: Configuration object 220 :param audit_logger: Custom audit logger 221 :param metrics: Custom metrics collector 222 :param overrides: Configuration overrides (timeout, retry_attempts, etc.) 223 :yield: Initialized client instance 224 :raises ConfigurationError: If configuration is invalid 225 226 Example: 227 >>> async with AsyncOutlineClient.from_env() as client: 228 ... keys = await client.get_access_keys() 229 """ 230 if config is not None: 231 client = cls(config=config, audit_logger=audit_logger, metrics=metrics) 232 else: 233 client = cls( 234 api_url=api_url, 235 cert_sha256=cert_sha256, 236 audit_logger=audit_logger, 237 metrics=metrics, 238 **overrides, 239 ) 240 241 async with client: 242 yield client 243 244 @classmethod 245 def from_env( 246 cls, 247 *, 248 env_file: str | Path | None = None, 249 audit_logger: AuditLogger | None = None, 250 metrics: MetricsCollector | None = None, 251 **overrides: Unpack[ConfigOverrides], 252 ) -> AsyncOutlineClient: 253 """Create client from environment variables. 254 255 Reads configuration from environment or .env file. 256 Modern approach using **overrides for runtime configuration. 
257 258 :param env_file: Path to environment file (.env) 259 :param audit_logger: Custom audit logger 260 :param metrics: Custom metrics collector 261 :param overrides: Configuration overrides (timeout, enable_logging, etc.) 262 :return: Configured client instance 263 :raises ConfigurationError: If environment configuration is invalid 264 265 Example: 266 >>> async with AsyncOutlineClient.from_env( 267 ... env_file=".env.production", 268 ... timeout=20, 269 ... ) as client: 270 ... info = await client.get_server_info() 271 """ 272 config = OutlineClientConfig.from_env(env_file=env_file, **overrides) 273 return cls(config=config, audit_logger=audit_logger, metrics=metrics) 274 275 # ===== Context Manager Methods ===== 276 277 async def __aexit__( 278 self, 279 exc_type: type[BaseException] | None, 280 exc_val: BaseException | None, 281 exc_tb: object | None, 282 ) -> None: 283 """Async context manager exit with comprehensive cleanup. 284 285 Ensures graceful shutdown even on exceptions. Uses ordered cleanup 286 sequence for proper resource deallocation. 287 288 Cleanup order: 289 1. Audit logger shutdown (drain queue) 290 2. HTTP client shutdown (close connections) 291 3. 
Emergency cleanup if steps 1-2 failed 292 293 :param exc_type: Exception type if error occurred 294 :param exc_val: Exception instance if error occurred 295 :param exc_tb: Exception traceback 296 :return: False to propagate exceptions 297 """ 298 cleanup_errors: list[str] = [] 299 300 # Step 1: Graceful audit logger shutdown 301 if self._audit_logger_instance is not None: 302 try: 303 if hasattr(self._audit_logger_instance, "shutdown"): 304 shutdown_method = self._audit_logger_instance.shutdown 305 if asyncio.iscoroutinefunction(shutdown_method): 306 await shutdown_method() 307 except Exception as e: 308 error_msg = f"Audit logger shutdown error: {e}" 309 cleanup_errors.append(error_msg) 310 if logger.isEnabledFor(logging.WARNING): 311 logger.warning(error_msg) 312 313 # Step 2: Shutdown HTTP client 314 try: 315 await self.shutdown(timeout=30.0) 316 except Exception as e: 317 error_msg = f"HTTP client shutdown error: {e}" 318 cleanup_errors.append(error_msg) 319 if logger.isEnabledFor(logging.ERROR): 320 logger.error(error_msg) 321 322 # Step 3: Emergency cleanup if shutdown failed 323 if cleanup_errors and hasattr(self, "_session"): 324 try: 325 if self._session and not self._session.closed: 326 await self._session.close() 327 if logger.isEnabledFor(logging.DEBUG): 328 logger.debug("Emergency session cleanup completed") 329 except Exception as e: 330 if logger.isEnabledFor(logging.DEBUG): 331 logger.debug("Emergency cleanup error: %s", e) 332 333 # Log summary of cleanup issues 334 if cleanup_errors and logger.isEnabledFor(logging.WARNING): 335 logger.warning( 336 "Cleanup completed with %d error(s): %s", 337 len(cleanup_errors), 338 "; ".join(cleanup_errors), 339 ) 340 341 # Always propagate the original exception 342 return None 343 344 # ===== Utility Methods ===== 345 346 async def health_check(self) -> dict[str, Any]: 347 """Perform basic health check. 348 349 Non-intrusive check that tests server connectivity without 350 modifying any state. 
Returns comprehensive health metrics. 351 352 :return: Health check result dictionary with response time 353 354 Example result: 355 { 356 "timestamp": 1234567890.123, 357 "healthy": True, 358 "response_time_ms": 45.2, 359 "connected": True, 360 "circuit_state": "closed", 361 "active_requests": 2, 362 "rate_limit_available": 98 363 } 364 """ 365 import time 366 367 health_data: dict[str, Any] = { 368 "timestamp": time.time(), 369 "connected": self.is_connected, 370 "circuit_state": self.circuit_state, 371 "active_requests": self.active_requests, 372 "rate_limit_available": self.available_slots, 373 } 374 375 try: 376 start_time = time.monotonic() 377 await self.get_server_info() 378 duration = time.monotonic() - start_time 379 380 health_data["healthy"] = True 381 health_data["response_time_ms"] = round(duration * 1000, 2) 382 383 except Exception as e: 384 health_data["healthy"] = False 385 health_data["error"] = str(e) 386 health_data["error_type"] = type(e).__name__ 387 388 return health_data 389 390 async def get_server_summary(self) -> dict[str, Any]: 391 """Get comprehensive server overview. 392 393 Aggregates multiple API calls into a single summary. 394 Continues on partial failures to return maximum information. 395 Executes non-dependent calls concurrently for performance. 
396 397 :return: Server summary dictionary with aggregated data 398 399 Example result: 400 { 401 "timestamp": 1234567890.123, 402 "healthy": True, 403 "server": {...}, 404 "access_keys_count": 10, 405 "metrics_enabled": True, 406 "transfer_metrics": {...}, 407 "client_status": {...}, 408 "errors": [] 409 } 410 """ 411 import time 412 413 summary: dict[str, Any] = { 414 "timestamp": time.time(), 415 "healthy": True, 416 "errors": [], 417 } 418 419 server_task = self.get_server_info(as_json=True) 420 keys_task = self.get_access_keys(as_json=True) 421 metrics_status_task = self.get_metrics_status(as_json=True) 422 423 server_result, keys_result, metrics_status_result = await asyncio.gather( 424 server_task, keys_task, metrics_status_task, return_exceptions=True 425 ) 426 427 # Process server info 428 if isinstance(server_result, Exception): 429 summary["healthy"] = False 430 summary["errors"].append(f"Server info error: {server_result}") 431 if logger.isEnabledFor(logging.DEBUG): 432 logger.debug("Failed to fetch server info: %s", server_result) 433 else: 434 summary["server"] = server_result 435 436 # Process access keys 437 if isinstance(keys_result, Exception): 438 summary["healthy"] = False 439 summary["errors"].append(f"Access keys error: {keys_result}") 440 if logger.isEnabledFor(logging.DEBUG): 441 logger.debug("Failed to fetch access keys: %s", keys_result) 442 elif isinstance(keys_result, dict): 443 keys_list = keys_result.get("accessKeys", []) 444 summary["access_keys_count"] = ( 445 len(keys_list) if isinstance(keys_list, list) else 0 446 ) 447 elif isinstance(keys_result, AccessKeyList): 448 summary["access_keys_count"] = len(keys_result.access_keys) 449 else: 450 summary["access_keys_count"] = 0 451 452 # Process metrics status 453 if isinstance(metrics_status_result, Exception): 454 summary["errors"].append(f"Metrics status error: {metrics_status_result}") 455 if logger.isEnabledFor(logging.DEBUG): 456 logger.debug( 457 "Failed to fetch metrics status: 
%s", metrics_status_result 458 ) 459 elif isinstance(metrics_status_result, dict): 460 metrics_enabled = bool(metrics_status_result.get("metricsEnabled", False)) 461 summary["metrics_enabled"] = metrics_enabled 462 463 # Fetch transfer metrics if enabled (dependent call - sequential) 464 if metrics_enabled: 465 try: 466 transfer = await self.get_transfer_metrics(as_json=True) 467 summary["transfer_metrics"] = transfer 468 except Exception as e: 469 summary["errors"].append(f"Transfer metrics error: {e}") 470 if logger.isEnabledFor(logging.DEBUG): 471 logger.debug("Failed to fetch transfer metrics: %s", e) 472 elif isinstance(metrics_status_result, MetricsStatusResponse): 473 summary["metrics_enabled"] = metrics_status_result.metrics_enabled 474 if metrics_status_result.metrics_enabled: 475 try: 476 transfer = await self.get_transfer_metrics(as_json=True) 477 summary["transfer_metrics"] = transfer 478 except Exception as e: 479 summary["errors"].append(f"Transfer metrics error: {e}") 480 if logger.isEnabledFor(logging.DEBUG): 481 logger.debug("Failed to fetch transfer metrics: %s", e) 482 else: 483 summary["metrics_enabled"] = False 484 485 # Add client status (synchronous, no API call) 486 summary["client_status"] = { 487 "connected": self.is_connected, 488 "circuit_state": self.circuit_state, 489 "active_requests": self.active_requests, 490 "rate_limit": { 491 "limit": self.rate_limit, 492 "available": self.available_slots, 493 }, 494 } 495 496 return summary 497 498 def get_status(self) -> dict[str, Any]: 499 """Get current client status (synchronous). 500 501 Returns immediate status without making API calls. 502 Useful for monitoring and debugging. 
503 504 :return: Status dictionary with all client metrics 505 506 Example result: 507 { 508 "connected": True, 509 "circuit_state": "closed", 510 "active_requests": 2, 511 "rate_limit": { 512 "limit": 100, 513 "available": 98, 514 "active": 2 515 }, 516 "circuit_metrics": {...} 517 } 518 """ 519 return { 520 "connected": self.is_connected, 521 "circuit_state": self.circuit_state, 522 "active_requests": self.active_requests, 523 "rate_limit": { 524 "limit": self.rate_limit, 525 "available": self.available_slots, 526 "active": self.active_requests, 527 }, 528 "circuit_metrics": self.get_circuit_metrics(), 529 } 530 531 def __repr__(self) -> str: 532 """Safe string representation without secrets. 533 534 Does not expose any sensitive information (URLs, certificates, tokens). 535 536 :return: String representation 537 """ 538 status = "connected" if self.is_connected else "disconnected" 539 parts = [f"status={status}"] 540 541 if self.circuit_state: 542 parts.append(f"circuit={self.circuit_state}") 543 544 if self.active_requests: 545 parts.append(f"requests={self.active_requests}") 546 547 return f"AsyncOutlineClient({', '.join(parts)})"
High-performance async client for Outline VPN Server API.
61 def __init__( 62 self, 63 config: OutlineClientConfig | None = None, 64 *, 65 api_url: str | None = None, 66 cert_sha256: str | None = None, 67 audit_logger: AuditLogger | None = None, 68 metrics: MetricsCollector | None = None, 69 **overrides: Unpack[ConfigOverrides], 70 ) -> None: 71 """Initialize Outline client with modern configuration approach. 72 73 Uses structural pattern matching for configuration resolution. 74 75 :param config: Client configuration object 76 :param api_url: API URL (alternative to config) 77 :param cert_sha256: Certificate fingerprint (alternative to config) 78 :param audit_logger: Custom audit logger 79 :param metrics: Custom metrics collector 80 :param overrides: Configuration overrides (timeout, retry_attempts, etc.) 81 :raises ConfigurationError: If configuration is invalid 82 83 Example: 84 >>> async with AsyncOutlineClient.from_env() as client: 85 ... info = await client.get_server_info() 86 """ 87 # Build config_kwargs using utility function (DRY) 88 config_kwargs = build_config_overrides(**overrides) 89 90 # Validate configuration using pattern matching 91 resolved_config = self._resolve_configuration( 92 config, api_url, cert_sha256, config_kwargs 93 ) 94 95 self._config = resolved_config 96 self._audit_logger_instance = audit_logger 97 self._default_json_format = resolved_config.json_format 98 99 # Initialize base HTTP client 100 super().__init__( 101 api_url=resolved_config.api_url, 102 cert_sha256=resolved_config.cert_sha256, 103 timeout=resolved_config.timeout, 104 retry_attempts=resolved_config.retry_attempts, 105 max_connections=resolved_config.max_connections, 106 user_agent=resolved_config.user_agent, 107 enable_logging=resolved_config.enable_logging, 108 circuit_config=resolved_config.circuit_config, 109 rate_limit=resolved_config.rate_limit, 110 allow_private_networks=resolved_config.allow_private_networks, 111 resolve_dns_for_ssrf=resolved_config.resolve_dns_for_ssrf, 112 audit_logger=audit_logger, 113 
metrics=metrics, 114 ) 115 116 # Cache instance for weak reference tracking (automatic cleanup) 117 _client_cache[id(self)] = self 118 119 if resolved_config.enable_logging and logger.isEnabledFor(logging.INFO): 120 safe_url = Validators.sanitize_url_for_logging(self.api_url) 121 logger.info("Client initialized for %s", safe_url)
Initialize Outline client with modern configuration approach.
Uses structural pattern matching for configuration resolution.
Parameters
- config: Client configuration object
- api_url: API URL (alternative to config)
- cert_sha256: Certificate fingerprint (alternative to config)
- audit_logger: Custom audit logger
- metrics: Custom metrics collector
- overrides: Configuration overrides (timeout, retry_attempts, etc.)
Raises
- ConfigurationError: If configuration is invalid
Example:
>>> async with AsyncOutlineClient.from_env() as client:
...     info = await client.get_server_info()
172 @property 173 def config(self) -> OutlineClientConfig: 174 """Get immutable copy of configuration. 175 176 :return: Deep copy of configuration 177 """ 178 return self._config.model_copy_immutable()
Get immutable copy of configuration.
Returns
Deep copy of configuration
180 @property 181 def get_sanitized_config(self) -> dict[str, Any]: 182 """Delegate to config's sanitized representation. 183 184 See: OutlineClientConfig.get_sanitized_config(). 185 186 :return: Sanitized configuration from underlying config object 187 """ 188 return self._config.get_sanitized_config
Delegate to config's sanitized representation.
See: OutlineClientConfig.get_sanitized_config().
Returns
Sanitized configuration from underlying config object
190 @property 191 def json_format(self) -> bool: 192 """Get JSON format preference. 193 194 :return: True if raw JSON format is preferred 195 """ 196 return self._default_json_format
Get JSON format preference.
Returns
True if raw JSON format is preferred
200 @classmethod 201 @asynccontextmanager 202 async def create( 203 cls, 204 api_url: str | None = None, 205 cert_sha256: str | None = None, 206 *, 207 config: OutlineClientConfig | None = None, 208 audit_logger: AuditLogger | None = None, 209 metrics: MetricsCollector | None = None, 210 **overrides: Unpack[ConfigOverrides], 211 ) -> AsyncGenerator[AsyncOutlineClient, None]: 212 """Create and initialize client as async context manager. 213 214 Automatically handles initialization and cleanup. 215 Recommended way to create clients in async contexts. 216 217 :param api_url: API URL 218 :param cert_sha256: Certificate fingerprint 219 :param config: Configuration object 220 :param audit_logger: Custom audit logger 221 :param metrics: Custom metrics collector 222 :param overrides: Configuration overrides (timeout, retry_attempts, etc.) 223 :yield: Initialized client instance 224 :raises ConfigurationError: If configuration is invalid 225 226 Example: 227 >>> async with AsyncOutlineClient.from_env() as client: 228 ... keys = await client.get_access_keys() 229 """ 230 if config is not None: 231 client = cls(config=config, audit_logger=audit_logger, metrics=metrics) 232 else: 233 client = cls( 234 api_url=api_url, 235 cert_sha256=cert_sha256, 236 audit_logger=audit_logger, 237 metrics=metrics, 238 **overrides, 239 ) 240 241 async with client: 242 yield client
Create and initialize client as async context manager.
Automatically handles initialization and cleanup. Recommended way to create clients in async contexts.
Parameters
- api_url: API URL
- cert_sha256: Certificate fingerprint
- config: Configuration object
- audit_logger: Custom audit logger
- metrics: Custom metrics collector
- overrides: Configuration overrides (timeout, retry_attempts, etc.)
Yields
- Initialized client instance
Raises
- ConfigurationError: If configuration is invalid
Example:
>>> async with AsyncOutlineClient.from_env() as client: ... keys = await client.get_access_keys()
244 @classmethod 245 def from_env( 246 cls, 247 *, 248 env_file: str | Path | None = None, 249 audit_logger: AuditLogger | None = None, 250 metrics: MetricsCollector | None = None, 251 **overrides: Unpack[ConfigOverrides], 252 ) -> AsyncOutlineClient: 253 """Create client from environment variables. 254 255 Reads configuration from environment or .env file. 256 Modern approach using **overrides for runtime configuration. 257 258 :param env_file: Path to environment file (.env) 259 :param audit_logger: Custom audit logger 260 :param metrics: Custom metrics collector 261 :param overrides: Configuration overrides (timeout, enable_logging, etc.) 262 :return: Configured client instance 263 :raises ConfigurationError: If environment configuration is invalid 264 265 Example: 266 >>> async with AsyncOutlineClient.from_env( 267 ... env_file=".env.production", 268 ... timeout=20, 269 ... ) as client: 270 ... info = await client.get_server_info() 271 """ 272 config = OutlineClientConfig.from_env(env_file=env_file, **overrides) 273 return cls(config=config, audit_logger=audit_logger, metrics=metrics)
Create client from environment variables.
Reads configuration from environment or .env file. Modern approach using **overrides for runtime configuration.
Parameters
- env_file: Path to environment file (.env)
- audit_logger: Custom audit logger
- metrics: Custom metrics collector
- overrides: Configuration overrides (timeout, enable_logging, etc.)
Returns
Configured client instance
Raises
- ConfigurationError: If environment configuration is invalid
Example:
>>> async with AsyncOutlineClient.from_env(
...     env_file=".env.production",
...     timeout=20,
... ) as client:
...     info = await client.get_server_info()
346 async def health_check(self) -> dict[str, Any]: 347 """Perform basic health check. 348 349 Non-intrusive check that tests server connectivity without 350 modifying any state. Returns comprehensive health metrics. 351 352 :return: Health check result dictionary with response time 353 354 Example result: 355 { 356 "timestamp": 1234567890.123, 357 "healthy": True, 358 "response_time_ms": 45.2, 359 "connected": True, 360 "circuit_state": "closed", 361 "active_requests": 2, 362 "rate_limit_available": 98 363 } 364 """ 365 import time 366 367 health_data: dict[str, Any] = { 368 "timestamp": time.time(), 369 "connected": self.is_connected, 370 "circuit_state": self.circuit_state, 371 "active_requests": self.active_requests, 372 "rate_limit_available": self.available_slots, 373 } 374 375 try: 376 start_time = time.monotonic() 377 await self.get_server_info() 378 duration = time.monotonic() - start_time 379 380 health_data["healthy"] = True 381 health_data["response_time_ms"] = round(duration * 1000, 2) 382 383 except Exception as e: 384 health_data["healthy"] = False 385 health_data["error"] = str(e) 386 health_data["error_type"] = type(e).__name__ 387 388 return health_data
Perform basic health check.
Non-intrusive check that tests server connectivity without modifying any state. Returns comprehensive health metrics.
Returns
Health check result dictionary with response time
Example result:
{ "timestamp": 1234567890.123, "healthy": True, "response_time_ms": 45.2, "connected": True, "circuit_state": "closed", "active_requests": 2, "rate_limit_available": 98 }
390 async def get_server_summary(self) -> dict[str, Any]: 391 """Get comprehensive server overview. 392 393 Aggregates multiple API calls into a single summary. 394 Continues on partial failures to return maximum information. 395 Executes non-dependent calls concurrently for performance. 396 397 :return: Server summary dictionary with aggregated data 398 399 Example result: 400 { 401 "timestamp": 1234567890.123, 402 "healthy": True, 403 "server": {...}, 404 "access_keys_count": 10, 405 "metrics_enabled": True, 406 "transfer_metrics": {...}, 407 "client_status": {...}, 408 "errors": [] 409 } 410 """ 411 import time 412 413 summary: dict[str, Any] = { 414 "timestamp": time.time(), 415 "healthy": True, 416 "errors": [], 417 } 418 419 server_task = self.get_server_info(as_json=True) 420 keys_task = self.get_access_keys(as_json=True) 421 metrics_status_task = self.get_metrics_status(as_json=True) 422 423 server_result, keys_result, metrics_status_result = await asyncio.gather( 424 server_task, keys_task, metrics_status_task, return_exceptions=True 425 ) 426 427 # Process server info 428 if isinstance(server_result, Exception): 429 summary["healthy"] = False 430 summary["errors"].append(f"Server info error: {server_result}") 431 if logger.isEnabledFor(logging.DEBUG): 432 logger.debug("Failed to fetch server info: %s", server_result) 433 else: 434 summary["server"] = server_result 435 436 # Process access keys 437 if isinstance(keys_result, Exception): 438 summary["healthy"] = False 439 summary["errors"].append(f"Access keys error: {keys_result}") 440 if logger.isEnabledFor(logging.DEBUG): 441 logger.debug("Failed to fetch access keys: %s", keys_result) 442 elif isinstance(keys_result, dict): 443 keys_list = keys_result.get("accessKeys", []) 444 summary["access_keys_count"] = ( 445 len(keys_list) if isinstance(keys_list, list) else 0 446 ) 447 elif isinstance(keys_result, AccessKeyList): 448 summary["access_keys_count"] = len(keys_result.access_keys) 449 else: 450 
summary["access_keys_count"] = 0 451 452 # Process metrics status 453 if isinstance(metrics_status_result, Exception): 454 summary["errors"].append(f"Metrics status error: {metrics_status_result}") 455 if logger.isEnabledFor(logging.DEBUG): 456 logger.debug( 457 "Failed to fetch metrics status: %s", metrics_status_result 458 ) 459 elif isinstance(metrics_status_result, dict): 460 metrics_enabled = bool(metrics_status_result.get("metricsEnabled", False)) 461 summary["metrics_enabled"] = metrics_enabled 462 463 # Fetch transfer metrics if enabled (dependent call - sequential) 464 if metrics_enabled: 465 try: 466 transfer = await self.get_transfer_metrics(as_json=True) 467 summary["transfer_metrics"] = transfer 468 except Exception as e: 469 summary["errors"].append(f"Transfer metrics error: {e}") 470 if logger.isEnabledFor(logging.DEBUG): 471 logger.debug("Failed to fetch transfer metrics: %s", e) 472 elif isinstance(metrics_status_result, MetricsStatusResponse): 473 summary["metrics_enabled"] = metrics_status_result.metrics_enabled 474 if metrics_status_result.metrics_enabled: 475 try: 476 transfer = await self.get_transfer_metrics(as_json=True) 477 summary["transfer_metrics"] = transfer 478 except Exception as e: 479 summary["errors"].append(f"Transfer metrics error: {e}") 480 if logger.isEnabledFor(logging.DEBUG): 481 logger.debug("Failed to fetch transfer metrics: %s", e) 482 else: 483 summary["metrics_enabled"] = False 484 485 # Add client status (synchronous, no API call) 486 summary["client_status"] = { 487 "connected": self.is_connected, 488 "circuit_state": self.circuit_state, 489 "active_requests": self.active_requests, 490 "rate_limit": { 491 "limit": self.rate_limit, 492 "available": self.available_slots, 493 }, 494 } 495 496 return summary
Get comprehensive server overview.
Aggregates multiple API calls into a single summary. Continues on partial failures to return maximum information. Executes non-dependent calls concurrently for performance.
Returns
Server summary dictionary with aggregated data
Example result:
{ "timestamp": 1234567890.123, "healthy": True, "server": {...}, "access_keys_count": 10, "metrics_enabled": True, "transfer_metrics": {...}, "client_status": {...}, "errors": [] }
498 def get_status(self) -> dict[str, Any]: 499 """Get current client status (synchronous). 500 501 Returns immediate status without making API calls. 502 Useful for monitoring and debugging. 503 504 :return: Status dictionary with all client metrics 505 506 Example result: 507 { 508 "connected": True, 509 "circuit_state": "closed", 510 "active_requests": 2, 511 "rate_limit": { 512 "limit": 100, 513 "available": 98, 514 "active": 2 515 }, 516 "circuit_metrics": {...} 517 } 518 """ 519 return { 520 "connected": self.is_connected, 521 "circuit_state": self.circuit_state, 522 "active_requests": self.active_requests, 523 "rate_limit": { 524 "limit": self.rate_limit, 525 "available": self.available_slots, 526 "active": self.active_requests, 527 }, 528 "circuit_metrics": self.get_circuit_metrics(), 529 }
Get current client status (synchronous).
Returns immediate status without making API calls. Useful for monitoring and debugging.
Returns
Status dictionary with all client metrics
Example result:
{ "connected": True, "circuit_state": "closed", "active_requests": 2, "rate_limit": { "limit": 100, "available": 98, "active": 2 }, "circuit_metrics": {...} }
@dataclass(slots=True, frozen=True)
class AuditContext:
    """Immutable audit context extracted from function call.

    Uses structural pattern matching and signature inspection for smart extraction.
    """

    # Audited action name (taken verbatim from the function's __name__).
    action: str
    # Identifier of the resource acted upon ("unknown" when undeterminable).
    resource: str
    # Whether the audited call completed without raising.
    success: bool
    # Sanitized call parameters plus error info on failure.
    details: dict[str, Any] = field(default_factory=dict)
    # Correlation id taken from the instance, if it carries one.
    correlation_id: str | None = None

    @classmethod
    def from_call(
        cls,
        func: Callable[..., Any],
        instance: object,
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
        result: object = None,
        exception: Exception | None = None,
    ) -> AuditContext:
        """Build audit context from function call with intelligent extraction.

        :param func: Function being audited
        :param instance: Instance (self) for methods
        :param args: Positional arguments
        :param kwargs: Keyword arguments
        :param result: Function result (if successful)
        :param exception: Exception (if failed)
        :return: Complete audit context
        """
        success = exception is None

        # Action is the audited function's name, unchanged.
        action = func.__name__

        # Smart resource extraction
        resource = cls._extract_resource(func, args, kwargs, result, success)

        # Smart details extraction with automatic sanitization
        details = cls._extract_details(func, args, kwargs, result, exception, success)

        # Correlation ID from instance if available (private attribute probe).
        correlation_id = getattr(instance, "_correlation_id", None)

        return cls(
            action=action,
            resource=resource,
            success=success,
            details=details,
            correlation_id=correlation_id,
        )

    @staticmethod
    def _extract_resource(
        func: Callable[..., Any],
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
        result: object,
        success: bool,
    ) -> str:
        """Smart resource extraction using structural pattern matching.

        Priority:
        1. result.id (for create operations)
        2. Known resource parameter names (key_id, id, resource_id)
        3. First meaningful argument
        4. Function name analysis
        5. 'unknown' fallback

        :param func: Function being audited
        :param args: Positional arguments
        :param kwargs: Keyword arguments
        :param result: Function result
        :param success: Whether operation succeeded
        :return: Resource identifier
        """
        # Pattern 1: Extract from successful result
        if success and result is not None:
            match result:
                case _ if hasattr(result, "id"):
                    return str(result.id)
                case dict() if "id" in result:
                    # Only reached for dicts without an ``id`` attribute.
                    return str(result["id"])

        # Pattern 2: Extract from known parameter names
        sig = inspect.signature(func)
        params = list(sig.parameters.keys())

        # Skip 'self' and 'cls'
        params = [p for p in params if p not in ("self", "cls")]

        # Try common resource identifiers in priority order
        for resource_param in ("key_id", "id", "resource_id", "user_id", "name"):
            if resource_param in kwargs:
                return str(kwargs[resource_param])

        # Pattern 3: First meaningful parameter — keyword form only;
        # a positionally-passed first argument is covered by Pattern 4.
        if params and params[0] in kwargs:
            return str(kwargs[params[0]])

        # Pattern 4: First positional argument (after self)
        if args:
            return str(args[0])

        # Pattern 5: Analyze function name for hints
        func_name = func.__name__.lower()
        if any(keyword in func_name for keyword in ("server", "global", "system")):
            return "server"

        return "unknown"

    @staticmethod
    def _extract_details(
        func: Callable[..., Any],
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
        result: object,
        exception: Exception | None,
        success: bool,
    ) -> dict[str, Any]:
        """Smart details extraction using signature introspection.

        Only includes meaningful parameters (excludes technical ones and None values).
        Automatically sanitizes sensitive data.

        NOTE(review): only keyword arguments are inspected — values passed
        positionally (``args``) are not captured in details; confirm intended.

        :param func: Function being audited
        :param args: Positional arguments
        :param kwargs: Keyword arguments
        :param result: Function result
        :param exception: Exception if failed
        :param success: Whether operation succeeded
        :return: Sanitized details dictionary
        """
        details: dict[str, Any] = {"success": success}

        # Signature-based extraction
        sig = inspect.signature(func)

        # Parameters to exclude from details (technical / presentation flags).
        excluded = {"self", "cls", "as_json", "return_raw"}

        for param_name, param in sig.parameters.items():
            if param_name in excluded:
                continue

            # Get actual value
            value = kwargs.get(param_name)

            # Only include meaningful values (not None, not the declared default)
            if value is not None and value != param.default:
                # Convert complex objects to simple representations
                match value:
                    case _ if hasattr(value, "model_dump"):
                        # Pydantic models
                        details[param_name] = value.model_dump(exclude_none=True)
                    case dict():
                        details[param_name] = value
                    case list() | tuple():
                        details[param_name] = len(value)  # Count, not content
                    case _:
                        details[param_name] = value

        # Add error information if present
        if exception:
            details["error"] = str(exception)
            details["error_type"] = type(exception).__name__

        # Sanitize sensitive data
        return _sanitize_details(details)
Immutable audit context extracted from function call.
Uses structural pattern matching and signature inspection for smart extraction.
70 @classmethod 71 def from_call( 72 cls, 73 func: Callable[..., Any], 74 instance: object, 75 args: tuple[Any, ...], 76 kwargs: dict[str, Any], 77 result: object = None, 78 exception: Exception | None = None, 79 ) -> AuditContext: 80 """Build audit context from function call with intelligent extraction. 81 82 :param func: Function being audited 83 :param instance: Instance (self) for methods 84 :param args: Positional arguments 85 :param kwargs: Keyword arguments 86 :param result: Function result (if successful) 87 :param exception: Exception (if failed) 88 :return: Complete audit context 89 """ 90 success = exception is None 91 92 # Extract action from function name (snake_case -> action) 93 action = func.__name__ 94 95 # Smart resource extraction 96 resource = cls._extract_resource(func, args, kwargs, result, success) 97 98 # Smart details extraction with automatic sanitization 99 details = cls._extract_details(func, args, kwargs, result, exception, success) 100 101 # Correlation ID from instance if available 102 correlation_id = getattr(instance, "_correlation_id", None) 103 104 return cls( 105 action=action, 106 resource=resource, 107 success=success, 108 details=details, 109 correlation_id=correlation_id, 110 )
Build audit context from function call with intelligent extraction.
Parameters
- func: Function being audited
- instance: Instance (self) for methods
- args: Positional arguments
- kwargs: Keyword arguments
- result: Function result (if successful)
- exception: Exception (if failed)
Returns
Complete audit context
234@runtime_checkable 235class AuditLogger(Protocol): 236 """Protocol for audit logging implementations. 237 238 Designed for async-first applications with sync fallback support. 239 """ 240 241 async def alog_action( 242 self, 243 action: str, 244 resource: str, 245 *, 246 user: str | None = None, 247 details: dict[str, Any] | None = None, 248 correlation_id: str | None = None, 249 ) -> None: 250 """Log auditable action asynchronously (primary method).""" 251 ... # pragma: no cover 252 253 def log_action( 254 self, 255 action: str, 256 resource: str, 257 *, 258 user: str | None = None, 259 details: dict[str, Any] | None = None, 260 correlation_id: str | None = None, 261 ) -> None: 262 """Log auditable action synchronously (fallback method).""" 263 ... # pragma: no cover 264 265 async def shutdown(self) -> None: 266 """Gracefully shutdown logger.""" 267 ... # pragma: no cover
Protocol for audit logging implementations.
Designed for async-first applications with sync fallback support.
1957def _no_init_or_replace_init(self, *args, **kwargs): 1958 cls = type(self) 1959 1960 if cls._is_protocol: 1961 raise TypeError('Protocols cannot be instantiated') 1962 1963 # Already using a custom `__init__`. No need to calculate correct 1964 # `__init__` to call. This can lead to RecursionError. See bpo-45121. 1965 if cls.__init__ is not _no_init_or_replace_init: 1966 return 1967 1968 # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`. 1969 # The first instantiation of the subclass will call `_no_init_or_replace_init` which 1970 # searches for a proper new `__init__` in the MRO. The new `__init__` 1971 # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent 1972 # instantiation of the protocol subclass will thus use the new 1973 # `__init__` and no longer call `_no_init_or_replace_init`. 1974 for base in cls.__mro__: 1975 init = base.__dict__.get('__init__', _no_init_or_replace_init) 1976 if init is not _no_init_or_replace_init: 1977 cls.__init__ = init 1978 break 1979 else: 1980 # should not happen 1981 cls.__init__ = object.__init__ 1982 1983 cls.__init__(self, *args, **kwargs)
241 async def alog_action( 242 self, 243 action: str, 244 resource: str, 245 *, 246 user: str | None = None, 247 details: dict[str, Any] | None = None, 248 correlation_id: str | None = None, 249 ) -> None: 250 """Log auditable action asynchronously (primary method).""" 251 ... # pragma: no cover
Log auditable action asynchronously (primary method).
def log_action(
    self,
    action: str,
    resource: str,
    *,
    user: str | None = None,
    details: dict[str, Any] | None = None,
    correlation_id: str | None = None,
) -> None:
    """Log auditable action synchronously (fallback method).

    :param action: Action being performed
    :param resource: Resource identifier
    :param user: User performing the action (optional)
    :param details: Additional structured details (optional)
    :param correlation_id: Request correlation ID (optional)
    """
    ...  # pragma: no cover
Log auditable action synchronously (fallback method).
class BandwidthData(BaseValidatedModel):
    """Bandwidth measurement data.

    SCHEMA: Based on experimental metrics bandwidth current/peak object
    """

    # Measured bandwidth value (bytes payload)
    data: BandwidthDataValue
    # Measurement time in epoch seconds; None when the API omits it
    timestamp: TimestampSec | None = None
Bandwidth measurement data.
SCHEMA: Based on experimental metrics bandwidth current/peak object
class BandwidthDataValue(BaseValidatedModel):
    """Bandwidth data value.

    SCHEMA: Based on experimental metrics bandwidth data object
    """

    # Raw byte count reported by the server
    bytes: int
Bandwidth data value.
SCHEMA: Based on experimental metrics bandwidth data object
class BandwidthInfo(BaseValidatedModel):
    """Current and peak bandwidth information.

    SCHEMA: Based on experimental metrics bandwidth object
    """

    # Most recent bandwidth measurement
    current: BandwidthData
    # Highest bandwidth measurement observed
    peak: BandwidthData
Current and peak bandwidth information.
SCHEMA: Based on experimental metrics bandwidth object
49@dataclass(frozen=True, slots=True) 50class CircuitConfig: 51 """Circuit breaker configuration with validation. 52 53 Immutable configuration to prevent runtime modification. 54 Uses slots for memory efficiency (~40 bytes per instance). 55 """ 56 57 failure_threshold: int = 5 58 recovery_timeout: float = 60.0 59 success_threshold: int = 2 60 call_timeout: float = 10.0 61 62 def __post_init__(self) -> None: 63 """Validate configuration at creation time. 64 65 :raises ValueError: If any configuration value is invalid 66 """ 67 if self.failure_threshold < 1: 68 raise ValueError("failure_threshold must be >= 1") 69 if self.recovery_timeout < 1.0: 70 raise ValueError("recovery_timeout must be >= 1.0") 71 if self.success_threshold < 1: 72 raise ValueError("success_threshold must be >= 1") 73 if self.call_timeout < 0.1: 74 raise ValueError("call_timeout must be >= 0.1")
Circuit breaker configuration with validation.
Immutable configuration to prevent runtime modification. Uses slots for memory efficiency (~40 bytes per instance).
77@dataclass(slots=True) 78class CircuitMetrics: 79 """Circuit breaker metrics with efficient storage. 80 81 Uses slots for memory efficiency (~80 bytes per instance). 82 All calculations are O(1) with no allocations. 83 """ 84 85 total_calls: int = 0 86 successful_calls: int = 0 87 failed_calls: int = 0 88 state_changes: int = 0 89 last_failure_time: float = 0.0 90 last_success_time: float = 0.0 91 92 @property 93 def success_rate(self) -> float: 94 """Calculate success rate (O(1), no allocations). 95 96 :return: Success rate as decimal (0.0 to 1.0) 97 """ 98 if self.total_calls == 0: 99 return 1.0 100 return self.successful_calls / self.total_calls 101 102 @property 103 def failure_rate(self) -> float: 104 """Calculate failure rate (O(1), no allocations). 105 106 :return: Failure rate as decimal (0.0 to 1.0) 107 """ 108 return 1.0 - self.success_rate 109 110 def to_dict(self) -> dict[str, int | float]: 111 """Convert metrics to dictionary for serialization. 112 113 Pre-computes rates to avoid repeated calculations. 114 115 :return: Dictionary representation 116 """ 117 success_rate = self.success_rate # Calculate once 118 return { 119 "total_calls": self.total_calls, 120 "successful_calls": self.successful_calls, 121 "failed_calls": self.failed_calls, 122 "state_changes": self.state_changes, 123 "success_rate": success_rate, 124 "failure_rate": 1.0 - success_rate, # Reuse calculation 125 "last_failure_time": self.last_failure_time, 126 "last_success_time": self.last_success_time, 127 }
Circuit breaker metrics with efficient storage.
Uses slots for memory efficiency (~80 bytes per instance). All calculations are O(1) with no allocations.
@property
def success_rate(self) -> float:
    """Calculate success rate (O(1), no allocations).

    Zero recorded calls reports a perfect 1.0 rate rather than dividing
    by zero.

    :return: Success rate as decimal (0.0 to 1.0)
    """
    if self.total_calls == 0:
        return 1.0  # No data yet: report fully healthy
    return self.successful_calls / self.total_calls
Calculate success rate (O(1), no allocations).
Returns
Success rate as decimal (0.0 to 1.0)
@property
def failure_rate(self) -> float:
    """Calculate failure rate (O(1), no allocations).

    Derived as the complement of ``success_rate``.

    :return: Failure rate as decimal (0.0 to 1.0)
    """
    return 1.0 - self.success_rate
Calculate failure rate (O(1), no allocations).
Returns
Failure rate as decimal (0.0 to 1.0)
def to_dict(self) -> dict[str, int | float]:
    """Convert metrics to dictionary for serialization.

    Pre-computes rates to avoid repeated calculations.

    :return: Dictionary representation with counters and derived rates
    """
    success_rate = self.success_rate  # Calculate once
    return {
        "total_calls": self.total_calls,
        "successful_calls": self.successful_calls,
        "failed_calls": self.failed_calls,
        "state_changes": self.state_changes,
        "success_rate": success_rate,
        "failure_rate": 1.0 - success_rate,  # Reuse calculation
        "last_failure_time": self.last_failure_time,
        "last_success_time": self.last_success_time,
    }
Convert metrics to dictionary for serialization.
Pre-computes rates to avoid repeated calculations.
Returns
Dictionary representation
class CircuitOpenError(OutlineError):
    """Circuit breaker is open due to repeated failures.

    Indicates temporary service unavailability. Clients should wait
    for ``retry_after`` seconds before retrying.

    Attributes:
        retry_after: Seconds to wait before retry

    Example:
        >>> error = CircuitOpenError("Circuit open", retry_after=60.0)
        >>> error.is_retryable  # True
        >>> error.retry_after  # 60.0
    """

    __slots__ = ("retry_after",)

    _is_retryable: ClassVar[bool] = True

    def __init__(self, message: str, *, retry_after: float = 60.0) -> None:
        """Initialize circuit open error.

        Args:
            message: Error message
            retry_after: Seconds to wait before retry

        Raises:
            ValueError: If retry_after is negative
        """
        if retry_after < 0:
            raise ValueError("retry_after must be non-negative")

        # Round once for the sanitized detail payload
        super().__init__(
            message,
            safe_details={"retry_after": round(retry_after, 2)},
        )
        self.retry_after = retry_after

    @property
    def default_retry_delay(self) -> float:
        """Suggested delay before retry."""
        return self.retry_after
Circuit breaker is open due to repeated failures.
Indicates temporary service unavailability. Clients should wait
for retry_after seconds before retrying.
Attributes:
- retry_after: Seconds to wait before retry
Example:
>>> error = CircuitOpenError("Circuit open", retry_after=60.0)
>>> error.is_retryable  # True
>>> error.retry_after  # 60.0
def __init__(self, message: str, *, retry_after: float = 60.0) -> None:
    """Initialize circuit open error.

    Args:
        message: Error message
        retry_after: Seconds to wait before retry

    Raises:
        ValueError: If retry_after is negative
    """
    if retry_after < 0:
        raise ValueError("retry_after must be non-negative")

    # Pre-round for safe_details (avoid repeated rounding)
    rounded_retry = round(retry_after, 2)
    safe_details = {"retry_after": rounded_retry}
    super().__init__(message, safe_details=safe_details)

    # Keep the unrounded value for programmatic use
    self.retry_after = retry_after
Initialize circuit open error.
Arguments:
- message: Error message
- retry_after: Seconds to wait before retry
Raises:
- ValueError: If retry_after is negative
class CircuitState(Enum):
    """Circuit breaker states.

    CLOSED: Normal operation, requests pass through (hot path)
    OPEN: Failures exceeded threshold, requests blocked
    HALF_OPEN: Testing recovery, limited requests allowed
    """

    CLOSED = auto()
    OPEN = auto()
    HALF_OPEN = auto()
Circuit breaker states.
CLOSED: Normal operation, requests pass through (hot path) OPEN: Failures exceeded threshold, requests blocked HALF_OPEN: Testing recovery, limited requests allowed
class ConfigOverrides(TypedDict, total=False):
    """Type-safe configuration overrides.

    All fields are optional (``total=False``), allowing selective parameter
    overriding while maintaining type safety.
    """

    # Networking
    timeout: int
    retry_attempts: int
    max_connections: int
    rate_limit: int
    user_agent: str
    # Circuit breaker tuning
    enable_circuit_breaker: bool
    circuit_failure_threshold: int
    circuit_recovery_timeout: float
    circuit_success_threshold: int
    circuit_call_timeout: float
    # Logging
    enable_logging: bool
    json_format: bool
    # SSRF / network security
    allow_private_networks: bool
    resolve_dns_for_ssrf: bool
Type-safe configuration overrides.
All fields are optional, allowing selective parameter overriding while maintaining type safety.
class ConfigurationError(OutlineError):
    """Invalid or missing configuration.

    Attributes:
        field: Configuration field name that failed
        security_issue: Whether this is a security-related issue

    Example:
        >>> error = ConfigurationError(
        ...     "Missing API URL", field="api_url", security_issue=True
        ... )
    """

    __slots__ = ("field", "security_issue")

    def __init__(
        self,
        message: str,
        *,
        field: str | None = None,
        security_issue: bool = False,
    ) -> None:
        """Initialize configuration error.

        Args:
            message: Error message
            field: Configuration field name
            security_issue: Whether this is a security issue
        """
        # Build the sanitized detail dict; collapse empty to None
        details: dict[str, Any] = {}
        if field:
            details["field"] = field
        if security_issue:
            details["security_issue"] = True

        super().__init__(message, safe_details=details or None)

        self.field = field
        self.security_issue = security_issue
Invalid or missing configuration.
Attributes:
- field: Configuration field name that failed
- security_issue: Whether this is a security-related issue
Example:
>>> error = ConfigurationError(
...     "Missing API URL", field="api_url", security_issue=True
... )
def __init__(
    self,
    message: str,
    *,
    field: str | None = None,
    security_issue: bool = False,
) -> None:
    """Initialize configuration error.

    Args:
        message: Error message
        field: Configuration field name
        security_issue: Whether this is a security issue
    """
    # Only build a details dict when there is something to report
    safe_details: dict[str, Any] | None = None
    if field or security_issue:
        safe_details = {}
        if field:
            safe_details["field"] = field
        if security_issue:
            safe_details["security_issue"] = True

    super().__init__(message, safe_details=safe_details)

    self.field = field
    self.security_issue = security_issue
Initialize configuration error.
Arguments:
- message: Error message
- field: Configuration field name
- security_issue: Whether this is a security issue
class Constants:
    """Application-wide constants with security limits."""

    # Port constraints
    MIN_PORT: Final[int] = 1
    MAX_PORT: Final[int] = 65535

    # Length limits
    MAX_NAME_LENGTH: Final[int] = 255
    CERT_FINGERPRINT_LENGTH: Final[int] = 64  # SHA-256 hex digest length
    MAX_KEY_ID_LENGTH: Final[int] = 255
    MAX_URL_LENGTH: Final[int] = 2048

    # Network defaults
    DEFAULT_TIMEOUT: Final[int] = 10
    DEFAULT_RETRY_ATTEMPTS: Final[int] = 2
    DEFAULT_MIN_CONNECTIONS: Final[int] = 1
    DEFAULT_MAX_CONNECTIONS: Final[int] = 100
    DEFAULT_RETRY_DELAY: Final[float] = 1.0
    DEFAULT_MIN_TIMEOUT: Final[int] = 1
    DEFAULT_MAX_TIMEOUT: Final[int] = 300
    DEFAULT_USER_AGENT: Final[str] = "PyOutlineAPI/0.4.0"
    _MIN_RATE_LIMIT: Final[int] = 1
    _MAX_RATE_LIMIT: Final[int] = 1000
    _SAFETY_MARGIN: Final[float] = 10.0

    # Resource limits
    MAX_RECURSION_DEPTH: Final[int] = 10
    MAX_SNAPSHOT_SIZE_MB: Final[int] = 10

    # HTTP retry codes
    RETRY_STATUS_CODES: Final[frozenset[int]] = frozenset(
        {408, 429, 500, 502, 503, 504}
    )

    # Logging levels
    LOG_LEVEL_DEBUG: Final[int] = logging.DEBUG
    LOG_LEVEL_INFO: Final[int] = logging.INFO
    LOG_LEVEL_WARNING: Final[int] = logging.WARNING
    LOG_LEVEL_ERROR: Final[int] = logging.ERROR

    # ===== Security limits =====

    # Response size protection (DoS prevention)
    MAX_RESPONSE_SIZE: Final[int] = 10 * 1024 * 1024  # 10 MB
    MAX_RESPONSE_CHUNK_SIZE: Final[int] = 8192  # 8 KB chunks

    # Rate limiting defaults
    DEFAULT_RATE_LIMIT_RPS: Final[float] = 100.0  # Requests per second
    DEFAULT_RATE_LIMIT_BURST: Final[int] = 200  # Burst capacity
    DEFAULT_RATE_LIMIT: Final[int] = 100  # Concurrent requests

    # Connection limits
    MAX_CONNECTIONS_PER_HOST: Final[int] = 50
    DNS_CACHE_TTL: Final[int] = 300  # 5 minutes

    # Timeout strategies
    TIMEOUT_WARNING_RATIO: Final[float] = 0.8  # Warn at 80% of timeout
    MAX_TIMEOUT: Final[int] = 300  # 5 minutes absolute max
Application-wide constants with security limits.
class CredentialSanitizer:
    """Sanitize credentials from strings and exceptions."""

    # (compiled pattern, replacement) pairs applied in order; each pattern
    # targets a common credential shape (API keys, tokens, passwords, certs)
    PATTERNS: Final[list[tuple[re.Pattern[str], str]]] = [
        (
            re.compile(
                r'api[_-]?key["\']?\s*[:=]\s*["\']?([a-zA-Z0-9_\-\.]{20,})',
                re.IGNORECASE,
            ),
            "***API_KEY***",
        ),
        (
            re.compile(
                r'token["\']?\s*[:=]\s*["\']?([a-zA-Z0-9_\-.]{20,})', re.IGNORECASE
            ),
            "***TOKEN***",
        ),
        (
            re.compile(r'password["\']?\s*[:=]\s*["\']?([^\s"\']+)', re.IGNORECASE),
            "***PASSWORD***",
        ),
        (
            re.compile(
                r'cert[_-]?sha256["\']?\s*[:=]\s*["\']?([a-f0-9]{64})', re.IGNORECASE
            ),
            "***CERT***",
        ),
        (
            re.compile(r"bearer\s+([a-zA-Z0-9\-._~+/]+=*)", re.IGNORECASE),
            "Bearer ***TOKEN***",
        ),
        (
            re.compile(r"access_url['\"]?\s*[:=]\s*['\"]?([^\s'\"]+)", re.IGNORECASE),
            "***ACCESS_URL***",
        ),
    ]

    @classmethod
    @lru_cache(maxsize=512)
    def sanitize(cls, text: str) -> str:
        """Remove credentials from string.

        Results are memoized (up to 512 distinct inputs) since the same
        messages tend to be sanitized repeatedly.

        :param text: Text that may contain credentials
        :return: Sanitized text
        """
        if not text:
            return text

        result = text
        for regex, masked in cls.PATTERNS:
            result = regex.sub(masked, result)
        return result
Sanitize credentials from strings and exceptions.
@classmethod
@lru_cache(maxsize=512)
def sanitize(cls, text: str) -> str:
    """Remove credentials from string.

    Applies every pattern in ``PATTERNS`` in order; memoized via
    ``lru_cache`` for repeated inputs.

    :param text: Text that may contain credentials
    :return: Sanitized text
    """
    if not text:
        return text

    sanitized = text
    for pattern, replacement in cls.PATTERNS:
        sanitized = pattern.sub(replacement, sanitized)
    return sanitized
Remove credentials from string.
Parameters
- text: Text that may contain credentials
Returns
Sanitized text
class DataLimit(BaseValidatedModel, ByteConversionMixin):
    """Data transfer limit in bytes with unit conversions."""

    # Limit size in bytes (validated by the Bytes type)
    bytes: Bytes

    @classmethod
    def from_kilobytes(cls, kb: float) -> Self:
        """Create DataLimit from kilobytes.

        :param kb: Size in kilobytes
        :return: DataLimit instance
        """
        total = kb * _BYTES_IN_KB
        return cls(bytes=int(total))

    @classmethod
    def from_megabytes(cls, mb: float) -> Self:
        """Create DataLimit from megabytes.

        :param mb: Size in megabytes
        :return: DataLimit instance
        """
        total = mb * _BYTES_IN_MB
        return cls(bytes=int(total))

    @classmethod
    def from_gigabytes(cls, gb: float) -> Self:
        """Create DataLimit from gigabytes.

        :param gb: Size in gigabytes
        :return: DataLimit instance
        """
        total = gb * _BYTES_IN_GB
        return cls(bytes=int(total))
Data transfer limit in bytes with unit conversions.
Size in bytes
@classmethod
def from_kilobytes(cls, kb: float) -> Self:
    """Create DataLimit from kilobytes.

    Fractional byte remainders are truncated by ``int()``.

    :param kb: Size in kilobytes
    :return: DataLimit instance
    """
    return cls(bytes=int(kb * _BYTES_IN_KB))
Create DataLimit from kilobytes.
Parameters
- kb: Size in kilobytes
Returns
DataLimit instance
@classmethod
def from_megabytes(cls, mb: float) -> Self:
    """Create DataLimit from megabytes.

    Fractional byte remainders are truncated by ``int()``.

    :param mb: Size in megabytes
    :return: DataLimit instance
    """
    return cls(bytes=int(mb * _BYTES_IN_MB))
Create DataLimit from megabytes.
Parameters
- mb: Size in megabytes
Returns
DataLimit instance
@classmethod
def from_gigabytes(cls, gb: float) -> Self:
    """Create DataLimit from gigabytes.

    Fractional byte remainders are truncated by ``int()``.

    :param gb: Size in gigabytes
    :return: DataLimit instance
    """
    return cls(bytes=int(gb * _BYTES_IN_GB))
Create DataLimit from gigabytes.
Parameters
- gb: Size in gigabytes
Returns
DataLimit instance
class DataLimitRequest(BaseValidatedModel):
    """Request model for setting data limit.

    Note:
        The API expects the DataLimit object directly.
        Use to_payload() to produce the correct request body.
    """

    limit: DataLimit

    def to_payload(self) -> dict[str, dict[str, int]]:
        """Convert to API request payload.

        :return: Payload dict with limit object
        """
        # Serialize with field aliases so keys match the API contract
        dumped = self.limit.model_dump(by_alias=True)
        return {"limit": cast(dict[str, int], dumped)}
Request model for setting data limit.
Note:
The API expects the DataLimit object directly. Use to_payload() to produce the correct request body.
def to_payload(self) -> dict[str, dict[str, int]]:
    """Convert to API request payload.

    Uses ``by_alias=True`` so serialized keys match the API contract.

    :return: Payload dict with limit object
    """
    return {"limit": cast(dict[str, int], self.limit.model_dump(by_alias=True))}
Convert to API request payload.
Returns
Payload dict with limit object
class DataTransferred(BaseValidatedModel, ByteConversionMixin):
    """Data transfer metric with byte conversions.

    SCHEMA: Based on experimental metrics dataTransferred object
    """

    # Total bytes transferred (unit helpers come from ByteConversionMixin)
    bytes: Bytes
Data transfer metric with byte conversions.
SCHEMA: Based on experimental metrics dataTransferred object
class DefaultAuditLogger:
    """Async audit logger with batching and backpressure handling.

    Entries are enqueued onto a bounded asyncio queue and drained by a
    lazily-started background task that writes them in batches. When the
    queue is full or the logger is shutting down, entries fall back to
    synchronous writes so nothing is silently dropped.
    """

    __slots__ = (
        "_batch_size",
        "_batch_timeout",
        "_lock",
        "_queue",
        "_queue_size",
        "_shutdown_event",
        "_task",
    )

    def __init__(
        self,
        *,
        queue_size: int = 10000,
        batch_size: int = 100,
        batch_timeout: float = 1.0,
    ) -> None:
        """Initialize audit logger with batching support.

        :param queue_size: Maximum queue size (backpressure protection)
        :param batch_size: Maximum batch size for processing
        :param batch_timeout: Maximum time to wait for batch completion (seconds)
        """
        self._queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue(maxsize=queue_size)
        self._queue_size = queue_size
        self._batch_size = batch_size
        self._batch_timeout = batch_timeout
        # Background drain task; created lazily on first alog_action call
        self._task: asyncio.Task[None] | None = None
        self._shutdown_event = asyncio.Event()
        self._lock = asyncio.Lock()

    async def alog_action(
        self,
        action: str,
        resource: str,
        *,
        user: str | None = None,
        details: dict[str, Any] | None = None,
        correlation_id: str | None = None,
    ) -> None:
        """Log auditable action asynchronously with automatic batching.

        :param action: Action being performed
        :param resource: Resource identifier
        :param user: User performing the action (optional)
        :param details: Additional structured details (optional)
        :param correlation_id: Request correlation ID (optional)
        """
        if self._shutdown_event.is_set():
            # Fallback to sync logging during shutdown
            return self.log_action(
                action,
                resource,
                user=user,
                details=details,
                correlation_id=correlation_id,
            )

        # Ensure background task is running
        await self._ensure_task_running()

        # Build log entry
        entry = self._build_entry(action, resource, user, details, correlation_id)

        # Try to enqueue, handle backpressure
        try:
            self._queue.put_nowait(entry)
        except asyncio.QueueFull:
            # Backpressure: log warning and use sync fallback
            if logger.isEnabledFor(logging.WARNING):
                logger.warning(
                    "[AUDIT] Queue full (%d items), using sync fallback",
                    self._queue_size,
                )
            self.log_action(
                action,
                resource,
                user=user,
                details=details,
                correlation_id=correlation_id,
            )

    def log_action(
        self,
        action: str,
        resource: str,
        *,
        user: str | None = None,
        details: dict[str, Any] | None = None,
        correlation_id: str | None = None,
    ) -> None:
        """Log auditable action synchronously (fallback method).

        :param action: Action being performed
        :param resource: Resource identifier
        :param user: User performing the action (optional)
        :param details: Additional structured details (optional)
        :param correlation_id: Request correlation ID (optional)
        """
        entry = self._build_entry(action, resource, user, details, correlation_id)
        self._write_log(entry)

    async def _ensure_task_running(self) -> None:
        """Ensure background processing task is running (lazy start with lock)."""
        # Fast path: cheap check without taking the lock
        if self._task is not None and not self._task.done():
            return

        async with self._lock:
            # Double-check after acquiring lock
            if self._task is None or self._task.done():
                self._task = asyncio.create_task(
                    self._process_queue(), name="audit-logger"
                )

    async def _process_queue(self) -> None:
        """Background task for processing audit logs in batches.

        Uses batching for improved throughput and reduced I/O overhead.
        """
        batch: list[dict[str, Any]] = []

        try:
            while not self._shutdown_event.is_set():
                try:
                    # Wait for item with timeout for batch processing
                    entry = await asyncio.wait_for(
                        self._queue.get(), timeout=self._batch_timeout
                    )
                    batch.append(entry)

                    # Process batch when size reached or queue empty
                    if len(batch) >= self._batch_size or self._queue.empty():
                        self._write_batch(batch)
                        batch.clear()

                    self._queue.task_done()

                except asyncio.TimeoutError:
                    # Timeout: flush partial batch if any
                    if batch:
                        self._write_batch(batch)
                        batch.clear()

        except asyncio.CancelledError:
            # Flush remaining batch on cancellation
            if batch:
                self._write_batch(batch)
            raise
        finally:
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug("[AUDIT] Queue processor stopped")

    def _write_batch(self, batch: list[dict[str, Any]]) -> None:
        """Write batch of log entries efficiently.

        :param batch: Batch of log entries to write
        """
        for entry in batch:
            self._write_log(entry)

    def _write_log(self, entry: dict[str, Any]) -> None:
        """Write single log entry to logger.

        :param entry: Log entry to write
        """
        message = self._format_message(entry)
        logger.info(message, extra=entry)

    @staticmethod
    def _build_entry(
        action: str,
        resource: str,
        user: str | None,
        details: dict[str, Any] | None,
        correlation_id: str | None,
    ) -> dict[str, Any]:
        """Build structured log entry with sanitization.

        :param action: Action being performed
        :param resource: Resource identifier
        :param user: User performing action
        :param details: Additional details
        :param correlation_id: Correlation ID
        :return: Structured log entry
        """
        entry: dict[str, Any] = {
            "action": action,
            "resource": resource,
            "timestamp": time.time(),
            "is_audit": True,
        }

        if user is not None:
            entry["user"] = user
        if correlation_id is not None:
            entry["correlation_id"] = correlation_id
        if details is not None:
            # Strip credentials/PII before the entry reaches any handler
            entry["details"] = _sanitize_details(details)

        return entry

    @staticmethod
    def _format_message(entry: dict[str, Any]) -> str:
        """Format audit log message for human readability.

        :param entry: Log entry
        :return: Formatted message
        """
        action = entry["action"]
        resource = entry["resource"]
        user = entry.get("user")
        correlation_id = entry.get("correlation_id")

        parts = ["[AUDIT]", action, "on", resource]

        if user:
            parts.extend(["by", user])
        if correlation_id:
            parts.append(f"[{correlation_id}]")

        return " ".join(parts)

    async def shutdown(self, *, timeout: float = 5.0) -> None:
        """Gracefully shutdown audit logger with queue draining.

        :param timeout: Maximum time to wait for queue to drain (seconds)
        """
        async with self._lock:
            if self._shutdown_event.is_set():
                return

            self._shutdown_event.set()

        if logger.isEnabledFor(logging.DEBUG):
            logger.debug("[AUDIT] Shutting down, draining queue")

        # Wait for queue to drain
        try:
            await asyncio.wait_for(self._queue.join(), timeout=timeout)
        except asyncio.TimeoutError:
            remaining = self._queue.qsize()
            if logger.isEnabledFor(logging.WARNING):
                logger.warning(
                    "[AUDIT] Queue did not drain within %ss, %d items remaining",
                    timeout,
                    remaining,
                )

        # Cancel processing task
        if self._task and not self._task.done():
            self._task.cancel()
            with suppress(asyncio.CancelledError):
                await self._task

        if logger.isEnabledFor(logging.DEBUG):
            logger.debug("[AUDIT] Shutdown complete")
Async audit logger with batching and backpressure handling.
def __init__(
    self,
    *,
    queue_size: int = 10000,
    batch_size: int = 100,
    batch_timeout: float = 1.0,
) -> None:
    """Initialize audit logger with batching support.

    :param queue_size: Maximum queue size (backpressure protection)
    :param batch_size: Maximum batch size for processing
    :param batch_timeout: Maximum time to wait for batch completion (seconds)
    """
    self._queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue(maxsize=queue_size)
    self._queue_size = queue_size
    self._batch_size = batch_size
    self._batch_timeout = batch_timeout
    # Background drain task; created lazily on first use
    self._task: asyncio.Task[None] | None = None
    self._shutdown_event = asyncio.Event()
    self._lock = asyncio.Lock()
Initialize audit logger with batching support.
Parameters
- queue_size: Maximum queue size (backpressure protection)
- batch_size: Maximum batch size for processing
- batch_timeout: Maximum time to wait for batch completion (seconds)
async def alog_action(
    self,
    action: str,
    resource: str,
    *,
    user: str | None = None,
    details: dict[str, Any] | None = None,
    correlation_id: str | None = None,
) -> None:
    """Log auditable action asynchronously with automatic batching.

    Falls back to the synchronous path when shutting down or when the
    queue is full, so entries are never silently dropped.

    :param action: Action being performed
    :param resource: Resource identifier
    :param user: User performing the action (optional)
    :param details: Additional structured details (optional)
    :param correlation_id: Request correlation ID (optional)
    """
    if self._shutdown_event.is_set():
        # Fallback to sync logging during shutdown
        return self.log_action(
            action,
            resource,
            user=user,
            details=details,
            correlation_id=correlation_id,
        )

    # Ensure background task is running
    await self._ensure_task_running()

    # Build log entry
    entry = self._build_entry(action, resource, user, details, correlation_id)

    # Try to enqueue, handle backpressure
    try:
        self._queue.put_nowait(entry)
    except asyncio.QueueFull:
        # Backpressure: log warning and use sync fallback
        if logger.isEnabledFor(logging.WARNING):
            logger.warning(
                "[AUDIT] Queue full (%d items), using sync fallback",
                self._queue_size,
            )
        self.log_action(
            action,
            resource,
            user=user,
            details=details,
            correlation_id=correlation_id,
        )
Log auditable action asynchronously with automatic batching.
Parameters
- action: Action being performed
- resource: Resource identifier
- user: User performing the action (optional)
- details: Additional structured details (optional)
- correlation_id: Request correlation ID (optional)
def log_action(
    self,
    action: str,
    resource: str,
    *,
    user: str | None = None,
    details: dict[str, Any] | None = None,
    correlation_id: str | None = None,
) -> None:
    """Log auditable action synchronously (fallback method).

    Writes directly to the logger, bypassing the async queue.

    :param action: Action being performed
    :param resource: Resource identifier
    :param user: User performing the action (optional)
    :param details: Additional structured details (optional)
    :param correlation_id: Request correlation ID (optional)
    """
    entry = self._build_entry(action, resource, user, details, correlation_id)
    self._write_log(entry)
Log auditable action synchronously (fallback method).
Parameters
- action: Action being performed
- resource: Resource identifier
- user: User performing the action (optional)
- details: Additional structured details (optional)
- correlation_id: Request correlation ID (optional)
async def shutdown(self, *, timeout: float = 5.0) -> None:
    """Gracefully shutdown audit logger with queue draining.

    Idempotent: subsequent calls return immediately once shutdown has
    started. Drains the queue (bounded by ``timeout``), then cancels
    the background task.

    :param timeout: Maximum time to wait for queue to drain (seconds)
    """
    async with self._lock:
        if self._shutdown_event.is_set():
            return

        self._shutdown_event.set()

    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("[AUDIT] Shutting down, draining queue")

    # Wait for queue to drain
    try:
        await asyncio.wait_for(self._queue.join(), timeout=timeout)
    except asyncio.TimeoutError:
        remaining = self._queue.qsize()
        if logger.isEnabledFor(logging.WARNING):
            logger.warning(
                "[AUDIT] Queue did not drain within %ss, %d items remaining",
                timeout,
                remaining,
            )

    # Cancel processing task
    if self._task and not self._task.done():
        self._task.cancel()
        with suppress(asyncio.CancelledError):
            await self._task

    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("[AUDIT] Shutdown complete")
Gracefully shutdown audit logger with queue draining.
Parameters
- timeout: Maximum time to wait for queue to drain (seconds)
class DevelopmentConfig(OutlineClientConfig):
    """Development configuration with relaxed security.

    Optimized for local development and testing with:
    - Extended timeouts for debugging
    - Detailed logging enabled by default
    - Circuit breaker disabled for easier testing
    """

    # Settings are read from DEV-prefixed environment variables / .env.dev
    model_config = SettingsConfigDict(
        env_prefix=_DEV_ENV_PREFIX,
        env_file=".env.dev",
        case_sensitive=False,
        extra="forbid",
    )

    enable_logging: bool = True
    enable_circuit_breaker: bool = False
    timeout: int = 30  # Longer than production default to allow debugging
Development configuration with relaxed security.
Optimized for local development and testing with:
- Extended timeouts for debugging
- Detailed logging enabled by default
- Circuit breaker disabled for easier testing
class ErrorResponse(BaseValidatedModel):
    """Error response with optimized string formatting.

    SCHEMA: Based on API error response format
    """

    code: str
    message: str

    def __str__(self) -> str:
        """Render the error as ``code: message``.

        :return: Formatted error message
        """
        return "{}: {}".format(self.code, self.message)
Error response with optimized string formatting.
SCHEMA: Based on API error response format
class ExperimentalMetrics(BaseValidatedModel):
    """Experimental metrics with optimized lookup.

    SCHEMA: Based on GET /experimental/server/metrics response
    """

    server: ServerExperimentalMetric
    access_keys: list[AccessKeyMetric] = Field(alias="accessKeys")

    def get_key_metric(self, key_id: str) -> AccessKeyMetric | None:
        """Get metrics for specific key.

        Stops scanning at the first match.

        :param key_id: Access key ID
        :return: Key metrics or None if not found
        """
        matches = (m for m in self.access_keys if m.access_key_id == key_id)
        return next(matches, None)
Experimental metrics with optimized lookup.
SCHEMA: Based on GET /experimental/server/metrics response
476 def get_key_metric(self, key_id: str) -> AccessKeyMetric | None: 477 """Get metrics for specific key with early return. 478 479 :param key_id: Access key ID 480 :return: Key metrics or None if not found 481 """ 482 for metric in self.access_keys: 483 if metric.access_key_id == key_id: 484 return metric # Early return 485 return None
Get metrics for specific key with early return.
Parameters
- key_id: Access key ID
Returns
Key metrics or None if not found
class HealthCheckResult(BaseValidatedModel):
    """Health check result with aggregated diagnostics."""

    healthy: bool
    timestamp: float
    checks: ChecksDict

    @cached_property
    def failed_checks(self) -> list[str]:
        """Names of checks whose status is not "healthy" (cached).

        :return: List of failed check names
        """
        failed: list[str] = []
        for name, outcome in self.checks.items():
            if outcome.get("status") != "healthy":
                failed.append(name)
        return failed

    @property
    def success_rate(self) -> float:
        """Fraction of checks that passed (uses cached failed_checks).

        :return: Success rate (0.0 to 1.0)
        """
        total = len(self.checks)
        if total == 0:
            return 1.0  # no checks at all counts as fully healthy
        return (total - len(self.failed_checks)) / total
Health check result with optimized diagnostics.
607 @cached_property 608 def failed_checks(self) -> list[str]: 609 """Get failed checks (cached for repeated access). 610 611 :return: List of failed check names 612 """ 613 return [ 614 name 615 for name, result in self.checks.items() 616 if result.get("status") != "healthy" 617 ]
Get failed checks (cached for repeated access).
Returns
List of failed check names
619 @property 620 def success_rate(self) -> float: 621 """Calculate success rate (uses cached failed_checks). 622 623 :return: Success rate (0.0 to 1.0) 624 """ 625 if not self.checks: 626 return 1.0 # Early return 627 628 total = len(self.checks) 629 passed = total - len(self.failed_checks) # Uses cached property 630 return passed / total
Calculate success rate (uses cached failed_checks).
Returns
Success rate (0.0 to 1.0)
class HostnameRequest(BaseValidatedModel):
    """Request model for setting hostname.

    SCHEMA: Based on PUT /server/hostname-for-access-keys request body
    """

    # Must be non-empty; any further validation is left to the server.
    hostname: str = Field(min_length=1)
Request model for setting hostname.
SCHEMA: Based on PUT /server/hostname-for-access-keys request body
class LocationMetric(BaseValidatedModel):
    """Location-based usage metric.

    SCHEMA: Based on experimental metrics locations array item
    """

    # Location identifier as provided by the API
    # (presumably a country/region code — confirm against server docs).
    location: str
    # Autonomous-system number and organization; optional in the payload.
    asn: int | None = None
    as_org: str | None = Field(None, alias="asOrg")
    tunnel_time: TunnelTime = Field(alias="tunnelTime")
    data_transferred: DataTransferred = Field(alias="dataTransferred")
Location-based usage metric.
SCHEMA: Based on experimental metrics locations array item
class MetricsCollector(Protocol):
    """Protocol for metrics collection.

    Allows dependency injection of custom metrics backends. Structural
    typing: implementations need not inherit from this class.
    """

    def increment(self, metric: str, *, tags: MetricsTags | None = None) -> None:
        """Increment counter metric."""
        ...  # pragma: no cover

    def timing(
        self, metric: str, value: float, *, tags: MetricsTags | None = None
    ) -> None:
        """Record timing metric."""
        ...  # pragma: no cover

    def gauge(
        self, metric: str, value: float, *, tags: MetricsTags | None = None
    ) -> None:
        """Set gauge metric."""
        ...  # pragma: no cover
Protocol for metrics collection.
Allows dependency injection of custom metrics backends.
def _no_init_or_replace_init(self, *args, **kwargs):
    """Placeholder ``__init__`` installed on Protocol subclasses.

    Rejects direct instantiation of protocol classes; on the first
    instantiation of a concrete subclass it locates the real ``__init__``
    in the MRO, installs it on the class, then delegates to it.
    (This mirrors CPython's ``typing`` module implementation.)
    """
    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    cls.__init__(self, *args, **kwargs)
76 def increment(self, metric: str, *, tags: MetricsTags | None = None) -> None: 77 """Increment counter metric.""" 78 ... # pragma: no cover
Increment counter metric.
class MetricsEnabledRequest(BaseValidatedModel):
    """Request model for enabling/disabling metrics.

    SCHEMA: Based on PUT /metrics/enabled request body
    """

    # Serialized as "metricsEnabled" to match the API's camelCase field.
    metrics_enabled: bool = Field(alias="metricsEnabled")
Request model for enabling/disabling metrics.
SCHEMA: Based on PUT /metrics/enabled request body
class MetricsStatusResponse(BaseValidatedModel):
    """Response model for metrics status.

    Returns current metrics sharing status.
    SCHEMA: Based on GET /metrics/enabled response
    """

    # Serialized as "metricsEnabled" to match the API's camelCase field.
    metrics_enabled: bool = Field(alias="metricsEnabled")
Response model for metrics status.
Returns the current metrics sharing status.
SCHEMA: Based on GET /metrics/enabled response
class MultiServerManager:
    """High-performance manager for multiple Outline servers.

    Features:
    - Concurrent operations across all servers
    - Health checking and automatic failover
    - Aggregated metrics and status
    - Graceful shutdown with cleanup
    - Thread-safe operations

    Limits:
    - Maximum 50 servers (configurable via _MAX_SERVERS)
    """

    __slots__ = (
        "_audit_logger",
        "_clients",
        "_configs",
        "_default_timeout",
        "_lock",
        "_metrics",
    )

    def __init__(
        self,
        configs: Sequence[OutlineClientConfig],
        *,
        audit_logger: AuditLogger | None = None,
        metrics: MetricsCollector | None = None,
        default_timeout: float = _DEFAULT_SERVER_TIMEOUT,
    ) -> None:
        """Initialize multi-server manager.

        :param configs: Sequence of server configurations
        :param audit_logger: Shared audit logger for all servers
        :param metrics: Shared metrics collector for all servers
        :param default_timeout: Default timeout for operations (seconds)
        :raises ConfigurationError: If too many servers or invalid configs
        """
        if len(configs) > _MAX_SERVERS:
            raise ConfigurationError(
                f"Too many servers: {len(configs)} (max: {_MAX_SERVERS})"
            )

        if not configs:
            raise ConfigurationError("At least one server configuration required")

        self._configs = list(configs)
        self._clients: dict[str, AsyncOutlineClient] = {}
        self._audit_logger = audit_logger
        self._metrics = metrics
        self._default_timeout = default_timeout
        self._lock = asyncio.Lock()

    @property
    def server_count(self) -> int:
        """Get total number of configured servers.

        :return: Number of servers
        """
        return len(self._configs)

    @property
    def active_servers(self) -> int:
        """Get number of active (connected) servers.

        :return: Number of active servers
        """
        return sum(1 for client in self._clients.values() if client.is_connected)

    def get_server_names(self) -> list[str]:
        """Get list of sanitized server URLs.

        URLs are sanitized to remove sensitive path information.

        :return: List of safe server identifiers
        """
        return [
            Validators.sanitize_url_for_logging(config.api_url)
            for config in self._configs
        ]

    async def __aenter__(self) -> MultiServerManager:
        """Async context manager entry.

        Creates one client per configuration and enters them concurrently.
        Servers that fail to initialize are logged and skipped.

        :return: Self reference
        :raises ConfigurationError: If NO servers can be initialized
        """
        async with self._lock:
            # Create clients up front so each successfully-entered client
            # can be stored and reused below.
            clients = [
                AsyncOutlineClient(
                    config=config,
                    audit_logger=self._audit_logger,
                    metrics=self._metrics,
                )
                for config in self._configs
            ]

            results = await asyncio.gather(
                *(client.__aenter__() for client in clients),
                return_exceptions=True,
            )

            # Process results
            errors: list[str] = []
            for idx, (config, client, result) in enumerate(
                zip(self._configs, clients, results, strict=True)
            ):
                safe_url = Validators.sanitize_url_for_logging(config.api_url)

                if isinstance(result, Exception):
                    error_msg = f"Failed to initialize server {safe_url}: {result}"
                    errors.append(error_msg)
                    if logger.isEnabledFor(logging.WARNING):
                        logger.warning(error_msg)
                else:
                    # BUG FIX: store the client whose __aenter__ actually
                    # succeeded. Previously a brand-new, un-initialized
                    # AsyncOutlineClient was constructed and stored here,
                    # discarding the entered client, so every stored client
                    # was unusable (never connected).
                    self._clients[safe_url] = client

                    if logger.isEnabledFor(logging.INFO):
                        logger.info(
                            "Server %d/%d initialized: %s",
                            idx + 1,
                            len(self._configs),
                            safe_url,
                        )

            if not self._clients:
                raise ConfigurationError(
                    f"Failed to initialize any servers. Errors: {'; '.join(errors)}"
                )

            if logger.isEnabledFor(logging.INFO):
                logger.info(
                    "MultiServerManager ready: %d/%d servers active",
                    len(self._clients),
                    len(self._configs),
                )

            return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: object | None,
    ) -> bool:
        """Async context manager exit.

        Closes all active clients concurrently; individual shutdown
        failures are logged but never raised.

        :param exc_type: Exception type
        :param exc_val: Exception value
        :param exc_tb: Exception traceback
        :return: False to propagate exceptions
        """
        async with self._lock:
            shutdown_tasks = [
                client.__aexit__(None, None, None) for client in self._clients.values()
            ]

            results = await asyncio.gather(*shutdown_tasks, return_exceptions=True)

            errors = [
                f"{server_id}: {result}"
                for (server_id, _), result in zip(
                    self._clients.items(), results, strict=False
                )
                if isinstance(result, Exception)
            ]

            self._clients.clear()

            if errors and logger.isEnabledFor(logging.WARNING):
                logger.warning("Shutdown completed with %d error(s)", len(errors))

            return False

    def get_client(self, server_identifier: str | int) -> AsyncOutlineClient:
        """Get client by server identifier or index.

        :param server_identifier: Server URL (sanitized) or 0-based index
        :return: Client instance
        :raises KeyError: If server not found
        :raises IndexError: If index out of range
        """
        # Try as index first (fast path for common case)
        if isinstance(server_identifier, int):
            if 0 <= server_identifier < len(self._configs):
                config = self._configs[server_identifier]
                safe_url = Validators.sanitize_url_for_logging(config.api_url)
                return self._clients[safe_url]
            raise IndexError(
                f"Server index {server_identifier} out of range (0-{len(self._configs) - 1})"
            )

        # Try as server ID
        if server_identifier in self._clients:
            return self._clients[server_identifier]

        raise KeyError(f"Server not found: {server_identifier}")

    def get_all_clients(self) -> list[AsyncOutlineClient]:
        """Get all active clients.

        :return: List of client instances
        """
        return list(self._clients.values())

    async def health_check_all(
        self,
        timeout: float | None = None,
    ) -> dict[str, dict[str, Any]]:
        """Perform health check on all servers concurrently.

        :param timeout: Timeout for each health check
        :return: Dictionary mapping server IDs to health check results
        """
        timeout = timeout or self._default_timeout

        tasks = [
            self._health_check_single(server_id, client, timeout)
            for server_id, client in self._clients.items()
        ]

        # Execute concurrently
        results_list = await asyncio.gather(*tasks, return_exceptions=True)

        # Build result dictionary
        results: dict[str, dict[str, Any]] = {}
        for (server_id, _), result in zip(
            self._clients.items(), results_list, strict=False
        ):
            if isinstance(result, BaseException):
                results[server_id] = {
                    "healthy": False,
                    "error": str(result),
                    "error_type": type(result).__name__,
                }
            else:
                results[server_id] = result

        return results

    @staticmethod
    async def _health_check_single(
        server_id: str,
        client: AsyncOutlineClient,
        timeout: float,
    ) -> dict[str, Any]:
        """Perform health check on a single server with timeout.

        :param server_id: Server identifier
        :param client: Client instance
        :param timeout: Timeout for operation
        :return: Health check result
        """
        try:
            result = await asyncio.wait_for(
                client.health_check(),
                timeout=timeout,
            )
            result["server_id"] = server_id
            return result
        except asyncio.TimeoutError:
            return {
                "server_id": server_id,
                "healthy": False,
                "error": f"Health check timeout after {timeout}s",
                "error_type": "TimeoutError",
            }
        except Exception as e:
            return {
                "server_id": server_id,
                "healthy": False,
                "error": str(e),
                "error_type": type(e).__name__,
            }

    async def get_healthy_servers(
        self,
        timeout: float | None = None,
    ) -> list[AsyncOutlineClient]:
        """Get list of healthy servers after health check.

        :param timeout: Timeout for health checks
        :return: List of healthy clients
        """
        health_results = await self.health_check_all(timeout=timeout)

        healthy_clients: list[AsyncOutlineClient] = []
        for server_id, result in health_results.items():
            if result.get("healthy", False):
                try:
                    client = self.get_client(server_id)
                    healthy_clients.append(client)
                except (KeyError, IndexError):
                    continue

        return healthy_clients

    def get_status_summary(self) -> dict[str, Any]:
        """Get aggregated status summary for all servers.

        Synchronous operation - no API calls made.

        :return: Status summary dictionary
        """
        return {
            "total_servers": len(self._configs),
            "active_servers": self.active_servers,
            "server_statuses": {
                server_id: client.get_status()
                for server_id, client in self._clients.items()
            },
        }

    def __repr__(self) -> str:
        """String representation.

        :return: String representation
        """
        active = self.active_servers
        total = self.server_count
        return f"MultiServerManager(servers={active}/{total} active)"
High-performance manager for multiple Outline servers.
Features:
- Concurrent operations across all servers
- Health checking and automatic failover
- Aggregated metrics and status
- Graceful shutdown with cleanup
- Thread-safe operations
Limits:
- Maximum 50 servers (configurable via _MAX_SERVERS)
- Automatic cleanup with weak references
577 def __init__( 578 self, 579 configs: Sequence[OutlineClientConfig], 580 *, 581 audit_logger: AuditLogger | None = None, 582 metrics: MetricsCollector | None = None, 583 default_timeout: float = _DEFAULT_SERVER_TIMEOUT, 584 ) -> None: 585 """Initialize multiserver manager. 586 587 :param configs: Sequence of server configurations 588 :param audit_logger: Shared audit logger for all servers 589 :param metrics: Shared metrics collector for all servers 590 :param default_timeout: Default timeout for operations (seconds) 591 :raises ConfigurationError: If too many servers or invalid configs 592 """ 593 if len(configs) > _MAX_SERVERS: 594 raise ConfigurationError( 595 f"Too many servers: {len(configs)} (max: {_MAX_SERVERS})" 596 ) 597 598 if not configs: 599 raise ConfigurationError("At least one server configuration required") 600 601 self._configs = list(configs) 602 self._clients: dict[str, AsyncOutlineClient] = {} 603 self._audit_logger = audit_logger 604 self._metrics = metrics 605 self._default_timeout = default_timeout 606 self._lock = asyncio.Lock()
Initialize multiserver manager.
Parameters
- configs: Sequence of server configurations
- audit_logger: Shared audit logger for all servers
- metrics: Shared metrics collector for all servers
- default_timeout: Default timeout for operations (seconds)
Raises
- ConfigurationError: If too many servers or invalid configs
608 @property 609 def server_count(self) -> int: 610 """Get total number of configured servers. 611 612 :return: Number of servers 613 """ 614 return len(self._configs)
Get total number of configured servers.
Returns
Number of servers
616 @property 617 def active_servers(self) -> int: 618 """Get number of active (connected) servers. 619 620 :return: Number of active servers 621 """ 622 return sum(1 for client in self._clients.values() if client.is_connected)
Get number of active (connected) servers.
Returns
Number of active servers
624 def get_server_names(self) -> list[str]: 625 """Get list of sanitized server URLs. 626 627 URLs are sanitized to remove sensitive path information. 628 629 :return: List of safe server identifiers 630 """ 631 return [ 632 Validators.sanitize_url_for_logging(config.api_url) 633 for config in self._configs 634 ]
Get list of sanitized server URLs.
URLs are sanitized to remove sensitive path information.
Returns
List of safe server identifiers
736 def get_client(self, server_identifier: str | int) -> AsyncOutlineClient: 737 """Get client by server identifier or index. 738 739 :param server_identifier: Server URL (sanitized) or 0-based index 740 :return: Client instance 741 :raises KeyError: If server not found 742 :raises IndexError: If index out of range 743 """ 744 # Try as index first (fast path for common case) 745 if isinstance(server_identifier, int): 746 if 0 <= server_identifier < len(self._configs): 747 config = self._configs[server_identifier] 748 safe_url = Validators.sanitize_url_for_logging(config.api_url) 749 return self._clients[safe_url] 750 raise IndexError( 751 f"Server index {server_identifier} out of range (0-{len(self._configs) - 1})" 752 ) 753 754 # Try as server ID 755 if server_identifier in self._clients: 756 return self._clients[server_identifier] 757 758 raise KeyError(f"Server not found: {server_identifier}")
Get client by server identifier or index.
Parameters
- server_identifier: Server URL (sanitized) or 0-based index
Returns
Client instance
Raises
- KeyError: If server not found
- IndexError: If index out of range
760 def get_all_clients(self) -> list[AsyncOutlineClient]: 761 """Get all active clients. 762 763 :return: List of client instances 764 """ 765 return list(self._clients.values())
Get all active clients.
Returns
List of client instances
767 async def health_check_all( 768 self, 769 timeout: float | None = None, 770 ) -> dict[str, dict[str, Any]]: 771 """Perform health check on all servers concurrently. 772 773 :param timeout: Timeout for each health check 774 :return: Dictionary mapping server IDs to health check results 775 """ 776 timeout = timeout or self._default_timeout 777 778 tasks = [ 779 self._health_check_single(server_id, client, timeout) 780 for server_id, client in self._clients.items() 781 ] 782 783 # Execute concurrently 784 results_list = await asyncio.gather(*tasks, return_exceptions=True) 785 786 # Build result dictionary 787 results: dict[str, dict[str, Any]] = {} 788 for (server_id, _), result in zip( 789 self._clients.items(), results_list, strict=False 790 ): 791 if isinstance(result, BaseException): 792 results[server_id] = { 793 "healthy": False, 794 "error": str(result), 795 "error_type": type(result).__name__, 796 } 797 else: 798 results[server_id] = result 799 800 return results
Perform health check on all servers concurrently.
Parameters
- timeout: Timeout for each health check
Returns
Dictionary mapping server IDs to health check results
837 async def get_healthy_servers( 838 self, 839 timeout: float | None = None, 840 ) -> list[AsyncOutlineClient]: 841 """Get list of healthy servers after health check. 842 843 :param timeout: Timeout for health checks 844 :return: List of healthy clients 845 """ 846 health_results = await self.health_check_all(timeout=timeout) 847 848 healthy_clients: list[AsyncOutlineClient] = [] 849 for server_id, result in health_results.items(): 850 if result.get("healthy", False): 851 try: 852 client = self.get_client(server_id) 853 healthy_clients.append(client) 854 except (KeyError, IndexError): 855 continue 856 857 return healthy_clients
Get list of healthy servers after health check.
Parameters
- timeout: Timeout for health checks
Returns
List of healthy clients
859 def get_status_summary(self) -> dict[str, Any]: 860 """Get aggregated status summary for all servers. 861 862 Synchronous operation - no API calls made. 863 864 :return: Status summary dictionary 865 """ 866 return { 867 "total_servers": len(self._configs), 868 "active_servers": self.active_servers, 869 "server_statuses": { 870 server_id: client.get_status() 871 for server_id, client in self._clients.items() 872 }, 873 }
Get aggregated status summary for all servers.
Synchronous operation - no API calls made.
Returns
Status summary dictionary
class NoOpAuditLogger:
    """Audit logger that discards every event.

    Satisfies the AuditLogger protocol while doing no work, so auditing
    can be disabled without code changes or performance impact.
    """

    __slots__ = ()

    async def alog_action(
        self,
        action: str,
        resource: str,
        *,
        user: str | None = None,
        details: dict[str, Any] | None = None,
        correlation_id: str | None = None,
    ) -> None:
        """Asynchronous no-op: the event is dropped."""
        return None

    def log_action(
        self,
        action: str,
        resource: str,
        *,
        user: str | None = None,
        details: dict[str, Any] | None = None,
        correlation_id: str | None = None,
    ) -> None:
        """Synchronous no-op: the event is dropped."""
        return None

    async def shutdown(self) -> None:
        """No resources to release; returns immediately."""
        return None
Zero-overhead no-op audit logger.
Implements AuditLogger protocol but performs no operations. Useful for disabling audit without code changes or performance impact.
546 async def alog_action( 547 self, 548 action: str, 549 resource: str, 550 *, 551 user: str | None = None, 552 details: dict[str, Any] | None = None, 553 correlation_id: str | None = None, 554 ) -> None: 555 """No-op async log."""
No-op async log.
557 def log_action( 558 self, 559 action: str, 560 resource: str, 561 *, 562 user: str | None = None, 563 details: dict[str, Any] | None = None, 564 correlation_id: str | None = None, 565 ) -> None: 566 """No-op sync log."""
No-op sync log.
class NoOpMetrics:
    """Metrics collector that discards everything (zero-overhead default).

    Uses __slots__ to minimize memory footprint.
    """

    __slots__ = ()

    def increment(self, metric: str, *, tags: MetricsTags | None = None) -> None:
        """Discard the counter update."""
        return None

    def timing(
        self, metric: str, value: float, *, tags: MetricsTags | None = None
    ) -> None:
        """Discard the timing sample."""
        return None

    def gauge(
        self, metric: str, value: float, *, tags: MetricsTags | None = None
    ) -> None:
        """Discard the gauge value."""
        return None
No-op metrics collector (zero-overhead default).
Uses __slots__ to minimize memory footprint.
101 def increment(self, metric: str, *, tags: MetricsTags | None = None) -> None: 102 """No-op increment (zero overhead)."""
No-op increment (zero overhead).
class OutlineClientConfig(BaseSettings):
    """Main configuration.

    Loaded from environment variables (prefix ``_ENV_PREFIX``) and/or a
    ``.env`` file; assignment is re-validated (validate_assignment=True)
    and unknown keys are rejected (extra="forbid").
    """

    model_config = SettingsConfigDict(
        env_prefix=_ENV_PREFIX,
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
        extra="forbid",
        validate_assignment=True,
        validate_default=True,
        frozen=False,
    )

    # ===== Core Settings (Required) =====

    api_url: str = Field(..., description="Outline server API URL with secret path")
    cert_sha256: SecretStr = Field(..., description="SHA-256 certificate fingerprint")

    # ===== Client Settings =====

    timeout: int = Field(
        default=10,
        ge=_MIN_TIMEOUT,
        le=_MAX_TIMEOUT,
        description="Request timeout (seconds)",
    )
    retry_attempts: int = Field(
        default=2,
        ge=_MIN_RETRY,
        le=_MAX_RETRY,
        description="Number of retries",
    )
    max_connections: int = Field(
        default=10,
        ge=_MIN_CONNECTIONS,
        le=_MAX_CONNECTIONS,
        description="Connection pool size",
    )
    rate_limit: int = Field(
        default=100,
        ge=_MIN_RATE_LIMIT,
        le=_MAX_RATE_LIMIT,
        description="Max concurrent requests",
    )
    user_agent: str = Field(
        default=Constants.DEFAULT_USER_AGENT,
        min_length=1,
        max_length=256,
        description="Custom user agent string",
    )

    # ===== Optional Features =====

    enable_circuit_breaker: bool = Field(
        default=True,
        description="Enable circuit breaker",
    )
    enable_logging: bool = Field(
        default=False,
        description="Enable debug logging",
    )
    json_format: bool = Field(
        default=False,
        description="Return raw JSON",
    )
    allow_private_networks: bool = Field(
        default=True,
        description="Allow private or local network addresses in api_url",
    )
    resolve_dns_for_ssrf: bool = Field(
        default=False,
        description="Resolve DNS for SSRF checks (strict mode)",
    )

    # ===== Circuit Breaker Settings =====

    circuit_failure_threshold: int = Field(
        default=5,
        ge=1,
        le=100,
        description="Failures before opening",
    )
    circuit_recovery_timeout: float = Field(
        default=60.0,
        ge=1.0,
        le=3600.0,
        description="Recovery wait time (seconds)",
    )
    circuit_success_threshold: int = Field(
        default=2,
        ge=1,
        le=10,
        description="Successes needed to close",
    )
    circuit_call_timeout: float = Field(
        default=10.0,
        ge=0.1,
        le=300.0,
        description="Circuit call timeout (seconds)",
    )

    # ===== Validators =====

    @field_validator("api_url")
    @classmethod
    def validate_api_url(cls, v: str) -> str:
        """Validate and normalize API URL.

        :param v: URL to validate
        :return: Validated URL
        :raises ValueError: If URL is invalid
        """
        return Validators.validate_url(v)

    @field_validator("cert_sha256")
    @classmethod
    def validate_cert(cls, v: SecretStr) -> SecretStr:
        """Validate certificate fingerprint.

        :param v: Certificate fingerprint
        :return: Validated fingerprint
        :raises ValueError: If fingerprint is invalid
        """
        return Validators.validate_cert_fingerprint(v)

    @field_validator("user_agent")
    @classmethod
    def validate_user_agent(cls, v: str) -> str:
        """Validate user agent string.

        :param v: User agent to validate
        :return: Validated user agent
        :raises ValueError: If user agent is invalid
        """
        v = Validators.validate_string_not_empty(v, "User agent")

        # Reject ASCII control characters (codepoints < 32).
        if any(ord(c) < 32 for c in v):
            raise ValueError("User agent contains invalid control characters")

        return v

    @model_validator(mode="after")
    def validate_config(self) -> Self:
        """Additional validation after model creation.

        :return: Validated configuration instance
        """
        # Warn when plain HTTP is used for a non-localhost URL.
        match (self.api_url, "localhost" in self.api_url):
            case (url, False) if "http://" in url:
                _log_if_enabled(
                    logging.WARNING,
                    "Using HTTP for non-localhost connection. "
                    "This is insecure and should only be used for testing.",
                )

        # Optional SSRF protection for private networks (no DNS resolution)
        Validators.validate_url(
            self.api_url,
            allow_private_networks=self.allow_private_networks,
            resolve_dns=False,
        )

        # Ensure the circuit breaker's call timeout covers the worst-case
        # request time (timeout * retries plus margin); auto-raise if not.
        if self.enable_circuit_breaker:
            max_request_time = self._get_max_request_time()

            if self.circuit_call_timeout < max_request_time:
                _log_if_enabled(
                    logging.WARNING,
                    f"Circuit timeout ({self.circuit_call_timeout}s) is less than "
                    f"max request time ({max_request_time}s). "
                    f"Auto-adjusting to {max_request_time}s.",
                )
                # object.__setattr__ bypasses validate_assignment re-validation.
                object.__setattr__(self, "circuit_call_timeout", max_request_time)

        return self

    def _get_max_request_time(self) -> float:
        """Calculate worst-case request time with instance caching.

        NOTE(review): caches via a plain attribute write on a pydantic
        model — confirm pydantic accepts this private attribute with
        validate_assignment enabled.

        :return: Maximum request time in seconds
        """
        if not hasattr(self, "_cached_max_request_time"):
            self._cached_max_request_time = (
                self.timeout * (self.retry_attempts + 1) + _SAFETY_MARGIN
            )
        return self._cached_max_request_time

    # ===== Custom __setattr__ for SecretStr Protection =====

    def __setattr__(self, name: str, value: object) -> None:
        """Prevent accidental string assignment to SecretStr fields.

        :param name: Attribute name
        :param value: Attribute value
        :raises TypeError: If trying to assign str to SecretStr field
        """
        # Fast path: skip check for non-cert fields
        if name != "cert_sha256":
            super().__setattr__(name, value)
            return

        if isinstance(value, str):
            raise TypeError(
                "cert_sha256 must be SecretStr, not str. Use: SecretStr('your_cert')"
            )

        super().__setattr__(name, value)

    # ===== Helper Methods =====

    @cached_property
    def get_sanitized_config(self) -> ConfigDict:
        """Get configuration with sensitive data masked (cached).

        Safe for logging, debugging, and display.

        :return: Sanitized configuration dictionary
        """
        return {
            "api_url": Validators.sanitize_url_for_logging(self.api_url),
            "cert_sha256": "***MASKED***",
            "timeout": self.timeout,
            "retry_attempts": self.retry_attempts,
            "max_connections": self.max_connections,
            "rate_limit": self.rate_limit,
            "user_agent": self.user_agent,
            "enable_circuit_breaker": self.enable_circuit_breaker,
            "enable_logging": self.enable_logging,
            "json_format": self.json_format,
            "allow_private_networks": self.allow_private_networks,
            "circuit_failure_threshold": self.circuit_failure_threshold,
            "circuit_recovery_timeout": self.circuit_recovery_timeout,
            "circuit_success_threshold": self.circuit_success_threshold,
            "circuit_call_timeout": self.circuit_call_timeout,
        }

    def model_copy_immutable(self, **overrides: ConfigValue) -> OutlineClientConfig:
        """Create a deep copy with overrides applied.

        :param overrides: Configuration parameters to override
        :return: Deep copy of configuration with applied updates
        :raises ValueError: If invalid override keys provided

        Example:
            >>> new_config = config.model_copy_immutable(timeout=20)
        """
        # Frozenset difference gives the invalid keys in one pass.
        valid_keys = frozenset(ConfigOverrides.__annotations__.keys())
        provided_keys = frozenset(overrides.keys())
        invalid = provided_keys - valid_keys

        if invalid:
            raise ValueError(
                f"Invalid configuration keys: {', '.join(sorted(invalid))}. "
                f"Valid keys: {', '.join(sorted(valid_keys))}"
            )

        return cast(  # type: ignore[redundant-cast, unused-ignore]
            OutlineClientConfig, self.model_copy(deep=True, update=overrides)
        )

    @property
    def circuit_config(self) -> CircuitConfig | None:
        """Get circuit breaker configuration if enabled.

        Returns None if circuit breaker is disabled, otherwise a new
        CircuitConfig instance built from the circuit_* fields.

        :return: Circuit config or None if disabled
        """
        if not self.enable_circuit_breaker:
            return None

        return CircuitConfig(
            failure_threshold=self.circuit_failure_threshold,
            recovery_timeout=self.circuit_recovery_timeout,
            success_threshold=self.circuit_success_threshold,
            call_timeout=self.circuit_call_timeout,
        )

    # ===== Factory Methods =====

    @classmethod
    def from_env(
        cls,
        env_file: str | Path | None = None,
        **overrides: ConfigValue,
    ) -> OutlineClientConfig:
        """Load configuration from environment with overrides.

        Unknown override keys are silently dropped (contrast with
        model_copy_immutable, which raises on them).

        :param env_file: Path to .env file
        :param overrides: Configuration parameters to override
        :return: Configuration instance
        :raises ConfigurationError: If environment configuration is invalid

        Example:
            >>> config = OutlineClientConfig.from_env(
            ...     env_file=".env.prod",
            ...     timeout=20,
            ...     enable_logging=True
            ... )
        """
        # Keep only recognized override keys.
        valid_keys = frozenset(ConfigOverrides.__annotations__.keys())
        filtered_overrides = cast(
            ConfigOverrides,
            {k: v for k, v in overrides.items() if k in valid_keys},
        )

        if not env_file:
            return cls(  # type: ignore[call-arg, unused-ignore]
                **filtered_overrides
            )

        match env_file:
            case str():
                env_path = Path(env_file)
            case Path():
                env_path = env_file
            case _:
                raise TypeError(
                    f"env_file must be str or Path, got {type(env_file).__name__}"
                )

        if not env_path.exists():
            raise ConfigurationError(
                f"Environment file not found: {env_path}",
                field="env_file",
            )

        return cls(  # type: ignore[call-arg, unused-ignore]
            _env_file=str(env_path),
            **filtered_overrides,
        )

    @classmethod
    def create_minimal(
        cls,
        api_url: str,
        cert_sha256: str | SecretStr,
        **overrides: ConfigValue,
    ) -> OutlineClientConfig:
        """Create minimal configuration from just URL and fingerprint.

        :param api_url: API URL
        :param cert_sha256: Certificate fingerprint
        :param overrides: Optional configuration parameters
        :return: Configuration instance
        :raises TypeError: If cert_sha256 is not str or SecretStr

        Example:
            >>> config = OutlineClientConfig.create_minimal(
            ...     api_url="https://server.com/path",
            ...     cert_sha256="a" * 64,
            ...     timeout=20
            ... )
        """
        # Coerce a plain string into SecretStr; reject anything else.
        match cert_sha256:
            case str():
                cert = SecretStr(cert_sha256)
            case SecretStr():
                cert = cert_sha256
            case _:
                raise TypeError(
                    f"cert_sha256 must be str or SecretStr, "
                    f"got {type(cert_sha256).__name__}"
                )

        valid_keys = frozenset(ConfigOverrides.__annotations__.keys())
        filtered_overrides = cast(
            ConfigOverrides,
            {k: v for k, v in overrides.items() if k in valid_keys},
        )

        return cls(
            api_url=api_url,
            cert_sha256=cert,
            **filtered_overrides,
        )
Main configuration.
178 @field_validator("api_url") 179 @classmethod 180 def validate_api_url(cls, v: str) -> str: 181 """Validate and normalize API URL with optimized regex. 182 183 :param v: URL to validate 184 :return: Validated URL 185 :raises ValueError: If URL is invalid 186 """ 187 return Validators.validate_url(v)
Validate and normalize API URL with optimized regex.
Parameters
- v: URL to validate
Returns
Validated URL
Raises
- ValueError: If URL is invalid
189 @field_validator("cert_sha256") 190 @classmethod 191 def validate_cert(cls, v: SecretStr) -> SecretStr: 192 """Validate certificate fingerprint with constant-time comparison. 193 194 :param v: Certificate fingerprint 195 :return: Validated fingerprint 196 :raises ValueError: If fingerprint is invalid 197 """ 198 return Validators.validate_cert_fingerprint(v)
Validate certificate fingerprint with constant-time comparison.
Parameters
- v: Certificate fingerprint
Returns
Validated fingerprint
Raises
- ValueError: If fingerprint is invalid
200 @field_validator("user_agent") 201 @classmethod 202 def validate_user_agent(cls, v: str) -> str: 203 """Validate user agent string with efficient control char check. 204 205 :param v: User agent to validate 206 :return: Validated user agent 207 :raises ValueError: If user agent is invalid 208 """ 209 v = Validators.validate_string_not_empty(v, "User agent") 210 211 # Efficient control character check using generator 212 if any(ord(c) < 32 for c in v): 213 raise ValueError("User agent contains invalid control characters") 214 215 return v
Validate user agent string with efficient control char check.
Parameters
- v: User agent to validate
Returns
Validated user agent
Raises
- ValueError: If user agent is invalid
217 @model_validator(mode="after") 218 def validate_config(self) -> Self: 219 """Additional validation after model creation with pattern matching. 220 221 :return: Validated configuration instance 222 """ 223 # Security warning for HTTP using pattern matching 224 match (self.api_url, "localhost" in self.api_url): 225 case (url, False) if "http://" in url: 226 _log_if_enabled( 227 logging.WARNING, 228 "Using HTTP for non-localhost connection. " 229 "This is insecure and should only be used for testing.", 230 ) 231 232 # Optional SSRF protection for private networks (no DNS resolution) 233 Validators.validate_url( 234 self.api_url, 235 allow_private_networks=self.allow_private_networks, 236 resolve_dns=False, 237 ) 238 239 # Circuit breaker timeout adjustment with caching 240 if self.enable_circuit_breaker: 241 max_request_time = self._get_max_request_time() 242 243 if self.circuit_call_timeout < max_request_time: 244 _log_if_enabled( 245 logging.WARNING, 246 f"Circuit timeout ({self.circuit_call_timeout}s) is less than " 247 f"max request time ({max_request_time}s). " 248 f"Auto-adjusting to {max_request_time}s.", 249 ) 250 object.__setattr__(self, "circuit_call_timeout", max_request_time) 251 252 return self
Additional validation after model creation with pattern matching.
Returns
Validated configuration instance
288 @cached_property 289 def get_sanitized_config(self) -> ConfigDict: 290 """Get configuration with sensitive data masked (cached). 291 292 Safe for logging, debugging, and display. 293 294 Performance: ~20x speedup with caching for repeated calls 295 Memory: Single cached result per instance 296 297 :return: Sanitized configuration dictionary 298 """ 299 return { 300 "api_url": Validators.sanitize_url_for_logging(self.api_url), 301 "cert_sha256": "***MASKED***", 302 "timeout": self.timeout, 303 "retry_attempts": self.retry_attempts, 304 "max_connections": self.max_connections, 305 "rate_limit": self.rate_limit, 306 "user_agent": self.user_agent, 307 "enable_circuit_breaker": self.enable_circuit_breaker, 308 "enable_logging": self.enable_logging, 309 "json_format": self.json_format, 310 "allow_private_networks": self.allow_private_networks, 311 "circuit_failure_threshold": self.circuit_failure_threshold, 312 "circuit_recovery_timeout": self.circuit_recovery_timeout, 313 "circuit_success_threshold": self.circuit_success_threshold, 314 "circuit_call_timeout": self.circuit_call_timeout, 315 }
Get configuration with sensitive data masked (cached).
Safe for logging, debugging, and display.
Performance: ~20x speedup with caching for repeated calls.
Memory: a single cached result per instance.
Returns
Sanitized configuration dictionary
317 def model_copy_immutable(self, **overrides: ConfigValue) -> OutlineClientConfig: 318 """Create immutable copy with overrides (optimized validation). 319 320 :param overrides: Configuration parameters to override 321 :return: Deep copy of configuration with applied updates 322 :raises ValueError: If invalid override keys provided 323 324 Example: 325 >>> new_config = config.model_copy_immutable(timeout=20) 326 """ 327 # Optimized: Use frozenset intersection for O(1) validation 328 valid_keys = frozenset(ConfigOverrides.__annotations__.keys()) 329 provided_keys = frozenset(overrides.keys()) 330 invalid = provided_keys - valid_keys 331 332 if invalid: 333 raise ValueError( 334 f"Invalid configuration keys: {', '.join(sorted(invalid))}. " 335 f"Valid keys: {', '.join(sorted(valid_keys))}" 336 ) 337 338 # Pydantic's model_copy is already optimized 339 return cast( # type: ignore[redundant-cast, unused-ignore] 340 OutlineClientConfig, self.model_copy(deep=True, update=overrides) 341 )
Create immutable copy with overrides (optimized validation).
Parameters
- overrides: Configuration parameters to override
Returns
Deep copy of configuration with applied updates
Raises
- ValueError: If invalid override keys provided
Example:
>>> new_config = config.model_copy_immutable(timeout=20)
343 @property 344 def circuit_config(self) -> CircuitConfig | None: 345 """Get circuit breaker configuration if enabled. 346 347 Returns None if circuit breaker is disabled, otherwise CircuitConfig instance. 348 Cached as property for performance. 349 350 :return: Circuit config or None if disabled 351 """ 352 if not self.enable_circuit_breaker: 353 return None 354 355 return CircuitConfig( 356 failure_threshold=self.circuit_failure_threshold, 357 recovery_timeout=self.circuit_recovery_timeout, 358 success_threshold=self.circuit_success_threshold, 359 call_timeout=self.circuit_call_timeout, 360 )
Get circuit breaker configuration if enabled.
Returns None if the circuit breaker is disabled, otherwise a CircuitConfig instance. Note: implemented as a plain property, so the value is rebuilt on each access rather than cached.
Returns
Circuit config or None if disabled
364 @classmethod 365 def from_env( 366 cls, 367 env_file: str | Path | None = None, 368 **overrides: ConfigValue, 369 ) -> OutlineClientConfig: 370 """Load configuration from environment with overrides. 371 372 :param env_file: Path to .env file 373 :param overrides: Configuration parameters to override 374 :return: Configuration instance 375 :raises ConfigurationError: If environment configuration is invalid 376 377 Example: 378 >>> config = OutlineClientConfig.from_env( 379 ... env_file=".env.prod", 380 ... timeout=20, 381 ... enable_logging=True 382 ... ) 383 """ 384 # Fast path: validate overrides early 385 valid_keys = frozenset(ConfigOverrides.__annotations__.keys()) 386 filtered_overrides = cast( 387 ConfigOverrides, 388 {k: v for k, v in overrides.items() if k in valid_keys}, 389 ) 390 391 if not env_file: 392 return cls( # type: ignore[call-arg, unused-ignore] 393 **filtered_overrides 394 ) 395 396 match env_file: 397 case str(): 398 env_path = Path(env_file) 399 case Path(): 400 env_path = env_file 401 case _: 402 raise TypeError( 403 f"env_file must be str or Path, got {type(env_file).__name__}" 404 ) 405 406 if not env_path.exists(): 407 raise ConfigurationError( 408 f"Environment file not found: {env_path}", 409 field="env_file", 410 ) 411 412 return cls( # type: ignore[call-arg, unused-ignore] 413 _env_file=str(env_path), 414 **filtered_overrides, 415 )
Load configuration from environment with overrides.
Parameters
- env_file: Path to .env file
- overrides: Configuration parameters to override
Returns
Configuration instance
Raises
- ConfigurationError: If environment configuration is invalid
Example:
>>> config = OutlineClientConfig.from_env(
...     env_file=".env.prod",
...     timeout=20,
...     enable_logging=True
... )
417 @classmethod 418 def create_minimal( 419 cls, 420 api_url: str, 421 cert_sha256: str | SecretStr, 422 **overrides: ConfigValue, 423 ) -> OutlineClientConfig: 424 """Create minimal configuration (optimized validation). 425 426 :param api_url: API URL 427 :param cert_sha256: Certificate fingerprint 428 :param overrides: Optional configuration parameters 429 :return: Configuration instance 430 :raises TypeError: If cert_sha256 is not str or SecretStr 431 432 Example: 433 >>> config = OutlineClientConfig.create_minimal( 434 ... api_url="https://server.com/path", 435 ... cert_sha256="a" * 64, 436 ... timeout=20 437 ... ) 438 """ 439 match cert_sha256: 440 case str(): 441 cert = SecretStr(cert_sha256) 442 case SecretStr(): 443 cert = cert_sha256 444 case _: 445 raise TypeError( 446 f"cert_sha256 must be str or SecretStr, " 447 f"got {type(cert_sha256).__name__}" 448 ) 449 450 valid_keys = frozenset(ConfigOverrides.__annotations__.keys()) 451 filtered_overrides = cast( 452 ConfigOverrides, 453 {k: v for k, v in overrides.items() if k in valid_keys}, 454 ) 455 456 return cls( 457 api_url=api_url, 458 cert_sha256=cert, 459 **filtered_overrides, 460 )
Create minimal configuration (optimized validation).
Parameters
- api_url: API URL
- cert_sha256: Certificate fingerprint
- overrides: Optional configuration parameters
Returns
Configuration instance
Raises
- TypeError: If cert_sha256 is not str or SecretStr
Example:
>>> config = OutlineClientConfig.create_minimal(
...     api_url="https://server.com/path",
...     cert_sha256="a" * 64,
...     timeout=20
... )
class OutlineConnectionError(OutlineError):
    """Network connection failure.

    Marked retryable with a 2-second suggested delay.

    Attributes:
        host: Host that failed
        port: Port that failed

    Example:
        >>> error = OutlineConnectionError(
        ...     "Connection refused", host="server.com", port=443
        ... )
        >>> error.is_retryable  # True
    """

    __slots__ = ("host", "port")

    _is_retryable: ClassVar[bool] = True
    _default_retry_delay: ClassVar[float] = 2.0

    def __init__(
        self,
        message: str,
        *,
        host: str | None = None,
        port: int | None = None,
    ) -> None:
        """Initialize connection error.

        Args:
            message: Error message
            host: Host that failed
            port: Port that failed
        """
        # Build safe details from whatever endpoint info was supplied;
        # pass None (not an empty dict) when nothing is known.
        safe: dict[str, Any] = {}
        if host:
            safe["host"] = host
        if port is not None:
            safe["port"] = port

        super().__init__(message, safe_details=safe or None)

        self.host = host
        self.port = port
Network connection failure.
Attributes:
- host: Host that failed
- port: Port that failed
Example:
>>> error = OutlineConnectionError(
...     "Connection refused", host="server.com", port=443
... )
>>> error.is_retryable  # True
426 def __init__( 427 self, 428 message: str, 429 *, 430 host: str | None = None, 431 port: int | None = None, 432 ) -> None: 433 """Initialize connection error. 434 435 Args: 436 message: Error message 437 host: Host that failed 438 port: Port that failed 439 """ 440 safe_details: dict[str, Any] | None = None 441 if host or port is not None: 442 safe_details = {} 443 if host: 444 safe_details["host"] = host 445 if port is not None: 446 safe_details["port"] = port 447 448 super().__init__(message, safe_details=safe_details) 449 450 self.host = host 451 self.port = port
Initialize connection error.
Arguments:
- message: Error message
- host: Host that failed
- port: Port that failed
class OutlineError(Exception):
    """Base exception for all PyOutlineAPI errors.

    Provides rich error context, retry guidance, and safe serialization
    with automatic credential sanitization.

    Attributes:
        is_retryable: Whether this error type should be retried
        default_retry_delay: Suggested delay before retry in seconds

    Example:
        >>> try:
        ...     raise OutlineError("Connection failed", details={"host": "server"})
        ... except OutlineError as e:
        ...     print(e.safe_details)  # {'host': 'server'}
    """

    __slots__ = ("_cached_str", "_details", "_message", "_safe_details")

    _is_retryable: ClassVar[bool] = False
    _default_retry_delay: ClassVar[float] = 1.0

    def __init__(
        self,
        message: object,
        *,
        details: dict[str, Any] | None = None,
        safe_details: dict[str, Any] | None = None,
    ) -> None:
        """Initialize exception with automatic credential sanitization.

        Args:
            message: Error message (automatically sanitized)
            details: Internal details (may contain sensitive data)
            safe_details: Safe details for logging/display

        Note:
            Doc fix: messages longer than the maximum length are truncated
            with a trailing ellipsis — no ValueError is raised (the previous
            docstring incorrectly documented one).
        """
        # Coerce non-string messages before sanitization.
        if not isinstance(message, str):
            message = str(message)

        # Strip credentials from the message before it can be logged.
        sanitized = CredentialSanitizer.sanitize(message)

        # Truncate over-long messages instead of rejecting them.
        if len(sanitized) > _MAX_MESSAGE_LENGTH:
            sanitized = sanitized[:_MAX_MESSAGE_LENGTH] + "..."

        self._message = sanitized
        super().__init__(sanitized)

        # Share one immutable empty mapping when no details are supplied,
        # avoiding a per-instance dict allocation.
        self._details: dict[str, Any] | MappingProxyType[str, Any] = (
            dict(details) if details else _EMPTY_DICT
        )
        self._safe_details: dict[str, Any] | MappingProxyType[str, Any] = (
            dict(safe_details) if safe_details else _EMPTY_DICT
        )

        # Lazily built str() cache.
        self._cached_str: str | None = None

    @property
    def details(self) -> dict[str, Any]:
        """Get internal error details (may contain sensitive data).

        Warning:
            Use with caution - may contain credentials or sensitive information.
            For logging, use ``safe_details`` instead.

        Returns:
            Copy of internal details dictionary
        """
        if self._details is _EMPTY_DICT:
            return {}
        return self._details.copy()

    @property
    def safe_details(self) -> dict[str, Any]:
        """Get sanitized error details safe for logging.

        Returns:
            Copy of safe details dictionary
        """
        if self._safe_details is _EMPTY_DICT:
            return {}
        return self._safe_details.copy()

    def _format_details(self) -> str:
        """Format safe details for string representation.

        :return: Formatted details string, or "" when there are none
        """
        if not self._safe_details:
            return ""

        parts = [f"{k}={v}" for k, v in self._safe_details.items()]
        return f" ({', '.join(parts)})"

    def __str__(self) -> str:
        """Safe string representation using safe_details.

        Cached for performance on repeated access.

        :return: String representation
        """
        if self._cached_str is None:
            self._cached_str = f"{self._message}{self._format_details()}"
        return self._cached_str

    def __repr__(self) -> str:
        """Safe repr without sensitive data.

        :return: String representation
        """
        return f"{self.__class__.__name__}({self._message!r})"

    @property
    def is_retryable(self) -> bool:
        """Return whether this error type should be retried."""
        return self._is_retryable

    @property
    def default_retry_delay(self) -> float:
        """Return suggested delay before retry in seconds."""
        return self._default_retry_delay
Base exception for all PyOutlineAPI errors.
Provides rich error context, retry guidance, and safe serialization with automatic credential sanitization.
Attributes:
- is_retryable: Whether this error type should be retried
- default_retry_delay: Suggested delay before retry in seconds
Example:
>>> try:
...     raise OutlineError("Connection failed", details={"host": "server"})
... except OutlineError as e:
...     print(e.safe_details)  # {'host': 'server'}
64 def __init__( 65 self, 66 message: object, 67 *, 68 details: dict[str, Any] | None = None, 69 safe_details: dict[str, Any] | None = None, 70 ) -> None: 71 """Initialize exception with automatic credential sanitization. 72 73 Args: 74 message: Error message (automatically sanitized) 75 details: Internal details (may contain sensitive data) 76 safe_details: Safe details for logging/display 77 78 Raises: 79 ValueError: If message exceeds maximum length after sanitization 80 """ 81 # Validate and sanitize message 82 if not isinstance(message, str): 83 message = str(message) 84 85 # Sanitize credentials from message 86 sanitized_message = CredentialSanitizer.sanitize(message) 87 88 # Truncate if too long 89 if len(sanitized_message) > _MAX_MESSAGE_LENGTH: 90 sanitized_message = sanitized_message[:_MAX_MESSAGE_LENGTH] + "..." 91 92 self._message = sanitized_message 93 super().__init__(sanitized_message) 94 95 self._details: dict[str, Any] | MappingProxyType[str, Any] = ( 96 dict(details) if details else _EMPTY_DICT 97 ) 98 self._safe_details: dict[str, Any] | MappingProxyType[str, Any] = ( 99 dict(safe_details) if safe_details else _EMPTY_DICT 100 ) 101 102 self._cached_str: str | None = None
Initialize exception with automatic credential sanitization.
Arguments:
- message: Error message (automatically sanitized)
- details: Internal details (may contain sensitive data)
- safe_details: Safe details for logging/display
Note:
Messages longer than the maximum length are truncated with a trailing ellipsis rather than rejected; no exception is raised for over-long input.
104 @property 105 def details(self) -> dict[str, Any]: 106 """Get internal error details (may contain sensitive data). 107 108 Warning: 109 Use with caution - may contain credentials or sensitive information. 110 For logging, use ``safe_details`` instead. 111 112 Returns: 113 Copy of internal details dictionary 114 """ 115 if self._details is _EMPTY_DICT: 116 return {} 117 return self._details.copy()
Get internal error details (may contain sensitive data).
Warning:
Use with caution - may contain credentials or sensitive information. For logging, use
`safe_details` instead.
Returns:
Copy of internal details dictionary
119 @property 120 def safe_details(self) -> dict[str, Any]: 121 """Get sanitized error details safe for logging. 122 123 Returns: 124 Copy of safe details dictionary 125 """ 126 if self._safe_details is _EMPTY_DICT: 127 return {} 128 return self._safe_details.copy()
Get sanitized error details safe for logging.
Returns:
Copy of safe details dictionary
class OutlineTimeoutError(OutlineError):
    """Operation timeout.

    Marked retryable with a 2-second suggested delay.

    Attributes:
        timeout: Timeout value in seconds
        operation: Operation that timed out

    Example:
        >>> error = OutlineTimeoutError(
        ...     "Request timeout", timeout=30.0, operation="get_server_info"
        ... )
        >>> error.is_retryable  # True
    """

    __slots__ = ("operation", "timeout")

    _is_retryable: ClassVar[bool] = True
    _default_retry_delay: ClassVar[float] = 2.0

    def __init__(
        self,
        message: str,
        *,
        timeout: float | None = None,
        operation: str | None = None,
    ) -> None:
        """Initialize timeout error.

        Args:
            message: Error message
            timeout: Timeout value in seconds
            operation: Operation that timed out
        """
        # Build safe details from whatever context was supplied; hand the
        # base class None (not an empty dict) when nothing is known.
        safe: dict[str, Any] = {}
        if timeout is not None:
            # Rounded to keep log output tidy.
            safe["timeout"] = round(timeout, 2)
        if operation:
            safe["operation"] = operation

        super().__init__(message, safe_details=safe or None)

        self.timeout = timeout
        self.operation = operation
Operation timeout.
Attributes:
- timeout: Timeout value in seconds
- operation: Operation that timed out
Example:
>>> error = OutlineTimeoutError(
...     "Request timeout", timeout=30.0, operation="get_server_info"
... )
>>> error.is_retryable  # True
473 def __init__( 474 self, 475 message: str, 476 *, 477 timeout: float | None = None, 478 operation: str | None = None, 479 ) -> None: 480 """Initialize timeout error. 481 482 Args: 483 message: Error message 484 timeout: Timeout value in seconds 485 operation: Operation that timed out 486 """ 487 safe_details: dict[str, Any] | None = None 488 if timeout is not None or operation: 489 safe_details = {} 490 if timeout is not None: 491 safe_details["timeout"] = round(timeout, 2) 492 if operation: 493 safe_details["operation"] = operation 494 495 super().__init__(message, safe_details=safe_details) 496 497 self.timeout = timeout 498 self.operation = operation
Initialize timeout error.
Arguments:
- message: Error message
- timeout: Timeout value in seconds
- operation: Operation that timed out
class PeakDeviceCount(BaseValidatedModel):
    """Peak device count paired with the time it was observed.

    SCHEMA: Based on experimental metrics connection peakDeviceCount object
    """

    # Peak number of devices reported by the server.
    data: int
    # When the peak was observed (TimestampSec — presumably seconds; confirm).
    timestamp: TimestampSec
Peak device count with timestamp.
SCHEMA: Based on experimental metrics connection peakDeviceCount object
class PortRequest(BaseValidatedModel):
    """Request body for changing the default access-key port.

    SCHEMA: Based on PUT /server/port-for-new-access-keys request body
    """

    # Port to assign to newly created access keys.
    port: Port
Request model for setting default port.
SCHEMA: Based on PUT /server/port-for-new-access-keys request body
class ProductionConfig(OutlineClientConfig):
    """Production configuration with strict security.

    Enforces HTTPS and enables all safety features:
    - Circuit breaker enabled
    - Logging disabled (performance)
    - HTTPS enforcement
    - Strict validation
    """

    model_config = SettingsConfigDict(
        env_prefix=_PROD_ENV_PREFIX,
        env_file=".env.prod",
        case_sensitive=False,
        extra="forbid",
    )

    enable_circuit_breaker: bool = True
    enable_logging: bool = False
    allow_private_networks: bool = False
    resolve_dns_for_ssrf: bool = True

    @model_validator(mode="after")
    def enforce_security(self) -> Self:
        """Enforce production security constraints.

        :return: Validated configuration
        :raises ConfigurationError: If HTTP is used in production
        """
        # Fixed: test the scheme prefix rather than a substring match.
        # '"http://" in url' incorrectly rejected valid HTTPS URLs that
        # merely contain "http://" in a path or query component.
        if self.api_url.startswith("http://"):
            raise ConfigurationError(
                "Production environment must use HTTPS",
                field="api_url",
                security_issue=True,
            )

        # Allowed but discouraged: warn when the breaker is turned off.
        if not self.enable_circuit_breaker:
            _log_if_enabled(
                logging.WARNING,
                "Circuit breaker disabled in production. Not recommended.",
            )

        return self
Production configuration with strict security.
Enforces HTTPS and enables all safety features:
- Circuit breaker enabled
- Logging disabled (performance)
- HTTPS enforcement
- Strict validation
506 @model_validator(mode="after") 507 def enforce_security(self) -> Self: 508 """Enforce production security with optimized checks. 509 510 :return: Validated configuration 511 :raises ConfigurationError: If HTTP is used in production 512 """ 513 match self.api_url: 514 case url if "http://" in url: 515 raise ConfigurationError( 516 "Production environment must use HTTPS", 517 field="api_url", 518 security_issue=True, 519 ) 520 521 if not self.enable_circuit_breaker: 522 _log_if_enabled( 523 logging.WARNING, 524 "Circuit breaker disabled in production. Not recommended.", 525 ) 526 527 return self
Enforce production security with optimized checks.
Returns
Validated configuration
Raises
- ConfigurationError: If HTTP is used in production
class ResponseParser:
    """High-performance utility class for parsing and validating API responses."""

    __slots__ = ()  # Stateless class - zero memory overhead

    @staticmethod
    @overload
    def parse(
        data: dict[str, JsonValue],
        model: type[T],
        *,
        as_json: Literal[True] = True,
    ) -> JsonDict: ...  # pragma: no cover

    @staticmethod
    @overload
    def parse(
        data: dict[str, JsonValue],
        model: type[T],
        *,
        as_json: Literal[False] = False,
    ) -> T: ...  # pragma: no cover

    @staticmethod
    @overload
    def parse(
        data: dict[str, JsonValue],
        model: type[T],
        *,
        as_json: bool,
    ) -> T | JsonDict: ...  # pragma: no cover

    @staticmethod
    def parse(
        data: dict[str, JsonValue],
        model: type[T],
        *,
        as_json: bool = False,
    ) -> T | JsonDict:
        """Parse and validate response data with comprehensive error handling.

        Type-safe overloads ensure correct return type based on as_json parameter.

        :param data: Raw response data from API
        :param model: Pydantic model class for validation
        :param as_json: Return raw JSON dict instead of model instance
        :return: Validated model instance or JSON dict
        :raises OutlineValidationError: If validation fails, with the first
            error's field and message attached

        Example:
            >>> data = {"name": "test", "id": 123}
            >>> result = ResponseParser.parse(data, MyModel, as_json=False)
            >>> json_result = ResponseParser.parse(data, MyModel, as_json=True)
        """
        if not isinstance(data, dict):
            raise OutlineValidationError(
                f"Expected dict, got {type(data).__name__}",
                model=model.__name__,
            )

        if not data and logger.isEnabledFor(Constants.LOG_LEVEL_DEBUG):
            logger.debug("Parsing empty dict for model %s", model.__name__)

        try:
            validated = model.model_validate(data)

            if as_json:
                return cast(  # type: ignore[redundant-cast, unused-ignore]
                    JsonDict, validated.model_dump(by_alias=True)
                )
            return cast(T, validated)  # type: ignore[redundant-cast, unused-ignore]

        except ValidationError as e:
            errors = e.errors()

            if not errors:
                raise OutlineValidationError(
                    "Validation failed with no error details",
                    model=model.__name__,
                ) from e

            # Surface the first error; log a summary of the rest.
            first_error = errors[0]
            field = ".".join(str(loc) for loc in first_error.get("loc", ()))
            message = first_error.get("msg", "Validation failed")

            ResponseParser._log_multiple_errors(model.__name__, errors)

            raise OutlineValidationError(
                message,
                field=field,
                model=model.__name__,
            ) from e

        except Exception as e:
            # Catch any other unexpected errors during validation
            if logger.isEnabledFor(Constants.LOG_LEVEL_ERROR):
                logger.error(
                    "Unexpected error during validation: %s",
                    e,
                    exc_info=True,
                )
            raise OutlineValidationError(
                f"Unexpected error during validation: {e}",
                model=model.__name__,
            ) from e

    @staticmethod
    def _log_multiple_errors(model_name: str, errors: list[Any]) -> None:
        """Log a warning summary and debug detail when validation produced
        more than one error. Extracted from parse() for readability."""
        error_count = len(errors)
        if error_count <= 1:
            return

        if logger.isEnabledFor(Constants.LOG_LEVEL_WARNING):
            logger.warning(
                "Multiple validation errors for %s: %d error(s)",
                model_name,
                error_count,
            )

        if logger.isEnabledFor(Constants.LOG_LEVEL_DEBUG):
            logger.debug("Validation error details:")
            logged_count = min(error_count, _MAX_LOGGED_ERRORS)

            for i, error in enumerate(errors[:logged_count], 1):
                error_field = ".".join(str(loc) for loc in error.get("loc", ()))
                error_msg = error.get("msg", "Unknown error")
                logger.debug(" %d. %s: %s", i, error_field, error_msg)

            if error_count > _MAX_LOGGED_ERRORS:
                remaining = error_count - _MAX_LOGGED_ERRORS
                logger.debug(" ... and %d more error(s)", remaining)

    @staticmethod
    def parse_simple(data: Mapping[str, JsonValue] | object) -> bool:
        """Parse simple success/error responses efficiently.

        Handles various response formats with minimal overhead:
        - {"success": true/false}
        - {"error": "..."} -> False
        - {"message": "..."} -> False
        - Empty dict -> True (assumed success)

        :param data: Response data
        :return: True if successful, False otherwise

        Example:
            >>> ResponseParser.parse_simple({"success": True})
            True
            >>> ResponseParser.parse_simple({"error": "Something failed"})
            False
            >>> ResponseParser.parse_simple({})
            True
        """
        if not isinstance(data, dict):
            if logger.isEnabledFor(Constants.LOG_LEVEL_WARNING):
                logger.warning(
                    "Expected dict in parse_simple, got %s",
                    type(data).__name__,
                )
            return False

        if "success" in data:
            success = data["success"]
            if isinstance(success, bool):
                return success
            if logger.isEnabledFor(Constants.LOG_LEVEL_WARNING):
                logger.warning(
                    "success field is not bool: %s, coercing to bool",
                    type(success).__name__,
                )
            return bool(success)

        # No explicit success flag: success unless an error marker is present.
        return "error" not in data and "message" not in data

    @staticmethod
    def validate_response_structure(
        data: Mapping[str, JsonValue] | object,
        required_fields: Sequence[str] | None = None,
    ) -> bool:
        """Validate response structure without full parsing.

        Lightweight validation before expensive Pydantic validation.
        Useful for early rejection of malformed responses.

        :param data: Response data to validate
        :param required_fields: Sequence of required field names
        :return: True if structure is valid

        Example:
            >>> data = {"id": 1, "name": "test"}
            >>> ResponseParser.validate_response_structure(data, ["id", "name"])
            True
            >>> ResponseParser.validate_response_structure(data, ["id", "missing"])
            False
        """
        if not isinstance(data, dict):
            return False

        # Fixed: removed the redundant `not data and not required_fields`
        # branch — "no requirements" already covers the empty-dict case.
        if not required_fields:
            return True

        return all(field in data for field in required_fields)

    @staticmethod
    def extract_error_message(data: Mapping[str, JsonValue] | object) -> str | None:
        """Extract error message from response data efficiently.

        Checks common error field names in order of preference
        (the pre-computed _ERROR_FIELDS tuple).

        :param data: Response data
        :return: Error message or None if not found

        Example:
            >>> ResponseParser.extract_error_message({"error": "Not found"})
            'Not found'
            >>> ResponseParser.extract_error_message({"message": "Failed"})
            'Failed'
            >>> ResponseParser.extract_error_message({"success": True})
        """
        if not isinstance(data, dict):
            return None

        for field in _ERROR_FIELDS:
            if field in data:
                value = data[field]
                # Fast path: already a string
                if isinstance(value, str):
                    return value
                # Convert non-string to string (None stays None)
                return str(value) if value is not None else None

        return None

    @staticmethod
    def is_error_response(data: Mapping[str, object] | object) -> bool:
        """Check if response indicates an error efficiently.

        :param data: Response data
        :return: True if response indicates an error

        Example:
            >>> ResponseParser.is_error_response({"error": "Failed"})
            True
            >>> ResponseParser.is_error_response({"success": False})
            True
            >>> ResponseParser.is_error_response({"success": True})
            False
            >>> ResponseParser.is_error_response({})
            False
        """
        if not isinstance(data, dict):
            return False

        if "error" in data or "error_message" in data:
            return True

        # Only an explicit success=False counts as an error indicator.
        return data.get("success") is False
High-performance utility class for parsing and validating API responses.
@staticmethod
def parse(
    data: dict[str, JsonValue],
    model: type[T],
    *,
    as_json: bool = False,
) -> T | JsonDict:
    """Validate raw API response data against a Pydantic model.

    :param data: Raw response data from API
    :param model: Pydantic model class for validation
    :param as_json: Return raw JSON dict instead of model instance
    :return: Validated model instance, or its aliased dump when ``as_json``
    :raises OutlineValidationError: If input is not a dict or validation fails
    """
    if not isinstance(data, dict):
        raise OutlineValidationError(
            f"Expected dict, got {type(data).__name__}",
            model=model.__name__,
        )

    # Empty payloads are legal but worth a debug trace.
    if not data and logger.isEnabledFor(Constants.LOG_LEVEL_DEBUG):
        logger.debug("Parsing empty dict for model %s", model.__name__)

    try:
        instance = model.model_validate(data)
        if not as_json:
            return cast(T, instance)  # type: ignore[redundant-cast, unused-ignore]
        return cast(  # type: ignore[redundant-cast, unused-ignore]
            JsonDict, instance.model_dump(by_alias=True)
        )

    except ValidationError as exc:
        issues = exc.errors()

        if not issues:
            raise OutlineValidationError(
                "Validation failed with no error details",
                model=model.__name__,
            ) from exc

        # Surface the first issue; remaining ones are only logged.
        primary = issues[0]
        failed_field = ".".join(str(part) for part in primary.get("loc", ()))
        summary = primary.get("msg", "Validation failed")

        issue_count = len(issues)
        if issue_count > 1 and logger.isEnabledFor(Constants.LOG_LEVEL_WARNING):
            logger.warning(
                "Multiple validation errors for %s: %d error(s)",
                model.__name__,
                issue_count,
            )

        if logger.isEnabledFor(Constants.LOG_LEVEL_DEBUG):
            logger.debug("Validation error details:")
            shown = min(issue_count, _MAX_LOGGED_ERRORS)
            for position, issue in enumerate(issues[:shown], 1):
                where = ".".join(str(part) for part in issue.get("loc", ()))
                logger.debug(
                    " %d. %s: %s",
                    position,
                    where,
                    issue.get("msg", "Unknown error"),
                )
            if issue_count > _MAX_LOGGED_ERRORS:
                logger.debug(
                    " ... and %d more error(s)",
                    issue_count - _MAX_LOGGED_ERRORS,
                )

        raise OutlineValidationError(
            summary,
            field=failed_field,
            model=model.__name__,
        ) from exc

    except Exception as exc:
        # Anything unexpected (not a pydantic ValidationError) is wrapped too.
        if logger.isEnabledFor(Constants.LOG_LEVEL_ERROR):
            logger.error(
                "Unexpected error during validation: %s",
                exc,
                exc_info=True,
            )
        raise OutlineValidationError(
            f"Unexpected error during validation: {exc}",
            model=model.__name__,
        ) from exc
Parse and validate response data with comprehensive error handling.
Type-safe overloads ensure correct return type based on as_json parameter.
Parameters
- data: Raw response data from API
- model: Pydantic model class for validation
- as_json: Return raw JSON dict instead of model instance
Returns
Validated model instance or JSON dict
Raises
- ValidationError: If validation fails with detailed error info
Example:
>>> data = {"name": "test", "id": 123}
>>> # Type-safe: returns MyModel instance
>>> result = ResponseParser.parse(data, MyModel, as_json=False)
>>> # Type-safe: returns dict
>>> json_result = ResponseParser.parse(data, MyModel, as_json=True)
@staticmethod
def parse_simple(data: Mapping[str, JsonValue] | object) -> bool:
    """Interpret a lightweight success/error payload as a boolean.

    Recognized shapes:
        - ``{"success": <value>}`` → ``bool(value)``
        - ``{"error": ...}`` or ``{"message": ...}`` → ``False``
        - empty dict (or no known keys) → ``True`` (assumed success)
        - non-dict input → ``False`` (logged as a warning)

    :param data: Response data
    :return: True if successful, False otherwise

    Example:
        >>> ResponseParser.parse_simple({"success": True})
        True
        >>> ResponseParser.parse_simple({"error": "Something failed"})
        False
        >>> ResponseParser.parse_simple({})
        True
    """
    if not isinstance(data, dict):
        if logger.isEnabledFor(Constants.LOG_LEVEL_WARNING):
            logger.warning(
                "Expected dict in parse_simple, got %s",
                type(data).__name__,
            )
        return False

    if "success" not in data:
        # No explicit flag: treat presence of error/message keys as failure.
        return "error" not in data and "message" not in data

    flag = data["success"]
    if isinstance(flag, bool):
        return flag
    if logger.isEnabledFor(Constants.LOG_LEVEL_WARNING):
        logger.warning(
            "success field is not bool: %s, coercing to bool",
            type(flag).__name__,
        )
    return bool(flag)
Parse simple success/error responses efficiently.
Handles various response formats with minimal overhead:
- {"success": true/false}
- {"error": "..."} → False
- {"message": "..."} → False
- Empty dict → True (assumed success)
Parameters
- data: Response data
Returns
True if successful, False otherwise
Example:
>>> ResponseParser.parse_simple({"success": True})
True
>>> ResponseParser.parse_simple({"error": "Something failed"})
False
>>> ResponseParser.parse_simple({})
True
@staticmethod
def validate_response_structure(
    data: Mapping[str, JsonValue] | object,
    required_fields: Sequence[str] | None = None,
) -> bool:
    """Cheap structural sanity check before full Pydantic parsing.

    Allows early rejection of malformed responses without the cost of
    model validation.

    :param data: Response data to validate
    :param required_fields: Sequence of required field names
    :return: True if structure is valid

    Example:
        >>> data = {"id": 1, "name": "test"}
        >>> ResponseParser.validate_response_structure(data, ["id", "name"])
        True
        >>> ResponseParser.validate_response_structure(data, ["id", "missing"])
        False
    """
    if not isinstance(data, dict):
        return False

    # No requirements (None or empty) → any dict passes, including {}.
    if not required_fields:
        return True

    return all(name in data for name in required_fields)
Validate response structure without full parsing.
Lightweight validation before expensive Pydantic validation. Useful for early rejection of malformed responses.
Parameters
- data: Response data to validate
- required_fields: Sequence of required field names
Returns
True if structure is valid
Example:
>>> data = {"id": 1, "name": "test"}
>>> ResponseParser.validate_response_structure(data, ["id", "name"])
True
>>> ResponseParser.validate_response_structure(data, ["id", "missing"])
False
@staticmethod
def extract_error_message(data: Mapping[str, JsonValue] | object) -> str | None:
    """Pull a human-readable error message out of a response payload.

    Scans the pre-computed ``_ERROR_FIELDS`` tuple in order of preference
    and returns the first match, stringified when necessary.

    :param data: Response data
    :return: Error message or None if not found

    Example:
        >>> ResponseParser.extract_error_message({"error": "Not found"})
        'Not found'
        >>> ResponseParser.extract_error_message({"message": "Failed"})
        'Failed'
        >>> ResponseParser.extract_error_message({"success": True})
        None
    """
    if not isinstance(data, dict):
        return None

    for name in _ERROR_FIELDS:
        if name not in data:
            continue
        raw = data[name]
        if raw is None:
            # An explicitly null error field yields no message.
            return None
        return raw if isinstance(raw, str) else str(raw)

    return None
Extract error message from response data efficiently.
Checks common error field names in order of preference. Uses pre-computed tuple for fast iteration.
Parameters
- data: Response data
Returns
Error message or None if not found
Example:
>>> ResponseParser.extract_error_message({"error": "Not found"})
'Not found'
>>> ResponseParser.extract_error_message({"message": "Failed"})
'Failed'
>>> ResponseParser.extract_error_message({"success": True})
None
@staticmethod
def is_error_response(data: Mapping[str, object] | object) -> bool:
    """Fast boolean check for error indicators in a response.

    :param data: Response data
    :return: True if response indicates an error

    Example:
        >>> ResponseParser.is_error_response({"error": "Failed"})
        True
        >>> ResponseParser.is_error_response({"success": False})
        True
        >>> ResponseParser.is_error_response({"success": True})
        False
        >>> ResponseParser.is_error_response({})
        False
    """
    if not isinstance(data, dict):
        return False

    if "error" in data or "error_message" in data:
        return True

    # Only an explicit `success: False` (identity check) counts as an error.
    return "success" in data and data["success"] is False
Check if response indicates an error efficiently.
Fast boolean check for error indicators in response.
Parameters
- data: Response data
Returns
True if response indicates an error
Example:
>>> ResponseParser.is_error_response({"error": "Failed"})
True
>>> ResponseParser.is_error_response({"success": False})
True
>>> ResponseParser.is_error_response({"success": True})
False
>>> ResponseParser.is_error_response({})
False
class SecureIDGenerator:
    """Cryptographically strong identifier factory."""

    __slots__ = ()

    @staticmethod
    def generate_correlation_id() -> str:
        """Build a correlation ID formatted as ``{timestamp_us}-{random_hex}``.

        The random suffix carries 128 bits of entropy (32 hex chars); the
        microsecond timestamp prefix gives uniqueness and rough ordering.

        :return: Correlation ID string
        """
        stamp_us = int(time.time() * 1_000_000)
        suffix = secrets.token_hex(16)  # 16 bytes == 128 bits of entropy
        return f"{stamp_us}-{suffix}"

    @staticmethod
    def generate_request_id() -> str:
        """Build a request ID (same format as a correlation ID).

        :return: Request ID string
        """
        return SecureIDGenerator.generate_correlation_id()
Cryptographically secure ID generation.
@staticmethod
def generate_correlation_id() -> str:
    """Produce a secure correlation ID: ``{timestamp_us}-{random_hex}``.

    :return: Correlation ID string
    """
    # Microsecond wall-clock prefix: unique-ish and roughly sortable.
    stamp_us = int(time.time() * 1_000_000)
    # 16 random bytes -> 32 hex chars -> 128 bits of entropy.
    return f"{stamp_us}-{secrets.token_hex(16)}"
Generate secure correlation ID with 128 bits entropy.
Format: {timestamp_us}-{random_hex}
Returns
Correlation ID string
@staticmethod
def generate_request_id() -> str:
    """Produce a secure request ID.

    Simply reuses the correlation-ID format for API compatibility.

    :return: Request ID string
    """
    return SecureIDGenerator.generate_correlation_id()
Generate secure request ID.
Alias for correlation ID for API compatibility.
Returns
Request ID string
class Server(BaseValidatedModel):
    """Outline server details.

    SCHEMA: Based on GET /server response
    """

    name: str | None = None
    server_id: str = Field(alias="serverId")
    metrics_enabled: bool = Field(alias="metricsEnabled")
    created_timestamp_ms: TimestampMs = Field(alias="createdTimestampMs")
    port_for_new_access_keys: Port = Field(alias="portForNewAccessKeys")
    hostname_for_access_keys: str | None = Field(None, alias="hostnameForAccessKeys")
    access_key_data_limit: DataLimit | None = Field(None, alias="accessKeyDataLimit")
    version: str | None = None

    @field_validator("name", mode="before")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Reject empty server names (delegates to Validators.validate_name).

        :param v: Server name
        :return: Validated name
        :raises ValueError: If name is empty
        """
        cleaned = Validators.validate_name(v)
        if cleaned is None:
            raise ValueError("Server name cannot be empty")
        return cleaned

    @property
    def has_global_limit(self) -> bool:
        """Whether a server-wide data limit is configured.

        :return: True if global limit exists
        """
        return self.access_key_data_limit is not None

    @cached_property
    def created_timestamp_seconds(self) -> float:
        """Creation time in seconds (cached; the timestamp never changes).

        :return: Timestamp in seconds
        """
        return self.created_timestamp_ms / _MS_IN_SEC
Server information model with optimized properties.
SCHEMA: Based on GET /server response
Unix timestamp in milliseconds
Port number (1-65535)
@field_validator("name", mode="before")
@classmethod
def validate_name(cls, v: str) -> str:
    """Reject empty server names.

    Delegates to ``Validators.validate_name`` for trimming/length checks.

    :param v: Server name
    :return: Validated name
    :raises ValueError: If name is empty
    """
    cleaned = Validators.validate_name(v)
    if cleaned is None:
        raise ValueError("Server name cannot be empty")
    return cleaned
Validate server name.
Parameters
- v: Server name
Returns
Validated name
Raises
- ValueError: If name is empty
@property
def has_global_limit(self) -> bool:
    """Whether a server-wide data limit is configured.

    :return: True if global limit exists
    """
    return self.access_key_data_limit is not None
Check if server has global data limit (optimized).
Returns
True if global limit exists
@cached_property
def created_timestamp_seconds(self) -> float:
    """Creation timestamp converted to seconds.

    NOTE: cached because the underlying millisecond timestamp is immutable.

    :return: Timestamp in seconds
    """
    return self.created_timestamp_ms / _MS_IN_SEC
Get creation timestamp in seconds (cached).
NOTE: Cached because timestamp is immutable
Returns
Timestamp in seconds
455class ServerExperimentalMetric(BaseValidatedModel): 456 """Server-level experimental metrics. 457 458 SCHEMA: Based on experimental metrics server object 459 """ 460 461 tunnel_time: TunnelTime = Field(alias="tunnelTime") 462 data_transferred: DataTransferred = Field(alias="dataTransferred") 463 bandwidth: BandwidthInfo 464 locations: list[LocationMetric]
Server-level experimental metrics.
SCHEMA: Based on experimental metrics server object
class ServerMetrics(BaseValidatedModel):
    """Per-user transfer metrics with cached aggregates.

    SCHEMA: Based on GET /metrics/transfer response
    """

    bytes_transferred_by_user_id: BytesPerUserDict = Field(
        alias="bytesTransferredByUserId"
    )

    @cached_property
    def total_bytes(self) -> int:
        """Sum of all per-user byte counts (computed once, then cached).

        :return: Total bytes transferred
        """
        return sum(self.bytes_transferred_by_user_id.values())

    @cached_property
    def total_gigabytes(self) -> float:
        """Total transfer in GB, derived from the cached byte total.

        :return: Total GB transferred
        """
        return self.total_bytes / _BYTES_IN_GB

    @cached_property
    def user_count(self) -> int:
        """Number of users present in the metrics mapping (cached).

        :return: Number of users
        """
        return len(self.bytes_transferred_by_user_id)

    def get_user_bytes(self, user_id: str) -> int:
        """Bytes transferred by *user_id* (O(1) lookup; 0 when unknown).

        :param user_id: User/key ID
        :return: Bytes transferred or 0 if not found
        """
        return self.bytes_transferred_by_user_id.get(user_id, 0)

    def top_users(self, limit: int = 10) -> list[tuple[str, int]]:
        """The *limit* heaviest users as (user_id, bytes), descending.

        :param limit: Number of top users to return
        :return: List of (user_id, bytes) tuples
        """
        ranked = sorted(
            self.bytes_transferred_by_user_id.items(),
            key=lambda pair: pair[1],
            reverse=True,
        )
        return ranked[:limit]
Transfer metrics with optimized aggregations.
SCHEMA: Based on GET /metrics/transfer response
@cached_property
def total_bytes(self) -> int:
    """Sum of all per-user byte counts, computed once and cached.

    :return: Total bytes transferred
    """
    return sum(self.bytes_transferred_by_user_id.values())
Calculate total bytes with caching.
Returns
Total bytes transferred
@cached_property
def total_gigabytes(self) -> float:
    """Total transfer in gigabytes, derived from the cached byte total.

    :return: Total GB transferred
    """
    return self.total_bytes / _BYTES_IN_GB
Get total in gigabytes (uses cached total_bytes).
Returns
Total GB transferred
@cached_property
def user_count(self) -> int:
    """Count of users in the metrics mapping (cached).

    :return: Number of users
    """
    return len(self.bytes_transferred_by_user_id)
Get number of users (cached).
Returns
Number of users
def get_user_bytes(self, user_id: str) -> int:
    """Look up bytes transferred for one user.

    O(1) dict lookup; unknown users yield 0.

    :param user_id: User/key ID
    :return: Bytes transferred or 0 if not found
    """
    return self.bytes_transferred_by_user_id.get(user_id, 0)
Get bytes for specific user (O(1) dict lookup).
Parameters
- user_id: User/key ID
Returns
Bytes transferred or 0 if not found
def top_users(self, limit: int = 10) -> list[tuple[str, int]]:
    """Rank users by bytes transferred, heaviest first.

    :param limit: Number of top users to return
    :return: List of (user_id, bytes) tuples
    """
    ranked = sorted(
        self.bytes_transferred_by_user_id.items(),
        key=lambda pair: pair[1],
        reverse=True,
    )
    return ranked[:limit]
Get top users by bytes transferred (optimized sorting).
Parameters
- limit: Number of top users to return
Returns
List of (user_id, bytes) tuples
class ServerNameRequest(BaseValidatedModel):
    """Payload for the server rename endpoint.

    SCHEMA: Based on PUT /name request body
    """

    # Outline rejects empty names; 255 matches the API's upper bound.
    name: str = Field(min_length=1, max_length=255)
Request model for renaming server.
SCHEMA: Based on PUT /name request body
class ServerSummary(BaseValidatedModel):
    """Aggregated view of one server's health and usage."""

    server: dict[str, Any]
    access_keys_count: int
    healthy: bool
    transfer_metrics: BytesPerUserDict | None = None
    experimental_metrics: dict[str, Any] | None = None
    error: str | None = None

    @property
    def total_bytes_transferred(self) -> int:
        """Sum of per-user bytes; 0 when metrics are missing or empty.

        :return: Total bytes or 0 if no metrics
        """
        metrics = self.transfer_metrics
        if not metrics:
            return 0
        return sum(metrics.values())

    @property
    def total_gigabytes_transferred(self) -> float:
        """Total transfer expressed in gigabytes.

        :return: Total GB or 0.0 if no metrics
        """
        return self.total_bytes_transferred / _BYTES_IN_GB

    @property
    def has_errors(self) -> bool:
        """Whether an error message was recorded for this server.

        :return: True if errors present
        """
        return self.error is not None
Server summary with optimized aggregations.
@property
def total_bytes_transferred(self) -> int:
    """Sum of per-user bytes; 0 when metrics are missing or empty.

    :return: Total bytes or 0 if no metrics
    """
    metrics = self.transfer_metrics
    if not metrics:
        # Early return: no metrics recorded for this server.
        return 0
    return sum(metrics.values())
Get total bytes with early return optimization.
Returns
Total bytes or 0 if no metrics
@property
def total_gigabytes_transferred(self) -> float:
    """Total transfer expressed in gigabytes (0.0 without metrics).

    :return: Total GB or 0.0 if no metrics
    """
    return self.total_bytes_transferred / _BYTES_IN_GB
Get total GB (uses total_bytes_transferred).
Returns
Total GB or 0.0 if no metrics
class TunnelTime(BaseValidatedModel, TimeConversionMixin):
    """Cumulative tunnel time metric with time-unit conversion helpers.

    SCHEMA: Based on experimental metrics tunnelTime object
    """

    # Whole seconds; negative values are rejected.
    seconds: int = Field(ge=0)
Tunnel time metric with time conversions.
SCHEMA: Based on experimental metrics tunnelTime object
class ValidationError(OutlineError):
    """Raised when data fails validation against its expected schema.

    Attributes:
        field: Field name that failed validation
        model: Model name

    Example:
        >>> error = ValidationError(
        ...     "Invalid port number", field="port", model="ServerConfig"
        ... )
    """

    __slots__ = ("field", "model")

    def __init__(
        self,
        message: str,
        *,
        field: str | None = None,
        model: str | None = None,
    ) -> None:
        """Initialize validation error.

        Args:
            message: Error message
            field: Field name that failed validation
            model: Model name
        """
        # Forward only the populated identifiers as sanitized details.
        details: dict[str, Any] | None = None
        if field or model:
            details = {
                key: value
                for key, value in (("field", field), ("model", model))
                if value
            }

        super().__init__(message, safe_details=details)

        self.field = field
        self.model = model
Data validation failure.
Raised when data fails validation against expected schema.
Attributes:
- field: Field name that failed validation
- model: Model name
Example:
>>> error = ValidationError(
...     "Invalid port number", field="port", model="ServerConfig"
... )
def __init__(
    self,
    message: str,
    *,
    field: str | None = None,
    model: str | None = None,
) -> None:
    """Initialize validation error.

    Args:
        message: Error message
        field: Field name that failed validation
        model: Model name
    """
    # Only populated identifiers are forwarded as sanitized details;
    # when both are absent, safe_details stays None.
    details: dict[str, Any] | None = None
    if field or model:
        details = {
            key: value
            for key, value in (("field", field), ("model", model))
            if value
        }

    super().__init__(message, safe_details=details)

    self.field = field
    self.model = model
Initialize validation error.
Args:
- message: Error message
- field: Field name that failed validation
- model: Model name
class Validators:
    """Input validation utilities with security hardening."""

    __slots__ = ()

    @staticmethod
    @lru_cache(maxsize=64)
    def validate_cert_fingerprint(fingerprint: SecretStr) -> SecretStr:
        """Validate and normalize certificate fingerprint.

        :param fingerprint: SHA-256 fingerprint, optionally separated by
            colons, hyphens, or spaces
        :return: Normalized fingerprint (lowercase, no separators)
        :raises ValueError: If format is invalid
        """
        if not fingerprint:
            raise ValueError("Certificate fingerprint cannot be empty")

        # FIX: actually strip common separators before validation. The
        # previous code only lowercased, so "AA:BB:..."-style fingerprints
        # were rejected despite the comment promising separator removal.
        cleaned = (
            fingerprint.get_secret_value()
            .lower()
            .replace(":", "")
            .replace("-", "")
            .replace(" ", "")
        )

        # Validate hex format (SHA-256 -> exactly 64 hex characters)
        if not re.match(r"^[a-f0-9]{64}$", cleaned):
            raise ValueError(
                f"Invalid certificate fingerprint format. "
                f"Expected 64 hex characters, got: {len(cleaned)}"
            )

        return SecretStr(cleaned)

    @staticmethod
    def validate_port(port: int) -> int:
        """Validate port number.

        :param port: Port number
        :return: Validated port
        :raises ValueError: If port is out of range
        """
        if not is_valid_port(port):
            raise ValueError(
                f"Port must be between {Constants.MIN_PORT} and {Constants.MAX_PORT}"
            )
        return port

    @staticmethod
    def validate_name(name: str) -> str:
        """Validate name field.

        :param name: Name to validate
        :return: Validated (stripped) name
        :raises ValueError: If name is empty or too long
        """
        if not name or not name.strip():
            raise ValueError("Name cannot be empty")

        name = name.strip()
        if len(name) > Constants.MAX_NAME_LENGTH:
            raise ValueError(
                f"Name too long: {len(name)} (max {Constants.MAX_NAME_LENGTH})"
            )

        return name

    @staticmethod
    def validate_url(
        url: str,
        *,
        allow_private_networks: bool = True,
        resolve_dns: bool = False,
    ) -> str:
        """Validate and sanitize URL.

        :param url: URL to validate
        :param allow_private_networks: Allow private/local network addresses
        :param resolve_dns: Resolve hostname and block private/reserved IPs
        :return: Validated URL
        :raises ValueError: If URL is invalid
        """
        if not url or not url.strip():
            raise ValueError("URL cannot be empty")

        url = url.strip()

        if len(url) > Constants.MAX_URL_LENGTH:
            raise ValueError(
                f"URL too long: {len(url)} (max {Constants.MAX_URL_LENGTH})"
            )

        # Check for null bytes
        if "\x00" in url:
            raise ValueError("URL contains null bytes")

        # Parse URL
        try:
            parsed = urlparse(url)
            if not parsed.scheme or not parsed.netloc:
                raise ValueError("Invalid URL format")
        except Exception as e:
            raise ValueError(f"Invalid URL: {e}") from e

        # SSRF protection for raw IPs in hostname (does not resolve DNS)
        if (
            not allow_private_networks
            and parsed.hostname
            and SSRFProtection.is_blocked_ip(parsed.hostname)
        ):
            raise ValueError(
                f"Access to {parsed.hostname} is blocked (SSRF protection)"
            )

        # Explicitly block localhost when private networks are disallowed
        if (
            not allow_private_networks
            and parsed.hostname in SSRFProtection.ALLOWED_LOCALHOST
        ):
            raise ValueError(
                f"Access to {parsed.hostname} is blocked (SSRF protection)"
            )

        # Strict SSRF protection with DNS resolution (guards against rebinding)
        if (
            resolve_dns
            and not allow_private_networks
            and parsed.hostname
            and not SSRFProtection.is_blocked_ip(parsed.hostname)
            and SSRFProtection.is_blocked_hostname(parsed.hostname)
        ):
            raise ValueError(
                f"Access to {parsed.hostname} is blocked (SSRF protection)"
            )

        return url

    @staticmethod
    def validate_string_not_empty(value: str, field_name: str) -> str:
        """Validate string is not empty.

        :param value: String value
        :param field_name: Field name for error messages
        :return: Stripped string
        :raises ValueError: If string is empty
        """
        if not value or not value.strip():
            raise ValueError(f"{field_name} cannot be empty")
        return value.strip()

    @staticmethod
    def _validate_length(value: str, max_length: int, name: str) -> None:
        """Validate string length.

        :param value: String value
        :param max_length: Maximum allowed length
        :param name: Field name for error messages
        :raises ValueError: If string is too long
        """
        if len(value) > max_length:
            raise ValueError(f"{name} too long: {len(value)} (max {max_length})")

    @staticmethod
    def _validate_no_null_bytes(value: str, name: str) -> None:
        """Validate string contains no null bytes.

        :param value: String value
        :param name: Field name for error messages
        :raises ValueError: If string contains null bytes
        """
        if "\x00" in value:
            raise ValueError(f"{name} contains null bytes")

    @staticmethod
    def validate_non_negative(value: DataLimit | int, name: str) -> int:
        """Validate integer is non-negative.

        :param value: Integer value or DataLimit wrapper
        :return: Validated raw integer value
        :raises ValueError: If value is negative
        """
        # Local import avoids a circular dependency with the models module.
        from .models import DataLimit

        raw_value = value.bytes if isinstance(value, DataLimit) else value
        if raw_value < 0:
            raise ValueError(f"{name} must be non-negative, got {raw_value}")
        return raw_value

    @staticmethod
    def validate_since(value: str) -> str:
        """Validate experimental metrics 'since' parameter.

        Accepts:
            - Relative durations: 24h, 7d, 30m, 15s
            - ISO-8601 timestamps (e.g., 2024-01-01T00:00:00Z)

        :param value: Since parameter
        :return: Sanitized since value
        :raises ValueError: If value is invalid
        """
        if not value or not value.strip():
            raise ValueError("'since' parameter cannot be empty")

        sanitized = value.strip()

        # Relative format (number + suffix)
        if len(sanitized) >= 2 and sanitized[-1] in {"h", "d", "m", "s"}:
            number = sanitized[:-1]
            if number.isdigit():
                return sanitized

        # ISO-8601 timestamp (allow trailing Z)
        iso_value = sanitized.replace("Z", "+00:00")
        try:
            datetime.fromisoformat(iso_value)
            return sanitized
        except ValueError:
            raise ValueError(
                "'since' must be a relative duration (e.g., '24h', '7d') "
                "or ISO-8601 timestamp"
            ) from None

    @classmethod
    @lru_cache(maxsize=256)
    def validate_key_id(cls, key_id: str) -> str:
        """Enhanced key_id validation.

        Defends against path traversal and encoding tricks by checking the
        raw, decoded, and double-decoded variants.

        :param key_id: Key ID to validate
        :return: Validated key ID
        :raises ValueError: If key ID is invalid
        """
        clean_id = cls.validate_string_not_empty(key_id, "key_id")
        cls._validate_length(clean_id, Constants.MAX_KEY_ID_LENGTH, "key_id")
        cls._validate_no_null_bytes(clean_id, "key_id")

        try:
            decoded = urllib.parse.unquote(clean_id)
            double_decoded = urllib.parse.unquote(decoded)

            # Check all variants for malicious characters
            for variant in [clean_id, decoded, double_decoded]:
                if any(c in variant for c in {".", "/", "\\", "%", "\x00"}):
                    raise ValueError(
                        "key_id contains invalid characters (., /, \\, %, null)"
                    )
        except Exception as e:
            raise ValueError(f"Invalid key_id encoding: {e}") from e

        # Strict whitelist approach
        allowed_chars = frozenset(
            "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-"
        )
        if not all(c in allowed_chars for c in clean_id):
            raise ValueError("key_id must be alphanumeric, dashes, underscores only")

        return clean_id

    @staticmethod
    @lru_cache(maxsize=256)
    def sanitize_url_for_logging(url: str) -> str:
        """Remove secret path from URL for safe logging.

        :param url: URL to sanitize
        :return: Sanitized URL
        """
        try:
            parsed = urlparse(url)
            return f"{parsed.scheme}://{parsed.netloc}/***"
        except Exception:
            return "***INVALID_URL***"

    @staticmethod
    @lru_cache(maxsize=512)
    def sanitize_endpoint_for_logging(endpoint: str) -> str:
        """Sanitize endpoint for safe logging.

        Path segments longer than 20 characters (likely secrets/IDs) are
        masked with ``***``.

        :param endpoint: Endpoint to sanitize
        :return: Sanitized endpoint
        """
        if not endpoint:
            return "***EMPTY***"

        parts = endpoint.split("/")
        sanitized = [part if len(part) <= 20 else "***" for part in parts]
        return "/".join(sanitized)
Input validation utilities with security hardening.
@staticmethod
@lru_cache(maxsize=64)
def validate_cert_fingerprint(fingerprint: SecretStr) -> SecretStr:
    """Validate and normalize certificate fingerprint.

    :param fingerprint: SHA-256 fingerprint, optionally separated by
        colons, hyphens, or spaces
    :return: Normalized fingerprint (lowercase, no separators)
    :raises ValueError: If format is invalid
    """
    if not fingerprint:
        raise ValueError("Certificate fingerprint cannot be empty")

    # FIX: remove common separators as documented; previously only
    # lowercasing happened, so "AA:BB:..."-style inputs were rejected.
    cleaned = (
        fingerprint.get_secret_value()
        .lower()
        .replace(":", "")
        .replace("-", "")
        .replace(" ", "")
    )

    # Validate hex format (SHA-256 -> exactly 64 hex characters)
    if not re.match(r"^[a-f0-9]{64}$", cleaned):
        raise ValueError(
            f"Invalid certificate fingerprint format. "
            f"Expected 64 hex characters, got: {len(cleaned)}"
        )

    return SecretStr(cleaned)
Validate and normalize certificate fingerprint.
Parameters
- fingerprint: SHA-256 fingerprint
Returns
Normalized fingerprint (lowercase, no separators)
Raises
- ValueError: If format is invalid
@staticmethod
def validate_port(port: int) -> int:
    """Check that a port number lies within the allowed TCP range.

    :param port: Port number
    :return: Validated port
    :raises ValueError: If port is out of range
    """
    if is_valid_port(port):
        return port
    raise ValueError(
        f"Port must be between {Constants.MIN_PORT} and {Constants.MAX_PORT}"
    )
Validate port number.
Parameters
- port: Port number
Returns
Validated port
Raises
- ValueError: If port is out of range
@staticmethod
def validate_name(name: str) -> str:
    """Validate a user-supplied name field (non-empty, length-bounded).

    :param name: Name to validate
    :return: Validated (stripped) name
    :raises ValueError: If name is invalid
    """
    if not name or not name.strip():
        raise ValueError("Name cannot be empty")

    stripped = name.strip()
    if len(stripped) > Constants.MAX_NAME_LENGTH:
        raise ValueError(
            f"Name too long: {len(stripped)} (max {Constants.MAX_NAME_LENGTH})"
        )

    return stripped
Validate name field.
Parameters
- name: Name to validate
Returns
Validated name
Raises
- ValueError: If name is invalid
@staticmethod
def validate_url(
    url: str,
    *,
    allow_private_networks: bool = True,
    resolve_dns: bool = False,
) -> str:
    """Validate and sanitize URL.

    :param url: URL to validate
    :param allow_private_networks: Allow private/local network addresses
    :param resolve_dns: Resolve hostname and block private/reserved IPs
    :return: Validated URL
    :raises ValueError: If URL is invalid
    """
    if not url or not url.strip():
        raise ValueError("URL cannot be empty")

    url = url.strip()

    if len(url) > Constants.MAX_URL_LENGTH:
        raise ValueError(
            f"URL too long: {len(url)} (max {Constants.MAX_URL_LENGTH})"
        )

    # Check for null bytes (reject before any parsing is attempted)
    if "\x00" in url:
        raise ValueError("URL contains null bytes")

    # Parse URL; both a scheme and a network location are required
    try:
        parsed = urlparse(url)
        if not parsed.scheme or not parsed.netloc:
            raise ValueError("Invalid URL format")
    except Exception as e:
        raise ValueError(f"Invalid URL: {e}") from e

    # SSRF protection for raw IPs in hostname (does not resolve DNS)
    if (
        not allow_private_networks
        and parsed.hostname
        and SSRFProtection.is_blocked_ip(parsed.hostname)
    ):
        raise ValueError(
            f"Access to {parsed.hostname} is blocked (SSRF protection)"
        )

    # Explicitly block localhost when private networks are disallowed.
    # NOTE(review): SSRFProtection.ALLOWED_LOCALHOST is used here as a
    # block-list when private networks are disallowed — confirm the
    # attribute's naming/intent matches this usage.
    if (
        not allow_private_networks
        and parsed.hostname in SSRFProtection.ALLOWED_LOCALHOST
    ):
        raise ValueError(
            f"Access to {parsed.hostname} is blocked (SSRF protection)"
        )

    # Strict SSRF protection with DNS resolution (guards against rebinding).
    # Only consulted when the hostname was not already a blocked raw IP.
    if (
        resolve_dns
        and not allow_private_networks
        and parsed.hostname
        and not SSRFProtection.is_blocked_ip(parsed.hostname)
        and SSRFProtection.is_blocked_hostname(parsed.hostname)
    ):
        raise ValueError(
            f"Access to {parsed.hostname} is blocked (SSRF protection)"
        )

    return url
Validate and sanitize URL.
Parameters
- url: URL to validate
- allow_private_networks: Allow private/local network addresses
- resolve_dns: Resolve hostname and block private/reserved IPs
Returns
Validated URL
Raises
- ValueError: If URL is invalid
565 @staticmethod 566 def validate_string_not_empty(value: str, field_name: str) -> str: 567 """Validate string is not empty. 568 569 :param value: String value 570 :param field_name: Field name for error messages 571 :return: Stripped string 572 :raises ValueError: If string is empty 573 """ 574 if not value or not value.strip(): 575 raise ValueError(f"{field_name} cannot be empty") 576 return value.strip()
Validate string is not empty.
Parameters
- value: String value
- field_name: Field name for error messages
Returns
Stripped string
Raises
- ValueError: If string is empty
@staticmethod
def validate_non_negative(value: DataLimit | int, name: str) -> int:
    """Validate that an integer (or DataLimit byte count) is non-negative.

    :param value: Integer value or DataLimit wrapper
    :param name: Field name for error messages
    :return: Validated raw integer value
    :raises ValueError: If value is negative
    """
    # Function-scope import — presumably to avoid a circular import
    # with .models; confirm before hoisting to module level.
    from .models import DataLimit

    raw = value.bytes if isinstance(value, DataLimit) else value
    if raw >= 0:
        return raw
    raise ValueError(f"{name} must be non-negative, got {raw}")
Validate integer is non-negative.
Parameters
- value: Integer value
- name: Field name for error messages
Returns
Validated value
Raises
- ValueError: If value is negative
617 @staticmethod 618 def validate_since(value: str) -> str: 619 """Validate experimental metrics 'since' parameter. 620 621 Accepts: 622 - Relative durations: 24h, 7d, 30m, 15s 623 - ISO-8601 timestamps (e.g., 2024-01-01T00:00:00Z) 624 625 :param value: Since parameter 626 :return: Sanitized since value 627 :raises ValueError: If value is invalid 628 """ 629 if not value or not value.strip(): 630 raise ValueError("'since' parameter cannot be empty") 631 632 sanitized = value.strip() 633 634 # Relative format (number + suffix) 635 if len(sanitized) >= 2 and sanitized[-1] in {"h", "d", "m", "s"}: 636 number = sanitized[:-1] 637 if number.isdigit(): 638 return sanitized 639 640 # ISO-8601 timestamp (allow trailing Z) 641 iso_value = sanitized.replace("Z", "+00:00") 642 try: 643 datetime.fromisoformat(iso_value) 644 return sanitized 645 except ValueError: 646 raise ValueError( 647 "'since' must be a relative duration (e.g., '24h', '7d') " 648 "or ISO-8601 timestamp" 649 ) from None
Validate experimental metrics 'since' parameter.
Accepts:
- Relative durations: 24h, 7d, 30m, 15s
- ISO-8601 timestamps (e.g., 2024-01-01T00:00:00Z)
Parameters
- value: Since parameter
Returns
Sanitized since value
Raises
- ValueError: If value is invalid
@classmethod
@lru_cache(maxsize=256)
def validate_key_id(cls, key_id: str) -> str:
    """Enhanced key_id validation.

    :param key_id: Key ID to validate
    :return: Validated key ID
    :raises ValueError: If key ID is invalid
    """
    clean_id = cls.validate_string_not_empty(key_id, "key_id")
    cls._validate_length(clean_id, Constants.MAX_KEY_ID_LENGTH, "key_id")
    cls._validate_no_null_bytes(clean_id, "key_id")

    # BUGFIX: decode inside a narrow try block so that the intentional
    # ValueError raised by the character check below is no longer caught
    # and re-wrapped as a misleading "Invalid key_id encoding" error.
    try:
        decoded = urllib.parse.unquote(clean_id)
        double_decoded = urllib.parse.unquote(decoded)
    except Exception as e:
        raise ValueError(f"Invalid key_id encoding: {e}") from e

    # Check all variants (raw, decoded, double-decoded) for malicious
    # characters — guards against percent-encoded path traversal.
    for variant in (clean_id, decoded, double_decoded):
        if any(c in variant for c in {".", "/", "\\", "%", "\x00"}):
            raise ValueError(
                "key_id contains invalid characters (., /, \\, %, null)"
            )

    # Strict whitelist approach
    allowed_chars = frozenset(
        "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-"
    )
    if not all(c in allowed_chars for c in clean_id):
        raise ValueError("key_id must be alphanumeric, dashes, underscores only")

    return clean_id
Enhanced key_id validation.
Parameters
- key_id: Key ID to validate
Returns
Validated key ID
Raises
- ValueError: If key ID is invalid
686 @staticmethod 687 @lru_cache(maxsize=256) 688 def sanitize_url_for_logging(url: str) -> str: 689 """Remove secret path from URL for safe logging. 690 691 :param url: URL to sanitize 692 :return: Sanitized URL 693 """ 694 try: 695 parsed = urlparse(url) 696 return f"{parsed.scheme}://{parsed.netloc}/***" 697 except Exception: 698 return "***INVALID_URL***"
Remove secret path from URL for safe logging.
Parameters
- url: URL to sanitize
Returns
Sanitized URL
700 @staticmethod 701 @lru_cache(maxsize=512) 702 def sanitize_endpoint_for_logging(endpoint: str) -> str: 703 """Sanitize endpoint for safe logging. 704 705 :param endpoint: Endpoint to sanitize 706 :return: Sanitized endpoint 707 """ 708 if not endpoint: 709 return "***EMPTY***" 710 711 parts = endpoint.split("/") 712 sanitized = [part if len(part) <= 20 else "***" for part in parts] 713 return "/".join(sanitized)
Sanitize endpoint for safe logging.
Parameters
- endpoint: Endpoint to sanitize
Returns
Sanitized endpoint
def audited(
    *,
    log_success: bool = True,
    log_failure: bool = True,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
    """Audit logging decorator with zero-config smart extraction.

    Automatically extracts ALL information from function signature and execution:
    - Action name: from function name
    - Resource: from result.id, first parameter, or function analysis
    - Details: from function signature (excluding None and defaults)
    - Correlation ID: from instance._correlation_id if available
    - Success/failure: from exception handling

    Usage:
        @audited()
        async def create_access_key(self, name: str, port: int = 8080) -> AccessKey:
            # action: "create_access_key"
            # resource: result.id
            # details: {"name": "...", "port": 8080} (if not default)
            ...

        @audited(log_success=False)
        async def critical_operation(self, resource_id: str) -> bool:
            # Only logs failures for alerting
            ...

    :param log_success: Log successful operations (default: True)
    :param log_failure: Log failed operations (default: True)
    :return: Decorated function with automatic audit logging
    """

    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        # Determine if function is async at decoration time
        # (avoids re-checking on every call)
        is_async = inspect.iscoroutinefunction(func)

        if is_async:
            async_func = cast("Callable[P, Awaitable[object]]", func)

            @wraps(func)
            async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> object:
                # Check for audit logger on instance (first positional arg
                # is assumed to be `self` when present)
                instance = args[0] if args else None
                audit_logger = getattr(instance, "_audit_logger", None)

                # No logger? Execute without audit
                if audit_logger is None:
                    return await async_func(*args, **kwargs)

                result: object | None = None

                try:
                    result = await async_func(*args, **kwargs)
                except Exception as e:
                    if log_failure:
                        ctx = AuditContext.from_call(
                            func=func,
                            instance=instance,
                            args=args,
                            kwargs=kwargs,
                            result=result,
                            exception=e,
                        )
                        # Fire-and-forget: the audit log must not delay
                        # re-raising the caller's exception.
                        task = asyncio.create_task(
                            audit_logger.alog_action(
                                action=ctx.action,
                                resource=ctx.resource,
                                details=ctx.details,
                                correlation_id=ctx.correlation_id,
                            )
                        )
                        # Retrieve the task's exception (if any) so asyncio
                        # does not emit "exception was never retrieved".
                        task.add_done_callback(lambda t: t.exception())
                    raise
                else:
                    if log_success:
                        ctx = AuditContext.from_call(
                            func=func,
                            instance=instance,
                            args=args,
                            kwargs=kwargs,
                            result=result,
                            exception=None,
                        )
                        # Fire-and-forget, as above: auditing never blocks
                        # returning the result to the caller.
                        task = asyncio.create_task(
                            audit_logger.alog_action(
                                action=ctx.action,
                                resource=ctx.resource,
                                details=ctx.details,
                                correlation_id=ctx.correlation_id,
                            )
                        )
                        task.add_done_callback(lambda t: t.exception())
                    return result

            return cast("Callable[P, R]", async_wrapper)

        else:
            sync_func = cast("Callable[P, object]", func)

            @wraps(func)
            def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> object:
                # Check for audit logger on instance
                instance = args[0] if args else None
                audit_logger = getattr(instance, "_audit_logger", None)

                # No logger? Execute without audit
                if audit_logger is None:
                    return sync_func(*args, **kwargs)

                result: object | None = None

                try:
                    result = sync_func(*args, **kwargs)
                except Exception as e:
                    if log_failure:
                        ctx = AuditContext.from_call(
                            func=func,
                            instance=instance,
                            args=args,
                            kwargs=kwargs,
                            result=result,
                            exception=e,
                        )
                        # Sync path: log synchronously (no event loop
                        # is assumed to be running here).
                        audit_logger.log_action(
                            action=ctx.action,
                            resource=ctx.resource,
                            details=ctx.details,
                            correlation_id=ctx.correlation_id,
                        )
                    raise
                else:
                    if log_success:
                        ctx = AuditContext.from_call(
                            func=func,
                            instance=instance,
                            args=args,
                            kwargs=kwargs,
                            result=result,
                            exception=None,
                        )
                        audit_logger.log_action(
                            action=ctx.action,
                            resource=ctx.resource,
                            details=ctx.details,
                            correlation_id=ctx.correlation_id,
                        )
                    return result

            return cast("Callable[P, R]", sync_wrapper)

    return decorator
Audit logging decorator with zero-config smart extraction.
Automatically extracts ALL information from function signature and execution:
- Action name: from function name
- Resource: from result.id, first parameter, or function analysis
- Details: from function signature (excluding None and defaults)
- Correlation ID: from instance._correlation_id if available
- Success/failure: from exception handling
Usage:
@audited() async def create_access_key(self, name: str, port: int = 8080) -> AccessKey: # action: "create_access_key" # resource: result.id # details: {"name": "...", "port": 8080} (if not default) ...
@audited(log_success=False) async def critical_operation(self, resource_id: str) -> bool: # Only logs failures for alerting ...
Parameters
- log_success: Log successful operations (default: True)
- log_failure: Log failed operations (default: True)
Returns
Decorated function with automatic audit logging
def build_config_overrides(
    **kwargs: int | str | bool | float | None,
) -> dict[str, int | str | bool | float | None]:
    """Assemble a configuration-overrides dict from keyword arguments.

    Single source of truth for config building (DRY): only keys declared
    in ConfigOverrides are kept, and None values are dropped.

    :param kwargs: Configuration parameters
    :return: Dictionary containing only non-None values

    Example:
        >>> overrides = build_config_overrides(timeout=20, enable_logging=True)
        >>> # Returns: {'timeout': 20, 'enable_logging': True}
    """
    allowed = ConfigOverrides.__annotations__.keys()
    result: dict[str, int | str | bool | float | None] = {}
    for key, value in kwargs.items():
        if key in allowed and value is not None:
            result[key] = value
    return result
Build configuration overrides dictionary from kwargs.
DRY implementation - single source of truth for config building.
Parameters
- kwargs: Configuration parameters
Returns
Dictionary containing only non-None values
Example:
>>> overrides = build_config_overrides(timeout=20, enable_logging=True) >>> # Returns: {'timeout': 20, 'enable_logging': True}
def create_client(
    api_url: str,
    cert_sha256: str,
    *,
    audit_logger: AuditLogger | None = None,
    metrics: MetricsCollector | None = None,
    **overrides: Unpack[ConfigOverrides],
) -> AsyncOutlineClient:
    """Build an AsyncOutlineClient from minimal parameters.

    Thin convenience wrapper around the client constructor for quick
    creation without an explicit configuration object.

    :param api_url: API URL with secret path
    :param cert_sha256: SHA-256 certificate fingerprint
    :param audit_logger: Custom audit logger (optional)
    :param metrics: Custom metrics collector (optional)
    :param overrides: Configuration overrides (timeout, retry_attempts, etc.)
    :return: Configured client instance (use with async context manager)
    :raises ConfigurationError: If parameters are invalid

    Example (advanced, prefer from_env for production):
        >>> async with AsyncOutlineClient.from_env() as client:
        ...     info = await client.get_server_info()
    """
    client = AsyncOutlineClient(
        api_url=api_url,
        cert_sha256=cert_sha256,
        audit_logger=audit_logger,
        metrics=metrics,
        **overrides,
    )
    return client
Create client with minimal parameters.
Convenience function for quick client creation without explicit configuration object. Uses modern **overrides approach.
Parameters
- api_url: API URL with secret path
- cert_sha256: SHA-256 certificate fingerprint
- audit_logger: Custom audit logger (optional)
- metrics: Custom metrics collector (optional)
- overrides: Configuration overrides (timeout, retry_attempts, etc.)
Returns
Configured client instance (use with async context manager)
Raises
- ConfigurationError: If parameters are invalid
Example (advanced, prefer from_env for production):
async with AsyncOutlineClient.from_env() as client: ... info = await client.get_server_info()
def create_env_template(path: str | Path = ".env.example") -> None:
    """Create .env template file (optimized I/O).

    Uses the cached template text and a single Path.write_text call.

    :param path: Path to template file
    :raises TypeError: If path is neither str nor Path
    """
    # Normalize the destination to a Path, rejecting other types.
    if isinstance(path, Path):
        target_path = path
    elif isinstance(path, str):
        target_path = Path(path)
    else:
        raise TypeError(f"path must be str or Path, got {type(path).__name__}")

    # Use cached template
    target_path.write_text(_get_env_template(), encoding="utf-8")

    _log_if_enabled(
        logging.INFO,
        f"Created configuration template: {target_path}",
    )
Create .env template file (optimized I/O).
Performance: Uses cached template and efficient Path operations
Parameters
- path: Path to template file
def create_multi_server_manager(
    configs: Sequence[OutlineClientConfig],
    *,
    audit_logger: AuditLogger | None = None,
    metrics: MetricsCollector | None = None,
    default_timeout: float = _DEFAULT_SERVER_TIMEOUT,
) -> MultiServerManager:
    """Build a MultiServerManager from a set of server configurations.

    Thin convenience wrapper around the manager constructor.

    :param configs: Sequence of server configurations
    :param audit_logger: Shared audit logger
    :param metrics: Shared metrics collector
    :param default_timeout: Default operation timeout
    :return: MultiServerManager instance (use with async context manager)
    :raises ConfigurationError: If configurations are invalid

    Example:
        >>> configs = [
        ...     OutlineClientConfig.create_minimal("https://s1.com/path", "a" * 64),
        ...     OutlineClientConfig.create_minimal("https://s2.com/path", "b" * 64),
        ... ]
        >>> async with create_multi_server_manager(configs) as manager:
        ...     health = await manager.health_check_all()
    """
    manager = MultiServerManager(
        configs=configs,
        audit_logger=audit_logger,
        metrics=metrics,
        default_timeout=default_timeout,
    )
    return manager
Create multi-server manager with configurations.
Convenience function for creating a manager for multiple servers.
Parameters
- configs: Sequence of server configurations
- audit_logger: Shared audit logger
- metrics: Shared metrics collector
- default_timeout: Default operation timeout
Returns
MultiServerManager instance (use with async context manager)
Raises
- ConfigurationError: If configurations are invalid
Example:
>>> configs = [ ... OutlineClientConfig.create_minimal("https://s1.com/path", "a" * 64), ... OutlineClientConfig.create_minimal("https://s2.com/path", "b" * 64), ... ] >>> async with create_multi_server_manager(configs) as manager: ... health = await manager.health_check_all()
def format_error_chain(error: Exception) -> list[dict[str, Any]]:
    """Format exception chain for structured logging.

    Walks ``__cause__``/``__context__`` links starting from the caught
    exception, so the first entry is the most recently raised error and
    later entries are its underlying causes. (The previous docstring's
    "root to leaf" claim described the opposite of the actual order.)

    Args:
        error: Exception to format

    Returns:
        List of error dictionaries, most recent error first

    Example:
        >>> try:
        ...     raise ValueError("Inner") from KeyError("Outer")
        ... except Exception as e:
        ...     chain = format_error_chain(e)
        ...     len(chain)  # 2
    """
    chain: list[dict[str, Any]] = []
    current: BaseException | None = error

    while current is not None:
        chain.append(get_safe_error_dict(current))
        # Prefer the explicit cause (`raise ... from`); fall back to the
        # implicit context set when an exception is raised inside `except`.
        current = current.__cause__ or current.__context__

    return chain
Format exception chain for structured logging.
Arguments:
- error: Exception to format
Returns:
List of error dictionaries ordered from root to leaf
Example:
>>> try: ... raise ValueError("Inner") from KeyError("Outer") ... except Exception as e: ... chain = format_error_chain(e) ... len(chain) # 2
def get_audit_logger() -> AuditLogger | None:
    """Return the audit logger bound to the current async context, if any.

    :return: Audit logger instance or None
    """
    current: AuditLogger | None = _audit_logger_context.get()
    return current
Get audit logger from current context.
Returns
Audit logger instance or None
def get_or_create_audit_logger(instance_id: int | None = None) -> AuditLogger:
    """Resolve an audit logger: context first, then cache, else create new.

    :param instance_id: Instance ID for caching (optional)
    :return: Audit logger instance
    """
    # A context-bound logger always takes precedence.
    from_context = _audit_logger_context.get()
    if from_context is not None:
        return from_context

    # Weak-reference cache lookup, keyed by instance id.
    if instance_id is not None:
        cached_logger = _logger_cache.get(instance_id)
        if cached_logger is not None:
            return cached_logger

    # Nothing found — create a fresh default logger.
    fresh = DefaultAuditLogger()
    if instance_id is not None:
        _logger_cache[instance_id] = cast(AuditLogger, fresh)
    return cast(AuditLogger, fresh)
Get or create audit logger with weak reference caching.
Parameters
- instance_id: Instance ID for caching (optional)
Returns
Audit logger instance
def get_retry_delay(error: Exception) -> float | None:
    """Get suggested retry delay for an error.

    Args:
        error: Exception to check

    Returns:
        Retry delay in seconds, or None if not retryable

    Example:
        >>> error = OutlineTimeoutError("Timeout")
        >>> get_retry_delay(error)  # 2.0
    """
    # Only package errors flagged as retryable carry a delay hint.
    if isinstance(error, OutlineError) and error.is_retryable:
        return error.default_retry_delay
    return None
Get suggested retry delay for an error.
Arguments:
- error: Exception to check
Returns:
Retry delay in seconds, or None if not retryable
Example:
>>> error = OutlineTimeoutError("Timeout") >>> get_retry_delay(error) # 2.0
def get_safe_error_dict(error: BaseException) -> dict[str, Any]:
    """Extract safe error information for logging.

    Returns only safe information without sensitive data.

    Args:
        error: Exception to convert

    Returns:
        Safe error dictionary suitable for logging

    Example:
        >>> error = APIError("Not found", status_code=404)
        >>> get_safe_error_dict(error)
        {'type': 'APIError', 'message': 'Not found', 'status_code': 404, ...}
    """
    info: dict[str, Any] = {
        "type": type(error).__name__,
        "message": str(error),
    }

    # Non-package exceptions carry no additional safe metadata.
    if not isinstance(error, OutlineError):
        return info

    info["retryable"] = error.is_retryable
    info["retry_delay"] = error.default_retry_delay
    info["safe_details"] = error.safe_details

    # Subclass-specific safe fields — first matching branch wins, mirroring
    # the ordered class-pattern match this replaces.
    if isinstance(error, APIError):
        info["status_code"] = error.status_code
        # Only compute these if status_code is not None
        if error.status_code is not None:
            info["is_client_error"] = error.is_client_error
            info["is_server_error"] = error.is_server_error
    elif isinstance(error, CircuitOpenError):
        info["retry_after"] = error.retry_after
    elif isinstance(error, ConfigurationError):
        if error.field is not None:
            info["field"] = error.field
        info["security_issue"] = error.security_issue
    elif isinstance(error, ValidationError):
        if error.field is not None:
            info["field"] = error.field
        if error.model is not None:
            info["model"] = error.model
    elif isinstance(error, OutlineConnectionError):
        if error.host is not None:
            info["host"] = error.host
        if error.port is not None:
            info["port"] = error.port
    elif isinstance(error, OutlineTimeoutError):
        if error.timeout is not None:
            info["timeout"] = error.timeout
        if error.operation is not None:
            info["operation"] = error.operation

    return info
Extract safe error information for logging.
Returns only safe information without sensitive data.
Arguments:
- error: Exception to convert
Returns:
Safe error dictionary suitable for logging
Example:
>>> error = APIError("Not found", status_code=404) >>> get_safe_error_dict(error) {'type': 'APIError', 'message': 'Not found', 'status_code': 404, ...}
def get_version() -> str:
    """Return the installed package version string.

    :return: Package version
    """
    version: str = __version__
    return version
Get package version string.
Returns
Package version
413def is_json_serializable(value: object) -> TypeGuard[JsonValue]: 414 """Type guard for JSON-serializable values. 415 416 :param value: Value to check 417 :return: True if value is JSON-serializable 418 """ 419 if value is None or isinstance(value, str | int | float | bool): 420 return True 421 if isinstance(value, dict): 422 return all( 423 isinstance(k, str) and is_json_serializable(v) for k, v in value.items() 424 ) 425 if isinstance(value, list): 426 return all(is_json_serializable(item) for item in value) 427 return False
Type guard for JSON-serializable values.
Parameters
- value: Value to check
Returns
True if value is JSON-serializable
def is_retryable(error: Exception) -> bool:
    """Check if error should be retried.

    Args:
        error: Exception to check

    Returns:
        True if error is retryable

    Example:
        >>> error = APIError("Server error", status_code=503)
        >>> is_retryable(error)  # True
    """
    # Only package errors expose the retryable flag; everything else is not.
    return isinstance(error, OutlineError) and error.is_retryable
Check if error should be retried.
Arguments:
- error: Exception to check
Returns:
True if error is retryable
Example:
>>> error = APIError("Server error", status_code=503) >>> is_retryable(error) # True
404def is_valid_bytes(value: object) -> TypeGuard[int]: 405 """Type guard for valid byte counts. 406 407 :param value: Value to check 408 :return: True if value is valid bytes 409 """ 410 return isinstance(value, int) and value >= 0
Type guard for valid byte counts.
Parameters
- value: Value to check
Returns
True if value is valid bytes
def is_valid_port(value: object) -> TypeGuard[int]:
    """Type guard for valid port numbers.

    :param value: Value to check
    :return: True if value is an int within the allowed port range
    """
    if not isinstance(value, int):
        return False
    return Constants.MIN_PORT <= value <= Constants.MAX_PORT
Type guard for valid port numbers.
Parameters
- value: Value to check
Returns
True if value is valid port
def load_config(
    environment: str = "custom",
    **overrides: ConfigValue,
) -> OutlineClientConfig:
    """Load configuration for environment (optimized lookup).

    :param environment: Environment name (development, production, custom)
    :param overrides: Configuration parameters to override
    :return: Configuration instance
    :raises ValueError: If environment name is invalid

    Example:
        >>> config = load_config("production", timeout=20)
    """
    env_key = environment.lower()

    # Fast validation with frozenset
    if env_key not in _VALID_ENVIRONMENTS:
        valid_envs = ", ".join(sorted(_VALID_ENVIRONMENTS))
        raise ValueError(f"Invalid environment '{environment}'. Valid: {valid_envs}")

    # Dispatch table maps environment aliases to config classes;
    # anything else that passed validation (i.e. "custom") gets the base.
    dispatch: dict[str, type[OutlineClientConfig]] = {
        "development": DevelopmentConfig,
        "dev": DevelopmentConfig,
        "production": ProductionConfig,
        "prod": ProductionConfig,
    }
    config_class = dispatch.get(env_key, OutlineClientConfig)

    # Keep only overrides declared in ConfigOverrides.
    allowed_keys = frozenset(ConfigOverrides.__annotations__.keys())
    filtered_overrides = cast(
        ConfigOverrides,
        {name: value for name, value in overrides.items() if name in allowed_keys},
    )

    return config_class(  # type: ignore[call-arg, unused-ignore]
        **filtered_overrides
    )
Load configuration for environment (optimized lookup).
Parameters
- environment: Environment name (development, production, custom)
- overrides: Configuration parameters to override
Returns
Configuration instance
Raises
- ValueError: If environment name is invalid
Example:
>>> config = load_config("production", timeout=20)
def mask_sensitive_data(
    data: Mapping[str, Any],
    *,
    sensitive_keys: frozenset[str] | None = None,
    _depth: int = 0,
) -> dict[str, Any]:
    """Sensitive data masking with lazy copying and optimized recursion.

    Uses lazy copying - only creates a new dict when a change is needed.
    Includes recursion depth protection.

    :param data: Data dictionary to mask
    :param sensitive_keys: Set of sensitive key names (case-insensitive
        matching); ``None`` selects ``DEFAULT_SENSITIVE_KEYS``, while an
        explicitly-passed empty set disables masking
    :param _depth: Current recursion depth (internal)
    :return: Masked data dictionary (always a shallow copy of ``data``)
    """
    # Guard against infinite recursion
    if _depth > Constants.MAX_RECURSION_DEPTH:
        return {"_error": "Max recursion depth exceeded"}

    # BUGFIX: use an explicit `is None` check rather than `or`, so that a
    # caller-supplied empty frozenset means "mask nothing" instead of
    # silently falling back to the default sensitive-key list.
    keys_to_mask = (
        DEFAULT_SENSITIVE_KEYS if sensitive_keys is None else sensitive_keys
    )
    keys_lower = {k.lower() for k in keys_to_mask}

    masked: dict[str, Any] | None = None

    for key, value in data.items():
        # Check if key is sensitive
        if key.lower() in keys_lower:
            if masked is None:
                masked = dict(data)
            masked[key] = "***MASKED***"
            continue

        # Recursively handle nested dicts.
        # NOTE(review): the recursive call always returns a fresh dict (see
        # the final return), so this identity check always triggers a copy
        # for nested dicts — behavior is correct, just less lazy than the
        # comment suggests; confirm whether stricter laziness is desired.
        if isinstance(value, dict):
            nested = mask_sensitive_data(
                value, sensitive_keys=keys_to_mask, _depth=_depth + 1
            )
            if nested is not value:
                if masked is None:
                    masked = dict(data)
                masked[key] = nested

        # Handle lists containing dicts
        elif isinstance(value, list):
            new_list: list[Any] = []
            list_modified = False

            for item in value:
                if isinstance(item, dict):
                    masked_item = mask_sensitive_data(
                        item, sensitive_keys=keys_to_mask, _depth=_depth + 1
                    )
                    if masked_item is not item:
                        list_modified = True
                    new_list.append(masked_item)
                else:
                    new_list.append(item)

            if list_modified:
                if masked is None:
                    masked = dict(data)
                masked[key] = new_list

    return masked if masked is not None else dict(data)
Sensitive data masking with lazy copying and optimized recursion.
Uses lazy copying - only creates new dict when needed. Includes recursion depth protection.
Parameters
- data: Data dictionary to mask
- sensitive_keys: Set of sensitive key names (case-insensitive matching)
- _depth: Current recursion depth (internal)
Returns
Masked data dictionary (may be same object if no sensitive data found)
def print_type_info() -> None:
    """Print information about available type aliases for advanced usage."""
    # Static, precomposed help text; printed verbatim to stdout.
    info = """
🎯 PyOutlineAPI Type Aliases for Advanced Usage
===============================================

For creating custom AuditLogger:
    from pyoutlineapi import AuditLogger, AuditDetails

    class MyAuditLogger:
        def log_action(
            self,
            action: str,
            resource: str,
            *,
            details: AuditDetails | None = None,
            ...
        ) -> None: ...

        async def alog_action(
            self,
            action: str,
            resource: str,
            *,
            details: AuditDetails | None = None,
            ...
        ) -> None: ...

For creating custom MetricsCollector:
    from pyoutlineapi import MetricsCollector, MetricsTags

    class MyMetrics:
        def increment(
            self,
            metric: str,
            *,
            tags: MetricsTags | None = None
        ) -> None: ...

Available Type Aliases:
    - TimestampMs, TimestampSec  # Unix timestamps
    - JsonPayload, ResponseData  # JSON data types
    - QueryParams  # URL query parameters
    - AuditDetails  # Audit log details
    - MetricsTags  # Metrics tags

Constants and Validators:
    from pyoutlineapi import Constants, Validators

    # Access constants
    Constants.RETRY_STATUS_CODES
    Constants.MIN_PORT, Constants.MAX_PORT

    # Use validators
    Validators.validate_port(8080)
    Validators.validate_key_id("my-key")

Utility Classes:
    from pyoutlineapi import (
        CredentialSanitizer,
        SecureIDGenerator,
        ResponseParser,
    )

    # Sanitize sensitive data
    safe_url = CredentialSanitizer.sanitize(url)

    # Generate secure IDs
    secure_id = SecureIDGenerator.generate()

    # Parse API responses
    parsed = ResponseParser.parse(data, Model)

📖 Documentation: https://github.com/orenlab/pyoutlineapi
    """
    print(info)
Print information about available type aliases for advanced usage.
def quick_setup() -> None:
    """Create configuration template file for quick setup.

    Creates `.env.example` file with all available configuration options.
    """
    create_env_template()
    # Print the follow-up instructions line by line.
    for message in (
        "✅ Created .env.example",
        "📝 Edit the file with your server details",
        "🚀 Then use: AsyncOutlineClient.from_env()",
    ):
        print(message)
Create configuration template file for quick setup.
Creates .env.example file with all available configuration options.
def set_audit_logger(logger_instance: AuditLogger) -> None:
    """Bind an audit logger to the current async context.

    Thread-safe and async-safe via contextvars; preferred over global
    state for high-load applications.

    :param logger_instance: Audit logger instance
    """
    _audit_logger_context.set(logger_instance)
Set audit logger for current async context.
Thread-safe and async-safe using contextvars. Preferred over global state for high-load applications.
Parameters
- logger_instance: Audit logger instance