pyoutlineapi
PyOutlineAPI: A modern, async-first Python client for the Outline VPN Server API.
Copyright (c) 2025 Denis Rozhnovskiy <pytelemonbot@mail.ru>. All rights reserved.
This software is licensed under the MIT License.
You can find the full license text at: https://opensource.org/licenses/MIT
Source code repository: https://github.com/orenlab/pyoutlineapi
1""" 2PyOutlineAPI: A modern, async-first Python client for the Outline VPN Server API. 3 4Copyright (c) 2025 Denis Rozhnovskiy <pytelemonbot@mail.ru> 5All rights reserved. 6 7This software is licensed under the MIT License. 8You can find the full license text at: 9 https://opensource.org/licenses/MIT 10 11Source code repository: 12 https://github.com/orenlab/pyoutlineapi 13""" 14 15from __future__ import annotations 16 17import sys 18from importlib import metadata 19from typing import Final, TYPE_CHECKING 20 21 22def check_python_version(): 23 if sys.version_info < (3, 10): 24 raise RuntimeError("PyOutlineAPI requires Python 3.10 or higher") 25 26 27check_python_version() 28 29# Core client imports 30from .client import AsyncOutlineClient 31from .exceptions import APIError, OutlineError 32 33# Package metadata 34try: 35 __version__: str = metadata.version("pyoutlineapi") 36except metadata.PackageNotFoundError: # Fallback for development 37 __version__ = "0.3.0-dev" 38 39__author__: Final[str] = "Denis Rozhnovskiy" 40__email__: Final[str] = "pytelemonbot@mail.ru" 41__license__: Final[str] = "MIT" 42 43# Type checking imports 44if TYPE_CHECKING: 45 from .models import ( 46 AccessKey, 47 AccessKeyCreateRequest, 48 AccessKeyList, 49 AccessKeyNameRequest, 50 DataLimit, 51 DataLimitRequest, 52 ErrorResponse, 53 ExperimentalMetrics, 54 HostnameRequest, 55 MetricsEnabledRequest, 56 MetricsStatusResponse, 57 PortRequest, 58 Server, 59 ServerMetrics, 60 ServerNameRequest 61 ) 62 63# Runtime imports 64from .models import ( 65 AccessKey, 66 AccessKeyCreateRequest, 67 AccessKeyList, 68 AccessKeyNameRequest, 69 DataLimit, 70 DataLimitRequest, 71 ErrorResponse, 72 ExperimentalMetrics, 73 HostnameRequest, 74 MetricsEnabledRequest, 75 MetricsStatusResponse, 76 PortRequest, 77 Server, 78 ServerMetrics, 79 ServerNameRequest, 80) 81 82__all__: Final[list[str]] = [ 83 # Client 84 "AsyncOutlineClient", 85 "OutlineError", 86 "APIError", 87 # Models 88 "AccessKey", 89 
"AccessKeyCreateRequest", 90 "AccessKeyList", 91 "AccessKeyNameRequest", 92 "DataLimit", 93 "DataLimitRequest", 94 "ErrorResponse", 95 "ExperimentalMetrics", 96 "HostnameRequest", 97 "MetricsEnabledRequest", 98 "MetricsStatusResponse", 99 "PortRequest", 100 "Server", 101 "ServerMetrics", 102 "ServerNameRequest", 103]
class AsyncOutlineClient:
    """
    Asynchronous client for the Outline VPN Server API.

    Args:
        api_url: Base URL for the Outline server API
        cert_sha256: SHA-256 fingerprint of the server's TLS certificate
        json_format: Return raw JSON instead of Pydantic models
        timeout: Request timeout in seconds
        retry_attempts: Number of retry attempts connecting to the API
        enable_logging: Enable debug logging for API calls
        user_agent: Custom user agent string
        max_connections: Maximum number of connections in the pool
        rate_limit_delay: Minimum delay between requests (seconds)

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34...",
        ...         enable_logging=True
        ...     ) as client:
        ...         server_info = await client.get_server_info()
        ...         print(f"Server: {server_info.name}")
        ...
        ...     # Or use as context manager factory
        ...     async with AsyncOutlineClient.create(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         await client.get_server_info()

    """

    def __init__(
        self,
        api_url: str,
        cert_sha256: str,
        *,
        json_format: bool = False,
        timeout: int = 30,
        retry_attempts: int = 3,
        enable_logging: bool = False,
        user_agent: Optional[str] = None,
        max_connections: int = 10,
        rate_limit_delay: float = 0.0,
    ) -> None:
        """
        Validate the connection parameters and initialise the client state.

        No network connection is opened here; the HTTP session is created
        in ``__aenter__``.

        Raises:
            ValueError: If ``api_url`` is empty/whitespace, or ``cert_sha256``
                is empty, is not purely hexadecimal, or is not 64 characters
                long
        """
        # Validate api_url
        if not api_url or not api_url.strip():
            raise ValueError("api_url cannot be empty or whitespace")

        # Validate cert_sha256
        if not cert_sha256 or not cert_sha256.strip():
            raise ValueError("cert_sha256 cannot be empty or whitespace")

        # Additional validation for cert_sha256 format (should be hex)
        cert_sha256_clean = cert_sha256.strip()
        if not all(c in '0123456789abcdefABCDEF' for c in cert_sha256_clean):
            raise ValueError("cert_sha256 must contain only hexadecimal characters")

        # Check cert_sha256 length (SHA-256 should be 64 hex characters)
        if len(cert_sha256_clean) != 64:
            raise ValueError("cert_sha256 must be exactly 64 hexadecimal characters (SHA-256)")

        # Store the same whitespace-stripped values that were validated above,
        # so a padded fingerprint does not pass validation and then fail when
        # it is hex-decoded in _get_ssl_context().
        self._api_url = api_url.strip().rstrip("/")
        self._cert_sha256 = cert_sha256_clean
        self._json_format = json_format
        self._timeout = aiohttp.ClientTimeout(total=timeout)
        self._ssl_context: Optional[Fingerprint] = None
        self._session: Optional[aiohttp.ClientSession] = None
        self._retry_attempts = retry_attempts
        self._enable_logging = enable_logging
        self._user_agent = user_agent or "PyOutlineAPI/0.3.0"
        self._max_connections = max_connections
        self._rate_limit_delay = rate_limit_delay
        self._last_request_time: float = 0.0

        # Health check state
        self._last_health_check: float = 0.0
        self._health_check_interval: float = 300.0  # 5 minutes
        self._is_healthy: bool = True

        if enable_logging:
            self._setup_logging()

    @staticmethod
    def _setup_logging() -> None:
        """Setup logging configuration if not already configured."""
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.DEBUG)

    @classmethod
    @asynccontextmanager
    async def create(
        cls,
        api_url: str,
        cert_sha256: str,
        **kwargs
    ) -> AsyncGenerator[AsyncOutlineClient, None]:
        """
        Factory method that returns an async context manager.

        This is the recommended way to create clients for one-off operations.

        Args:
            api_url: Base URL for the Outline server API
            cert_sha256: SHA-256 fingerprint of the server's TLS certificate
            **kwargs: Forwarded unchanged to the constructor

        Yields:
            A client whose session is open for the duration of the block
        """
        client = cls(api_url, cert_sha256, **kwargs)
        async with client:
            yield client

    async def __aenter__(self) -> AsyncOutlineClient:
        """Set up client session."""
        headers = {"User-Agent": self._user_agent}

        connector = aiohttp.TCPConnector(
            ssl=self._get_ssl_context(),
            limit=self._max_connections,
            limit_per_host=self._max_connections // 2,
            enable_cleanup_closed=True,
        )

        self._session = aiohttp.ClientSession(
            timeout=self._timeout,
            raise_for_status=False,
            connector=connector,
            headers=headers,
        )

        if self._enable_logging:
            # Log the sanitized URL only: the path of the full API URL is the
            # server's access secret.
            logger.info("Initialized OutlineAPI client for %s", self.api_url)

        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Clean up client session."""
        if self._session:
            await self._session.close()
            self._session = None

            if self._enable_logging:
                logger.info("OutlineAPI client session closed")

    async def _apply_rate_limiting(self) -> None:
        """Apply rate limiting if configured."""
        if self._rate_limit_delay <= 0:
            return

        time_since_last = time.time() - self._last_request_time
        if time_since_last < self._rate_limit_delay:
            delay = self._rate_limit_delay - time_since_last
            await asyncio.sleep(delay)

        self._last_request_time = time.time()

    async def health_check(self, force: bool = False) -> bool:
        """
        Perform a health check on the Outline server.

        The result is cached: unless ``force`` is set, a call made within the
        health-check interval of the previous one returns the stored result
        without contacting the server.

        Args:
            force: Force health check even if recently performed

        Returns:
            True if server is healthy
        """
        current_time = time.time()

        if not force and (current_time - self._last_health_check) < self._health_check_interval:
            return self._is_healthy

        try:
            await self.get_server_info()
            self._is_healthy = True
            if self._enable_logging:
                logger.info("Health check passed")

            return self._is_healthy
        except Exception as e:
            # Any failure of the probe request counts as "unhealthy".
            self._is_healthy = False
            if self._enable_logging:
                logger.warning("Health check failed: %s", e)

            return self._is_healthy
        finally:
            self._last_health_check = current_time

    @overload
    async def _parse_response(
        self,
        response: ClientResponse,
        model: type[BaseModel],
        json_format: Literal[True],
    ) -> JsonDict:
        ...

    @overload
    async def _parse_response(
        self,
        response: ClientResponse,
        model: type[BaseModel],
        json_format: Literal[False],
    ) -> BaseModel:
        ...

    @overload
    async def _parse_response(
        self, response: ClientResponse, model: type[BaseModel], json_format: bool
    ) -> ResponseType:
        ...

    @ensure_context
    async def _parse_response(
        self,
        response: ClientResponse,
        model: type[BaseModel],
        json_format: bool = False,
    ) -> ResponseType:
        """
        Parse and validate API response data.

        Args:
            response: API response to parse
            model: Pydantic model for validation
            json_format: Whether to return raw JSON

        Returns:
            Validated response data

        Raises:
            ValueError: If response validation fails
        """
        try:
            data = await response.json()
            validated = model.model_validate(data)
            return validated.model_dump(by_alias=True) if json_format else validated
        except aiohttp.ContentTypeError as content_error:
            raise ValueError("Invalid response format") from content_error
        except Exception as exception:
            raise ValueError(f"Validation error: {exception}") from exception

    @staticmethod
    async def _handle_error_response(response: ClientResponse) -> None:
        """
        Handle error responses from the API.

        Always raises. If the body parses as an ``ErrorResponse`` its code and
        message are used; otherwise the HTTP status and reason are used.

        Raises:
            APIError: Always
        """
        try:
            error_data = await response.json()
            error = ErrorResponse.model_validate(error_data)
        except (ValueError, aiohttp.ContentTypeError) as parse_error:
            raise APIError(
                f"HTTP {response.status}: {response.reason}", response.status
            ) from parse_error

        # Raised outside the ``try`` so the handler above can only ever see
        # parsing failures, never this error.
        raise APIError(f"{error.code}: {error.message}", response.status)

    @ensure_context
    async def _request(
        self,
        method: str,
        endpoint: str,
        *,
        json: Any = None,
        params: Optional[JsonDict] = None,
    ) -> Any:
        """
        Make an API request.

        Applies rate limiting, builds the full URL for ``endpoint`` and
        delegates to ``_make_request``.
        """
        await self._apply_rate_limiting()
        url = self._build_url(endpoint)
        return await self._make_request(method, url, json, params)

    def _redact_url(self, url: str) -> str:
        """Return ``url`` without its query string and without the secret API path."""
        base = url.split("?", 1)[0]
        if base.startswith(self._api_url):
            return f"{self.api_url}{base[len(self._api_url):]}"
        return base

    async def _make_request(
        self,
        method: str,
        url: str,
        json: Any = None,
        params: Optional[JsonDict] = None,
    ) -> Any:
        """
        Internal method to execute the actual request with retry logic.

        Returns:
            ``True`` for a 204 response, otherwise the response object

        Raises:
            APIError: On an HTTP status >= 400, or if the body cannot be read
        """

        async def _do_request() -> Any:
            if self._enable_logging:
                # Don't log sensitive data
                logger.debug("Making %s request to %s", method, self._redact_url(url))

            async with self._session.request(
                method,
                url,
                json=json,
                params=params,
                raise_for_status=False,
            ) as response:
                if self._enable_logging:
                    logger.debug("Response: %s %s", response.status, response.reason)

                if response.status >= 400:
                    await self._handle_error_response(response)

                if response.status == 204:
                    return True

                try:
                    # See #b1746e6
                    # NOTE(review): presumably the body is read here so it is
                    # available after the ``async with`` exits - confirm.
                    await response.json()
                    return response
                except Exception as exception:
                    raise APIError(
                        f"Failed to process response: {exception}", response.status
                    ) from exception

        return await self._retry_request(_do_request, attempts=self._retry_attempts)

    @staticmethod
    async def _retry_request(
        request_func: Callable[[], Awaitable[T]],
        *,
        attempts: int = DEFAULT_RETRY_ATTEMPTS,
        delay: float = DEFAULT_RETRY_DELAY,
    ) -> T:
        """
        Execute request with retry logic.

        The pause before retry number ``k`` is ``delay * k`` seconds.

        Args:
            request_func: Async function to execute
            attempts: Maximum number of attempts (values below 1 are treated as 1)
            delay: Base delay between retries in seconds

        Returns:
            Response from the successful request

        Raises:
            APIError: If all retry attempts fail, or immediately if the error
                status is not in ``RETRY_STATUS_CODES``
        """
        # Always try at least once; otherwise a non-positive value would fail
        # without ever sending the request.
        attempts = max(1, attempts)
        last_error = None

        for attempt in range(attempts):
            try:
                return await request_func()
            except (aiohttp.ClientError, APIError) as error:
                last_error = error

                # Don't retry if it's not a retriable error
                if isinstance(error, APIError) and (
                    error.status_code not in RETRY_STATUS_CODES
                ):
                    raise

                # Don't sleep on the last attempt
                if attempt < attempts - 1:
                    await asyncio.sleep(delay * (attempt + 1))

        raise APIError(
            f"Request failed after {attempts} attempts: {last_error}",
            getattr(last_error, "status_code", None),
        ) from last_error

    def _build_url(self, endpoint: str) -> str:
        """
        Build and validate the full URL for the API request.

        Raises:
            ValueError: If ``endpoint`` is not a string or the resulting URL
                has no scheme or network location
        """
        if not isinstance(endpoint, str):
            raise ValueError("Endpoint must be a string")

        url = f"{self._api_url}/{endpoint.lstrip('/')}"
        parsed_url = urlparse(url)

        if not all([parsed_url.scheme, parsed_url.netloc]):
            raise ValueError(f"Invalid URL: {url}")

        return url

    def _get_ssl_context(self) -> Optional[Fingerprint]:
        """
        Create an SSL context if a certificate fingerprint is provided.

        Raises:
            ValueError: If the stored fingerprint cannot be hex-decoded
            OutlineError: If the fingerprint object cannot be created
        """
        if not self._cert_sha256:
            return None

        try:
            return Fingerprint(binascii.unhexlify(self._cert_sha256))
        except binascii.Error as validation_error:
            raise ValueError(
                f"Invalid certificate SHA256: {self._cert_sha256}"
            ) from validation_error
        except Exception as exception:
            raise OutlineError("Failed to create SSL context") from exception

    # Server Management Methods

    @log_method_call
    async def get_server_info(self) -> Union[JsonDict, Server]:
        """
        Get server information.

        Returns:
            Server information including name, ID, and configuration.

        Examples:
            >>> async def main():
            ...     async with AsyncOutlineClient(
            ...         "https://example.com:1234/secret",
            ...         "ab12cd34..."
            ...     ) as client:
            ...         server = await client.get_server_info()
            ...         print(f"Server {server.name} running version {server.version}")
        """
        response = await self._request("GET", "server")
        return await self._parse_response(
            response, Server, json_format=self._json_format
        )

    @log_method_call
    async def rename_server(self, name: str) -> bool:
        """
        Rename the server.

        Args:
            name: New server name

        Returns:
            True if successful

        Examples:
            >>> async def main():
            ...     async with AsyncOutlineClient(
            ...         "https://example.com:1234/secret",
            ...         "ab12cd34..."
            ...     ) as client:
            ...         success = await client.rename_server("My VPN Server")
            ...         if success:
            ...             print("Server renamed successfully")
        """
        request = ServerNameRequest(name=name)
        return await self._request(
            "PUT", "name", json=request.model_dump(by_alias=True)
        )

    @log_method_call
    async def set_hostname(self, hostname: str) -> bool:
        """
        Set server hostname for access keys.

        Args:
            hostname: New hostname or IP address

        Returns:
            True if successful

        Raises:
            APIError: If hostname is invalid

        Examples:
            >>> async def main():
            ...     async with AsyncOutlineClient(
            ...         "https://example.com:1234/secret",
            ...         "ab12cd34..."
            ...     ) as client:
            ...         await client.set_hostname("vpn.example.com")
            ...         # Or use IP address
            ...         await client.set_hostname("203.0.113.1")
        """
        request = HostnameRequest(hostname=hostname)
        return await self._request(
            "PUT",
            "server/hostname-for-access-keys",
            json=request.model_dump(by_alias=True),
        )

    @log_method_call
    async def set_default_port(self, port: int) -> bool:
        """
        Set default port for new access keys.

        Args:
            port: Port number (1025-65535)

        Returns:
            True if successful

        Raises:
            ValueError: If port is outside the ``MIN_PORT``-``MAX_PORT`` range
            APIError: If port is invalid or in use

        Examples:
            >>> async def main():
            ...     async with AsyncOutlineClient(
            ...         "https://example.com:1234/secret",
            ...         "ab12cd34..."
            ...     ) as client:
            ...         await client.set_default_port(8388)
        """
        if port < MIN_PORT or port > MAX_PORT:
            raise ValueError(
                f"Privileged ports are not allowed. Use range: {MIN_PORT}-{MAX_PORT}"
            )

        request = PortRequest(port=port)
        return await self._request(
            "PUT",
            "server/port-for-new-access-keys",
            json=request.model_dump(by_alias=True),
        )

    # Metrics Methods

    @log_method_call
    async def get_metrics_status(self) -> Union[JsonDict, MetricsStatusResponse]:
        """
        Get whether metrics collection is enabled.

        Returns:
            Current metrics collection status

        Examples:
            >>> async def main():
            ...     async with AsyncOutlineClient(
            ...         "https://example.com:1234/secret",
            ...         "ab12cd34..."
            ...     ) as client:
            ...         status = await client.get_metrics_status()
            ...         if status.metrics_enabled:
            ...             print("Metrics collection is enabled")
        """
        response = await self._request("GET", "metrics/enabled")
        return await self._parse_response(
            response, MetricsStatusResponse, json_format=self._json_format
        )

    @log_method_call
    async def set_metrics_status(self, enabled: bool) -> bool:
        """
        Enable or disable metrics collection.

        Args:
            enabled: Whether to enable metrics

        Returns:
            True if successful

        Examples:
            >>> async def main():
            ...     async with AsyncOutlineClient(
            ...         "https://example.com:1234/secret",
            ...         "ab12cd34..."
            ...     ) as client:
            ...         # Enable metrics
            ...         await client.set_metrics_status(True)
            ...         # Check new status
            ...         status = await client.get_metrics_status()
        """
        request = MetricsEnabledRequest(metricsEnabled=enabled)
        return await self._request(
            "PUT", "metrics/enabled", json=request.model_dump(by_alias=True)
        )

    @log_method_call
    async def get_transfer_metrics(self) -> Union[JsonDict, ServerMetrics]:
        """
        Get transfer metrics for all access keys.

        Returns:
            Transfer metrics data for each access key

        Examples:
            >>> async def main():
            ...     async with AsyncOutlineClient(
            ...         "https://example.com:1234/secret",
            ...         "ab12cd34..."
            ...     ) as client:
            ...         metrics = await client.get_transfer_metrics()
            ...         for user_id, bytes_transferred in metrics.bytes_transferred_by_user_id.items():
            ...             print(f"User {user_id}: {bytes_transferred / 1024**3:.2f} GB")
        """
        response = await self._request("GET", "metrics/transfer")
        return await self._parse_response(
            response, ServerMetrics, json_format=self._json_format
        )

    @log_method_call
    async def get_experimental_metrics(
        self, since: str
    ) -> Union[JsonDict, ExperimentalMetrics]:
        """
        Get experimental server metrics.

        Args:
            since: Required time range filter (e.g., "24h", "7d", "30d", or ISO timestamp)

        Returns:
            Detailed server and access key metrics

        Raises:
            ValueError: If ``since`` is empty or whitespace

        Examples:
            >>> async def main():
            ...     async with AsyncOutlineClient(
            ...         "https://example.com:1234/secret",
            ...         "ab12cd34..."
            ...     ) as client:
            ...         # Get metrics for the last 24 hours
            ...         metrics = await client.get_experimental_metrics("24h")
            ...         print(f"Server tunnel time: {metrics.server.tunnel_time.seconds}s")
            ...         print(f"Server data transferred: {metrics.server.data_transferred.bytes} bytes")
            ...
            ...         # Get metrics for the last 7 days
            ...         metrics = await client.get_experimental_metrics("7d")
            ...
            ...         # Get metrics since specific timestamp
            ...         metrics = await client.get_experimental_metrics("2024-01-01T00:00:00Z")
        """
        if not since or not since.strip():
            raise ValueError("Parameter 'since' is required and cannot be empty")

        params = {"since": since}
        response = await self._request(
            "GET", "experimental/server/metrics", params=params
        )
        return await self._parse_response(
            response, ExperimentalMetrics, json_format=self._json_format
        )

    # Access Key Management Methods

    @log_method_call
    async def create_access_key(
        self,
        *,
        name: Optional[str] = None,
        password: Optional[str] = None,
        port: Optional[int] = None,
        method: Optional[str] = None,
        limit: Optional[DataLimit] = None,
    ) -> Union[JsonDict, AccessKey]:
        """
        Create a new access key.

        Args:
            name: Optional key name
            password: Optional password
            port: Optional port number (1-65535)
            method: Optional encryption method
            limit: Optional data transfer limit

        Returns:
            New access key details

        Examples:
            >>> async def main():
            ...     async with AsyncOutlineClient(
            ...         "https://example.com:1234/secret",
            ...         "ab12cd34..."
            ...     ) as client:
            ...         # Create basic key
            ...         key = await client.create_access_key(name="User 1")
            ...
            ...         # Create key with data limit
            ...         lim = DataLimit(bytes=5 * 1024**3)  # 5 GB
            ...         key = await client.create_access_key(
            ...             name="Limited User",
            ...             port=8388,
            ...             limit=lim
            ...         )
            ...         print(f"Created key: {key.access_url}")
        """
        request = AccessKeyCreateRequest(
            name=name, password=password, port=port, method=method, limit=limit
        )
        response = await self._request(
            "POST",
            "access-keys",
            json=request.model_dump(exclude_none=True, by_alias=True),
        )
        return await self._parse_response(
            response, AccessKey, json_format=self._json_format
        )

    @log_method_call
    async def create_access_key_with_id(
        self,
        key_id: str,
        *,
        name: Optional[str] = None,
        password: Optional[str] = None,
        port: Optional[int] = None,
        method: Optional[str] = None,
        limit: Optional[DataLimit] = None,
    ) -> Union[JsonDict, AccessKey]:
        """
        Create a new access key with specific ID.

        Args:
            key_id: Specific ID for the access key
            name: Optional key name
            password: Optional password
            port: Optional port number (1-65535)
            method: Optional encryption method
            limit: Optional data transfer limit

        Returns:
            New access key details

        Examples:
            >>> async def main():
            ...     async with AsyncOutlineClient(
            ...         "https://example.com:1234/secret",
            ...         "ab12cd34..."
            ...     ) as client:
            ...         key = await client.create_access_key_with_id(
            ...             "my-custom-id",
            ...             name="Custom Key"
            ...         )
        """
        request = AccessKeyCreateRequest(
            name=name, password=password, port=port, method=method, limit=limit
        )
        response = await self._request(
            "PUT",
            f"access-keys/{key_id}",
            json=request.model_dump(exclude_none=True, by_alias=True),
        )
        return await self._parse_response(
            response, AccessKey, json_format=self._json_format
        )

    @log_method_call
    async def get_access_keys(self) -> Union[JsonDict, AccessKeyList]:
        """
        Get all access keys.

        Returns:
            List of all access keys

        Examples:
            >>> async def main():
            ...     async with AsyncOutlineClient(
            ...         "https://example.com:1234/secret",
            ...         "ab12cd34..."
            ...     ) as client:
            ...         keys = await client.get_access_keys()
            ...         for key in keys.access_keys:
            ...             print(f"Key {key.id}: {key.name or 'unnamed'}")
            ...             if key.data_limit:
            ...                 print(f"  Limit: {key.data_limit.bytes / 1024**3:.1f} GB")
        """
        response = await self._request("GET", "access-keys")
        return await self._parse_response(
            response, AccessKeyList, json_format=self._json_format
        )

    @log_method_call
    async def get_access_key(self, key_id: str) -> Union[JsonDict, AccessKey]:
        """
        Get specific access key.

        Args:
            key_id: Access key ID

        Returns:
            Access key details

        Raises:
            APIError: If key doesn't exist

        Examples:
            >>> async def main():
            ...     async with AsyncOutlineClient(
            ...         "https://example.com:1234/secret",
            ...         "ab12cd34..."
            ...     ) as client:
            ...         key = await client.get_access_key("1")
            ...         print(f"Port: {key.port}")
            ...         print(f"URL: {key.access_url}")
        """
        response = await self._request("GET", f"access-keys/{key_id}")
        return await self._parse_response(
            response, AccessKey, json_format=self._json_format
        )

    @log_method_call
    async def rename_access_key(self, key_id: str, name: str) -> bool:
        """
        Rename access key.

        Args:
            key_id: Access key ID
            name: New name

        Returns:
            True if successful

        Raises:
            APIError: If key doesn't exist

        Examples:
            >>> async def main():
            ...     async with AsyncOutlineClient(
            ...         "https://example.com:1234/secret",
            ...         "ab12cd34..."
            ...     ) as client:
            ...         # Rename key
            ...         await client.rename_access_key("1", "Alice")
            ...
            ...         # Verify new name
            ...         key = await client.get_access_key("1")
            ...         assert key.name == "Alice"
        """
        request = AccessKeyNameRequest(name=name)
        return await self._request(
            "PUT", f"access-keys/{key_id}/name", json=request.model_dump(by_alias=True)
        )

    @log_method_call
    async def delete_access_key(self, key_id: str) -> bool:
        """
        Delete access key.

        Args:
            key_id: Access key ID

        Returns:
            True if successful

        Raises:
            APIError: If key doesn't exist

        Examples:
            >>> async def main():
            ...     async with AsyncOutlineClient(
            ...         "https://example.com:1234/secret",
            ...         "ab12cd34..."
            ...     ) as client:
            ...         if await client.delete_access_key("1"):
            ...             print("Key deleted")
        """
        return await self._request("DELETE", f"access-keys/{key_id}")

    @log_method_call
    async def set_access_key_data_limit(self, key_id: str, bytes_limit: int) -> bool:
        """
        Set data transfer limit for access key.

        Args:
            key_id: Access key ID
            bytes_limit: Limit in bytes (must be non-negative)

        Returns:
            True if successful

        Raises:
            APIError: If key doesn't exist or limit is invalid

        Examples:
            >>> async def main():
            ...     async with AsyncOutlineClient(
            ...         "https://example.com:1234/secret",
            ...         "ab12cd34..."
            ...     ) as client:
            ...         # Set 5 GB limit
            ...         limit = 5 * 1024**3  # 5 GB in bytes
            ...         await client.set_access_key_data_limit("1", limit)
            ...
            ...         # Verify limit
            ...         key = await client.get_access_key("1")
            ...         assert key.data_limit and key.data_limit.bytes == limit
        """
        request = DataLimitRequest(limit=DataLimit(bytes=bytes_limit))
        return await self._request(
            "PUT",
            f"access-keys/{key_id}/data-limit",
            json=request.model_dump(by_alias=True),
        )

    @log_method_call
    async def remove_access_key_data_limit(self, key_id: str) -> bool:
        """
        Remove data transfer limit from access key.

        Args:
            key_id: Access key ID

        Returns:
            True if successful

        Raises:
            APIError: If key doesn't exist

        Examples:
            >>> async def main():
            ...     async with AsyncOutlineClient(
            ...         "https://example.com:1234/secret",
            ...         "ab12cd34..."
            ...     ) as client:
            ...         await client.remove_access_key_data_limit("1")
        """
        return await self._request("DELETE", f"access-keys/{key_id}/data-limit")

    # Global Data Limit Methods

    @log_method_call
    async def set_global_data_limit(self, bytes_limit: int) -> bool:
        """
        Set global data transfer limit for all access keys.

        Args:
            bytes_limit: Limit in bytes (must be non-negative)

        Returns:
            True if successful

        Examples:
            >>> async def main():
            ...     async with AsyncOutlineClient(
            ...         "https://example.com:1234/secret",
            ...         "ab12cd34..."
            ...     ) as client:
            ...         # Set 100 GB global limit
            ...         await client.set_global_data_limit(100 * 1024**3)
        """
        request = DataLimitRequest(limit=DataLimit(bytes=bytes_limit))
        return await self._request(
            "PUT",
            "server/access-key-data-limit",
            json=request.model_dump(by_alias=True),
        )

    @log_method_call
    async def remove_global_data_limit(self) -> bool:
        """
        Remove global data transfer limit.

        Returns:
            True if successful

        Examples:
            >>> async def main():
            ...     async with AsyncOutlineClient(
            ...         "https://example.com:1234/secret",
            ...         "ab12cd34..."
            ...     ) as client:
            ...         await client.remove_global_data_limit()
        """
        return await self._request("DELETE", "server/access-key-data-limit")

    # Batch Operations

    async def batch_create_access_keys(
        self,
        keys_config: list[dict[str, Any]],
        fail_fast: bool = True
    ) -> list[Union[AccessKey, Exception]]:
        """
        Create multiple access keys in batch.

        Keys are created one after another, in the order given.

        Args:
            keys_config: List of key configurations (same as create_access_key kwargs)
            fail_fast: If True, stop on first error. If False, continue and return errors.

        Returns:
            List of created keys or exceptions

        Examples:
            >>> async def main():
            ...     async with AsyncOutlineClient(
            ...         "https://example.com:1234/secret",
            ...         "ab12cd34..."
            ...     ) as client:
            ...         configs = [
            ...             {"name": "User1", "limit": DataLimit(bytes=1024**3)},
            ...             {"name": "User2", "port": 8388},
            ...         ]
            ...         res = await client.batch_create_access_keys(configs)
        """
        results = []

        for config in keys_config:
            try:
                key = await self.create_access_key(**config)
                results.append(key)
            except Exception as e:
                if fail_fast:
                    raise
                # Keep the exception in the slot where the key would have been.
                results.append(e)

        return results

    @staticmethod
    def _to_plain(value: Any) -> Any:
        """Return ``value.model_dump()`` for Pydantic models, ``value`` itself otherwise."""
        return value.model_dump() if isinstance(value, BaseModel) else value

    async def get_server_summary(self, metrics_since: str = "24h") -> dict[str, Any]:
        """
        Get comprehensive server summary including info, metrics, and key count.

        This method does not raise on API failures: a failure of the basic
        queries is reported through the ``healthy`` and ``error`` keys, and
        failures of the metrics queries leave the metric entries as ``None``.

        Args:
            metrics_since: Time range for experimental metrics (default: "24h")

        Returns:
            Dictionary with server info, health status, and statistics
        """
        summary = {}

        try:
            # Get basic server info
            server_info = await self.get_server_info()
            summary["server"] = self._to_plain(server_info)

            # Get access keys count
            keys = await self.get_access_keys()
            key_list = keys.access_keys if isinstance(keys, BaseModel) else keys.get("accessKeys", [])
            summary["access_keys_count"] = len(key_list)

            # Get metrics if available
            try:
                metrics_status = await self.get_metrics_status()
                if isinstance(metrics_status, BaseModel):
                    metrics_enabled = metrics_status.metrics_enabled
                elif isinstance(metrics_status, dict):
                    metrics_enabled = metrics_status.get("metricsEnabled")
                else:
                    metrics_enabled = False

                if metrics_enabled:
                    transfer_metrics = await self.get_transfer_metrics()
                    summary["transfer_metrics"] = self._to_plain(transfer_metrics)

                    # Try to get experimental metrics
                    try:
                        experimental_metrics = await self.get_experimental_metrics(metrics_since)
                        summary["experimental_metrics"] = self._to_plain(experimental_metrics)
                    except Exception:
                        summary["experimental_metrics"] = None
            except Exception:
                summary["transfer_metrics"] = None
                summary["experimental_metrics"] = None

            summary["healthy"] = True

        except Exception as e:
            summary["healthy"] = False
            summary["error"] = str(e)

        return summary

    # Utility and management methods

    def configure_logging(self, level: str = "INFO", format_string: Optional[str] = None) -> None:
        """
        Configure logging for the client.

        Replaces every handler currently attached to the module logger with a
        single stream handler.

        Args:
            level: Logging level (DEBUG, INFO, WARNING, ERROR)
            format_string: Custom format string for log messages
        """
        self._enable_logging = True

        # Clear existing handlers
        logger.handlers.clear()

        handler = logging.StreamHandler()
        if format_string:
            formatter = logging.Formatter(format_string)
        else:
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.setLevel(getattr(logging, level.upper()))

    @property
    def is_healthy(self) -> bool:
        """Check if the last health check passed."""
        return self._is_healthy

    @property
    def session(self) -> Optional[aiohttp.ClientSession]:
        """Access the current client session."""
        return self._session

    @property
    def api_url(self) -> str:
        """Get the API URL (without sensitive parts)."""
        parsed = urlparse(self._api_url)
        return f"{parsed.scheme}://{parsed.netloc}"

    def __repr__(self) -> str:
        """String representation of the client."""
        status = "connected" if self._session and not self._session.closed else "disconnected"
        return f"AsyncOutlineClient(url={self.api_url}, status={status})"
Asynchronous client for the Outline VPN Server API.
Arguments:
- api_url: Base URL for the Outline server API
- cert_sha256: SHA-256 fingerprint of the server's TLS certificate
- json_format: Return raw JSON instead of Pydantic models
- timeout: Request timeout in seconds
- retry_attempts: Number of retry attempts when connecting to the API
- enable_logging: Enable debug logging for API calls
- user_agent: Custom user agent string
- max_connections: Maximum number of connections in the pool
- rate_limit_delay: Minimum delay between requests (seconds)
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34...", ... enable_logging=True ... ) as client: ... server_info = await client.get_server_info() ... print(f"Server: {server_info.name}") ... ... # Or use as context manager factory ... async with AsyncOutlineClient.create( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... await client.get_server_info()
def __init__(
    self,
    api_url: str,
    cert_sha256: str,
    *,
    json_format: bool = False,
    timeout: int = 30,
    retry_attempts: int = 3,
    enable_logging: bool = False,
    user_agent: Optional[str] = None,
    max_connections: int = 10,
    rate_limit_delay: float = 0.0,
) -> None:
    """
    Validate the connection parameters and initialise client state.

    No network activity happens here; the session attribute starts as None.

    Args:
        api_url: Base URL for the Outline server API
        cert_sha256: SHA-256 fingerprint of the server's TLS certificate
            (64 hexadecimal characters; surrounding whitespace is ignored)
        json_format: Return raw JSON instead of Pydantic models
        timeout: Total request timeout in seconds
        retry_attempts: Number of retry attempts
        enable_logging: Enable logging for API calls
        user_agent: Custom user agent string
        max_connections: Maximum number of connections in the pool
        rate_limit_delay: Minimum delay between requests (seconds)

    Raises:
        ValueError: If api_url or cert_sha256 is empty/whitespace, or if
            cert_sha256 is not exactly 64 hexadecimal characters
    """
    # Validate api_url
    if not api_url or not api_url.strip():
        raise ValueError("api_url cannot be empty or whitespace")

    # Validate cert_sha256
    if not cert_sha256 or not cert_sha256.strip():
        raise ValueError("cert_sha256 cannot be empty or whitespace")

    # The fingerprint must be plain hex once surrounding whitespace is removed
    cert_sha256_clean = cert_sha256.strip()
    if not all(c in "0123456789abcdefABCDEF" for c in cert_sha256_clean):
        raise ValueError("cert_sha256 must contain only hexadecimal characters")

    # SHA-256 digests are 32 bytes, i.e. 64 hex characters
    if len(cert_sha256_clean) != 64:
        raise ValueError("cert_sha256 must be exactly 64 hexadecimal characters (SHA-256)")

    # Store the same normalised values that were validated above, so that
    # padded input cannot leak whitespace into URLs or the fingerprint.
    self._api_url = api_url.strip().rstrip("/")
    self._cert_sha256 = cert_sha256_clean
    self._json_format = json_format
    self._timeout = aiohttp.ClientTimeout(total=timeout)
    self._ssl_context: Optional[Fingerprint] = None
    self._session: Optional[aiohttp.ClientSession] = None
    self._retry_attempts = retry_attempts
    self._enable_logging = enable_logging
    self._user_agent = user_agent or "PyOutlineAPI/0.3.0"
    self._max_connections = max_connections
    self._rate_limit_delay = rate_limit_delay
    self._last_request_time: float = 0.0

    # Health check state
    self._last_health_check: float = 0.0
    self._health_check_interval: float = 300.0  # 5 minutes
    self._is_healthy: bool = True

    if enable_logging:
        self._setup_logging()
@classmethod
@asynccontextmanager
async def create(
    cls,
    api_url: str,
    cert_sha256: str,
    **kwargs
) -> AsyncGenerator[AsyncOutlineClient, None]:
    """
    Build a client and expose it through an async context manager.

    The positional arguments and any extra keyword arguments are passed
    straight to the class constructor. The new client is entered with
    ``async with`` for the duration of the caller's block.
    Convenient for one-off operations.
    """
    instance = cls(api_url, cert_sha256, **kwargs)
    async with instance:
        yield instance
Factory method that returns an async context manager.
This is the recommended way to create clients for one-off operations.
async def health_check(self, force: bool = False) -> bool:
    """
    Probe the Outline server and remember whether it responded.

    The cached result is returned when the previous check is younger than
    the configured interval, unless ``force`` is set.

    Args:
        force: Run the probe even if a recent result is cached

    Returns:
        True if the server is considered healthy
    """
    now = time.time()
    elapsed = now - self._last_health_check

    # Reuse the cached verdict while it is still fresh.
    if not (force or elapsed >= self._health_check_interval):
        return self._is_healthy

    try:
        await self.get_server_info()
        self._is_healthy = True
        if self._enable_logging:
            logger.info("Health check passed")
    except Exception as exc:
        self._is_healthy = False
        if self._enable_logging:
            logger.warning(f"Health check failed: {exc}")
    finally:
        # The timestamp is refreshed whether the probe succeeded or not.
        self._last_health_check = now

    return self._is_healthy
Perform a health check on the Outline server.
Arguments:
- force: Force health check even if recently performed
Returns:
True if server is healthy
@log_method_call
async def get_server_info(self) -> Union[JsonDict, Server]:
    """
    Fetch the server's information record.

    Returns:
        The parsed ``server`` endpoint response: a ``Server`` model, or
        raw JSON when the client was created with ``json_format``.

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         server = await client.get_server_info()
        ...         print(f"Server {server.name} running version {server.version}")
    """
    raw = await self._request("GET", "server")
    parsed = await self._parse_response(raw, Server, json_format=self._json_format)
    return parsed
Get server information.
Returns:
Server information including name, ID, and configuration.
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... server = await client.get_server_info() ... print(f"Server {server.name} running version {server.version}")
@log_method_call
async def rename_server(self, name: str) -> bool:
    """
    Give the server a new name.

    Args:
        name: New server name

    Returns:
        True if successful

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         if await client.rename_server("My VPN Server"):
        ...             print("Server renamed successfully")
    """
    payload = ServerNameRequest(name=name).model_dump(by_alias=True)
    return await self._request("PUT", "name", json=payload)
Rename the server.
Arguments:
- name: New server name
Returns:
True if successful
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... success = await client.rename_server("My VPN Server") ... if success: ... print("Server renamed successfully")
@log_method_call
async def set_hostname(self, hostname: str) -> bool:
    """
    Change the hostname that is embedded in access keys.

    Args:
        hostname: New hostname or IP address

    Returns:
        True if successful

    Raises:
        APIError: If hostname is invalid

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         await client.set_hostname("vpn.example.com")
        ...         # An IP address works as well
        ...         await client.set_hostname("203.0.113.1")
    """
    payload = HostnameRequest(hostname=hostname).model_dump(by_alias=True)
    return await self._request(
        "PUT", "server/hostname-for-access-keys", json=payload
    )
Set server hostname for access keys.
Arguments:
- hostname: New hostname or IP address
Returns:
True if successful
Raises:
- APIError: If hostname is invalid
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... await client.set_hostname("vpn.example.com") ... # Or use IP address ... await client.set_hostname("203.0.113.1")
@log_method_call
async def set_default_port(self, port: int) -> bool:
    """
    Choose the port that newly created access keys will use.

    Args:
        port: Port number; must lie within MIN_PORT..MAX_PORT

    Returns:
        True if successful

    Raises:
        ValueError: If port is outside the MIN_PORT..MAX_PORT range
        APIError: If port is invalid or in use

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         await client.set_default_port(8388)
    """
    # Reject out-of-range ports locally, before any request is made.
    out_of_range = port > MAX_PORT or port < MIN_PORT
    if out_of_range:
        raise ValueError(
            f"Privileged ports are not allowed. Use range: {MIN_PORT}-{MAX_PORT}"
        )

    payload = PortRequest(port=port).model_dump(by_alias=True)
    return await self._request(
        "PUT", "server/port-for-new-access-keys", json=payload
    )
Set default port for new access keys.
Arguments:
- port: Port number (1025-65535)
Returns:
True if successful
Raises:
- APIError: If port is invalid or in use
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... await client.set_default_port(8388)
@log_method_call
async def get_metrics_status(self) -> Union[JsonDict, MetricsStatusResponse]:
    """
    Report whether metrics collection is switched on.

    Returns:
        The parsed ``metrics/enabled`` response: a ``MetricsStatusResponse``
        model, or raw JSON when the client was created with ``json_format``.

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         status = await client.get_metrics_status()
        ...         if status.metrics_enabled:
        ...             print("Metrics collection is enabled")
    """
    raw = await self._request("GET", "metrics/enabled")
    parsed = await self._parse_response(
        raw, MetricsStatusResponse, json_format=self._json_format
    )
    return parsed
Get whether metrics collection is enabled.
Returns:
Current metrics collection status
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... status = await client.get_metrics_status() ... if status.metrics_enabled: ... print("Metrics collection is enabled")
@log_method_call
async def set_metrics_status(self, enabled: bool) -> bool:
    """
    Turn metrics collection on or off.

    Args:
        enabled: Whether to enable metrics

    Returns:
        True if successful

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         await client.set_metrics_status(True)
        ...         status = await client.get_metrics_status()
    """
    payload = MetricsEnabledRequest(metricsEnabled=enabled).model_dump(by_alias=True)
    return await self._request("PUT", "metrics/enabled", json=payload)
Enable or disable metrics collection.
Arguments:
- enabled: Whether to enable metrics
Returns:
True if successful
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... # Enable metrics ... await client.set_metrics_status(True) ... # Check new status ... status = await client.get_metrics_status()
@log_method_call
async def get_transfer_metrics(self) -> Union[JsonDict, ServerMetrics]:
    """
    Fetch data-transfer metrics for all access keys.

    Returns:
        The parsed ``metrics/transfer`` response: a ``ServerMetrics``
        model, or raw JSON when the client was created with ``json_format``.

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         metrics = await client.get_transfer_metrics()
        ...         for user_id, bytes_transferred in metrics.bytes_transferred_by_user_id.items():
        ...             print(f"User {user_id}: {bytes_transferred / 1024**3:.2f} GB")
    """
    raw = await self._request("GET", "metrics/transfer")
    parsed = await self._parse_response(
        raw, ServerMetrics, json_format=self._json_format
    )
    return parsed
Get transfer metrics for all access keys.
Returns:
Transfer metrics data for each access key
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... metrics = await client.get_transfer_metrics() ... for user_id, bytes_transferred in metrics.bytes_transferred_by_user_id.items(): ... print(f"User {user_id}: {bytes_transferred / 1024**3:.2f} GB")
@log_method_call
async def get_experimental_metrics(
    self, since: str
) -> Union[JsonDict, ExperimentalMetrics]:
    """
    Fetch the experimental server metrics for a time range.

    Args:
        since: Required time range filter (e.g., "24h", "7d", "30d", or ISO timestamp)

    Returns:
        The parsed ``experimental/server/metrics`` response: an
        ``ExperimentalMetrics`` model, or raw JSON when the client was
        created with ``json_format``.

    Raises:
        ValueError: If ``since`` is empty or only whitespace

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         metrics = await client.get_experimental_metrics("24h")
        ...         print(f"Server tunnel time: {metrics.server.tunnel_time.seconds}s")
        ...         # A specific start timestamp is accepted too
        ...         metrics = await client.get_experimental_metrics("2024-01-01T00:00:00Z")
    """
    # The value is sent unmodified; only blank input is rejected here.
    if not (since and since.strip()):
        raise ValueError("Parameter 'since' is required and cannot be empty")

    raw = await self._request(
        "GET", "experimental/server/metrics", params={"since": since}
    )
    return await self._parse_response(
        raw, ExperimentalMetrics, json_format=self._json_format
    )
Get experimental server metrics.
Arguments:
- since: Required time range filter (e.g., "24h", "7d", "30d", or ISO timestamp)
Returns:
Detailed server and access key metrics
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... # Get metrics for the last 24 hours ... metrics = await client.get_experimental_metrics("24h") ... print(f"Server tunnel time: {metrics.server.tunnel_time.seconds}s") ... print(f"Server data transferred: {metrics.server.data_transferred.bytes} bytes") ... ... # Get metrics for the last 7 days ... metrics = await client.get_experimental_metrics("7d") ... ... # Get metrics since specific timestamp ... metrics = await client.get_experimental_metrics("2024-01-01T00:00:00Z")
@log_method_call
async def create_access_key(
    self,
    *,
    name: Optional[str] = None,
    password: Optional[str] = None,
    port: Optional[int] = None,
    method: Optional[str] = None,
    limit: Optional[DataLimit] = None,
) -> Union[JsonDict, AccessKey]:
    """
    Create an access key; every setting is optional.

    Settings left as None are omitted from the request body.

    Args:
        name: Optional key name
        password: Optional password
        port: Optional port number (1-65535)
        method: Optional encryption method
        limit: Optional data transfer limit

    Returns:
        New access key details

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         key = await client.create_access_key(name="User 1")
        ...         lim = DataLimit(bytes=5 * 1024**3)  # 5 GB
        ...         key = await client.create_access_key(
        ...             name="Limited User", port=8388, limit=lim
        ...         )
        ...         print(f"Created key: {key.access_url}")
    """
    spec = AccessKeyCreateRequest(
        name=name, password=password, port=port, method=method, limit=limit
    )
    payload = spec.model_dump(exclude_none=True, by_alias=True)
    raw = await self._request("POST", "access-keys", json=payload)
    return await self._parse_response(raw, AccessKey, json_format=self._json_format)
Create a new access key.
Arguments:
- name: Optional key name
- password: Optional password
- port: Optional port number (1-65535)
- method: Optional encryption method
- limit: Optional data transfer limit
Returns:
New access key details
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... # Create basic key ... key = await client.create_access_key(name="User 1") ... ... # Create key with data limit ... lim = DataLimit(bytes=5 * 1024**3) # 5 GB ... key = await client.create_access_key( ... name="Limited User", ... port=8388, ... limit=lim ... ) ... print(f"Created key: {key.access_url}")
@log_method_call
async def create_access_key_with_id(
    self,
    key_id: str,
    *,
    name: Optional[str] = None,
    password: Optional[str] = None,
    port: Optional[int] = None,
    method: Optional[str] = None,
    limit: Optional[DataLimit] = None,
) -> Union[JsonDict, AccessKey]:
    """
    Create an access key under a caller-chosen identifier.

    Settings left as None are omitted from the request body.

    Args:
        key_id: Specific ID for the access key
        name: Optional key name
        password: Optional password
        port: Optional port number (1-65535)
        method: Optional encryption method
        limit: Optional data transfer limit

    Returns:
        New access key details

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         key = await client.create_access_key_with_id(
        ...             "my-custom-id", name="Custom Key"
        ...         )
    """
    spec = AccessKeyCreateRequest(
        name=name, password=password, port=port, method=method, limit=limit
    )
    payload = spec.model_dump(exclude_none=True, by_alias=True)
    raw = await self._request("PUT", f"access-keys/{key_id}", json=payload)
    return await self._parse_response(raw, AccessKey, json_format=self._json_format)
Create a new access key with specific ID.
Arguments:
- key_id: Specific ID for the access key
- name: Optional key name
- password: Optional password
- port: Optional port number (1-65535)
- method: Optional encryption method
- limit: Optional data transfer limit
Returns:
New access key details
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... key = await client.create_access_key_with_id( ... "my-custom-id", ... name="Custom Key" ... )
@log_method_call
async def get_access_keys(self) -> Union[JsonDict, AccessKeyList]:
    """
    List every access key on the server.

    Returns:
        The parsed ``access-keys`` response: an ``AccessKeyList`` model,
        or raw JSON when the client was created with ``json_format``.

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         keys = await client.get_access_keys()
        ...         for key in keys.access_keys:
        ...             print(f"Key {key.id}: {key.name or 'unnamed'}")
    """
    raw = await self._request("GET", "access-keys")
    parsed = await self._parse_response(
        raw, AccessKeyList, json_format=self._json_format
    )
    return parsed
Get all access keys.
Returns:
List of all access keys
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... keys = await client.get_access_keys() ... for key in keys.access_keys: ... print(f"Key {key.id}: {key.name or 'unnamed'}") ... if key.data_limit: ... print(f" Limit: {key.data_limit.bytes / 1024**3:.1f} GB")
@log_method_call
async def get_access_key(self, key_id: str) -> Union[JsonDict, AccessKey]:
    """
    Look up a single access key by its ID.

    Args:
        key_id: Access key ID

    Returns:
        Access key details

    Raises:
        APIError: If key doesn't exist

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         key = await client.get_access_key("1")
        ...         print(f"Port: {key.port}")
        ...         print(f"URL: {key.access_url}")
    """
    endpoint = f"access-keys/{key_id}"
    raw = await self._request("GET", endpoint)
    return await self._parse_response(raw, AccessKey, json_format=self._json_format)
Get specific access key.
Arguments:
- key_id: Access key ID
Returns:
Access key details
Raises:
- APIError: If key doesn't exist
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... key = await client.get_access_key("1") ... print(f"Port: {key.port}") ... print(f"URL: {key.access_url}")
@log_method_call
async def rename_access_key(self, key_id: str, name: str) -> bool:
    """
    Give an existing access key a new name.

    Args:
        key_id: Access key ID
        name: New name

    Returns:
        True if successful

    Raises:
        APIError: If key doesn't exist

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         await client.rename_access_key("1", "Alice")
        ...         key = await client.get_access_key("1")
        ...         assert key.name == "Alice"
    """
    payload = AccessKeyNameRequest(name=name).model_dump(by_alias=True)
    endpoint = f"access-keys/{key_id}/name"
    return await self._request("PUT", endpoint, json=payload)
Rename access key.
Arguments:
- key_id: Access key ID
- name: New name
Returns:
True if successful
Raises:
- APIError: If key doesn't exist
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... # Rename key ... await client.rename_access_key("1", "Alice") ... ... # Verify new name ... key = await client.get_access_key("1") ... assert key.name == "Alice"
@log_method_call
async def delete_access_key(self, key_id: str) -> bool:
    """
    Remove an access key from the server.

    Args:
        key_id: Access key ID

    Returns:
        True if successful

    Raises:
        APIError: If key doesn't exist

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         if await client.delete_access_key("1"):
        ...             print("Key deleted")
    """
    endpoint = f"access-keys/{key_id}"
    return await self._request("DELETE", endpoint)
Delete access key.
Arguments:
- key_id: Access key ID
Returns:
True if successful
Raises:
- APIError: If key doesn't exist
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... if await client.delete_access_key("1"): ... print("Key deleted")
@log_method_call
async def set_access_key_data_limit(self, key_id: str, bytes_limit: int) -> bool:
    """
    Apply a data transfer limit to one access key.

    Args:
        key_id: Access key ID
        bytes_limit: Limit in bytes (must be non-negative)

    Returns:
        True if successful

    Raises:
        APIError: If key doesn't exist or limit is invalid

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         limit = 5 * 1024**3  # 5 GB in bytes
        ...         await client.set_access_key_data_limit("1", limit)
        ...         key = await client.get_access_key("1")
        ...         assert key.data_limit and key.data_limit.bytes == limit
    """
    cap = DataLimit(bytes=bytes_limit)
    payload = DataLimitRequest(limit=cap).model_dump(by_alias=True)
    return await self._request(
        "PUT", f"access-keys/{key_id}/data-limit", json=payload
    )
Set data transfer limit for access key.
Arguments:
- key_id: Access key ID
- bytes_limit: Limit in bytes (must be non-negative)
Returns:
True if successful
Raises:
- APIError: If key doesn't exist or limit is invalid
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... # Set 5 GB limit ... limit = 5 * 1024**3 # 5 GB in bytes ... await client.set_access_key_data_limit("1", limit) ... ... # Verify limit ... key = await client.get_access_key("1") ... assert key.data_limit and key.data_limit.bytes == limit
@log_method_call
async def remove_access_key_data_limit(self, key_id: str) -> bool:
    """
    Lift the data transfer limit from one access key.

    Args:
        key_id: Access key ID

    Returns:
        True if successful

    Raises:
        APIError: If key doesn't exist

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         await client.remove_access_key_data_limit("1")
    """
    endpoint = f"access-keys/{key_id}/data-limit"
    return await self._request("DELETE", endpoint)
Remove data transfer limit from access key.
Arguments:
- key_id: Access key ID
Returns:
True if successful
Raises:
- APIError: If key doesn't exist
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... await client.remove_access_key_data_limit("1")
@log_method_call
async def set_global_data_limit(self, bytes_limit: int) -> bool:
    """
    Apply one data transfer limit to all access keys.

    Args:
        bytes_limit: Limit in bytes (must be non-negative)

    Returns:
        True if successful

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         # 100 GB global limit
        ...         await client.set_global_data_limit(100 * 1024**3)
    """
    cap = DataLimit(bytes=bytes_limit)
    payload = DataLimitRequest(limit=cap).model_dump(by_alias=True)
    return await self._request(
        "PUT", "server/access-key-data-limit", json=payload
    )
Set global data transfer limit for all access keys.
Arguments:
- bytes_limit: Limit in bytes (must be non-negative)
Returns:
True if successful
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... # Set 100 GB global limit ... await client.set_global_data_limit(100 * 1024**3)
@log_method_call
async def remove_global_data_limit(self) -> bool:
    """
    Lift the global data transfer limit.

    Returns:
        True if successful

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         await client.remove_global_data_limit()
    """
    outcome = await self._request("DELETE", "server/access-key-data-limit")
    return outcome
Remove global data transfer limit.
Returns:
True if successful
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... await client.remove_global_data_limit()
async def batch_create_access_keys(
        self,
        keys_config: list[dict[str, Any]],
        fail_fast: bool = True
) -> list[Union[AccessKey, Exception]]:
    """
    Create several access keys, one request after another.

    Each configuration is unpacked into ``create_access_key``. The keys are
    created sequentially, in the order given.

    Args:
        keys_config: List of key configurations (same as create_access_key kwargs)
        fail_fast: If True, re-raise the first error. If False, record the
            exception in the result list and continue with the next key.

    Returns:
        One entry per configuration: the created key, or the exception
        raised for it (only when ``fail_fast`` is False)

    Examples:
        >>> async def main():
        ...     async with AsyncOutlineClient(
        ...         "https://example.com:1234/secret",
        ...         "ab12cd34..."
        ...     ) as client:
        ...         configs = [
        ...             {"name": "User1", "limit": DataLimit(bytes=1024**3)},
        ...             {"name": "User2", "port": 8388},
        ...         ]
        ...         res = await client.batch_create_access_keys(configs)
    """
    outcomes = []

    for key_kwargs in keys_config:
        try:
            outcome = await self.create_access_key(**key_kwargs)
        except Exception as exc:
            if fail_fast:
                raise
            outcome = exc
        outcomes.append(outcome)

    return outcomes
Create multiple access keys in batch.
Arguments:
- keys_config: List of key configurations (same as create_access_key kwargs)
- fail_fast: If True, stop on first error. If False, continue and return errors.
Returns:
List of created keys or exceptions
Examples:
>>> async def main(): ... async with AsyncOutlineClient( ... "https://example.com:1234/secret", ... "ab12cd34..." ... ) as client: ... configs = [ ... {"name": "User1", "limit": DataLimit(bytes=1024**3)}, ... {"name": "User2", "port": 8388}, ... ] ... res = await client.batch_create_access_keys(configs)
async def get_server_summary(self, metrics_since: str = "24h") -> dict[str, Any]:
    """
    Collect server info, key count and metrics into one dictionary.

    Failures never propagate as ``Exception``: a failure while fetching
    server info or access keys yields ``healthy=False`` plus an ``error``
    message, while a failure in the metrics section only sets the affected
    metrics entries to None.

    Args:
        metrics_since: Time range for experimental metrics (default: "24h")

    Returns:
        Dictionary with server info, health status, and statistics
    """

    def _plain(value):
        # Models are dumped to dicts; anything else is passed through as-is.
        return value.model_dump() if isinstance(value, BaseModel) else value

    summary = {}

    try:
        summary["server"] = _plain(await self.get_server_info())

        keys = await self.get_access_keys()
        if isinstance(keys, BaseModel):
            key_list = keys.access_keys
        else:
            key_list = keys.get("accessKeys", [])
        summary["access_keys_count"] = len(key_list)

        # Metrics are best-effort: errors here do not mark the server unhealthy.
        try:
            status = await self.get_metrics_status()
            if isinstance(status, BaseModel):
                metrics_on = status.metrics_enabled
            elif isinstance(status, dict):
                metrics_on = status.get("metricsEnabled")
            else:
                metrics_on = False

            if metrics_on:
                summary["transfer_metrics"] = _plain(await self.get_transfer_metrics())

                # Experimental metrics may fail independently of transfer metrics.
                try:
                    summary["experimental_metrics"] = _plain(
                        await self.get_experimental_metrics(metrics_since)
                    )
                except Exception:
                    summary["experimental_metrics"] = None
        except Exception:
            summary["transfer_metrics"] = None
            summary["experimental_metrics"] = None

        summary["healthy"] = True

    except Exception as exc:
        summary["healthy"] = False
        summary["error"] = str(exc)

    return summary
Get comprehensive server summary including info, metrics, and key count.
Arguments:
- metrics_since: Time range for experimental metrics (default: "24h")
Returns:
Dictionary with server info, health status, and statistics
def configure_logging(self, level: str = "INFO", format_string: Optional[str] = None) -> None:
    """
    Enable client logging and install a single stream handler.

    Any handlers already attached to the module logger are removed first.

    Args:
        level: Logging level name (DEBUG, INFO, WARNING, ERROR); looked up
            case-insensitively on the ``logging`` module
        format_string: Custom format string for log messages; a default
            timestamp/name/level/message layout is used when omitted
    """
    self._enable_logging = True

    # Start from a clean slate so repeated calls do not stack handlers.
    logger.handlers.clear()

    layout = (
        format_string
        if format_string
        else '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(logging.Formatter(layout))

    logger.addHandler(stream_handler)
    logger.setLevel(getattr(logging, level.upper()))
Configure logging for the client.
Arguments:
- level: Logging level (DEBUG, INFO, WARNING, ERROR)
- format_string: Custom format string for log messages
@property
def is_healthy(self) -> bool:
    """Return the stored health flag (no request is made)."""
    healthy = self._is_healthy
    return healthy
Check if the last health check passed.
Base exception for Outline client errors.
class APIError(OutlineError):
    """
    Raised when API requests fail.

    Attributes:
        status_code: Optional status code supplied by the raiser
        attempt: Optional attempt number; when set it is shown as a prefix
            in the string form of the error
    """

    def __init__(
        self,
        message: str,
        status_code: Optional[int] = None,
        attempt: Optional[int] = None,
    ) -> None:
        super().__init__(message)
        self.attempt = attempt
        self.status_code = status_code

    def __str__(self) -> str:
        text = super().__str__()
        if self.attempt is None:
            return text
        return f"[Attempt {self.attempt}] {text}"
Raised when API requests fail.
class AccessKey(BaseModel):
    """Access key details."""

    # Fields declared with ``alias`` use the camelCase name given there.
    id: str = Field(description="Access key identifier")
    name: Optional[str] = Field(default=None, description="Access key name")
    password: str = Field(description="Access key password")
    # Exclusive bounds: valid ports are 1..65535.
    port: int = Field(gt=0, lt=65536, description="Port number")
    method: str = Field(description="Encryption method")
    access_url: str = Field(alias="accessUrl", description="Complete access URL")
    data_limit: Optional[DataLimit] = Field(
        default=None,
        alias="dataLimit",
        description="Data limit for this key",
    )
Access key details.
class AccessKeyCreateRequest(BaseModel):
    """
    Request parameters for creating an access key.
    Per OpenAPI: /access-keys POST request body
    """

    # Every field is optional and defaults to None.
    name: Optional[str] = Field(default=None, description="Access key name")
    method: Optional[str] = Field(default=None, description="Encryption method")
    password: Optional[str] = Field(default=None, description="Access key password")
    # When given, the port must lie in 1..65535 (exclusive bounds 0 and 65536).
    port: Optional[int] = Field(
        default=None, gt=0, lt=65536, description="Port number"
    )
    limit: Optional[DataLimit] = Field(
        default=None, description="Data limit for this key"
    )
Request parameters for creating an access key. Per OpenAPI: /access-keys POST request body
class AccessKeyList(BaseModel):
    """List of access keys."""

    # Required; aliased to the camelCase key "accessKeys".
    access_keys: list[AccessKey] = Field(alias="accessKeys")
List of access keys.
class AccessKeyNameRequest(BaseModel):
    """Request for renaming access key."""

    # Required: no default is declared.
    name: str = Field(description="New access key name")
Request for renaming access key.
class DataLimit(BaseModel):
    """Data transfer limit configuration."""

    # ``ge=0`` already rejects negative values at the field level.
    bytes: int = Field(ge=0, description="Data limit in bytes")

    # ``@field_validator`` must be the outermost decorator. With ``@classmethod``
    # on top, the class namespace holds a plain classmethod wrapping pydantic's
    # decorator marker, so the validator is never registered for the field.
    @field_validator("bytes")
    @classmethod
    def validate_bytes(cls, v: int) -> int:
        """
        Ensure the byte count is not negative.

        Args:
            v: Value supplied for ``bytes``

        Returns:
            The value unchanged

        Raises:
            ValueError: If ``v`` is below zero
        """
        if v < 0:
            raise ValueError("bytes must be non-negative")
        return v
Data transfer limit configuration.
# ``@field_validator`` goes above ``@classmethod``: in the reverse order the
# class attribute is a plain classmethod object and pydantic does not register
# the function as a validator for ``bytes``.
@field_validator("bytes")
@classmethod
def validate_bytes(cls, v: int) -> int:
    """
    Ensure the byte count is not negative.

    Args:
        v: Value supplied for ``bytes``

    Returns:
        The value unchanged

    Raises:
        ValueError: If ``v`` is below zero
    """
    if v < 0:
        raise ValueError("bytes must be non-negative")
    return v
Wrap a classmethod, staticmethod, property or unbound function and act as a descriptor that allows us to detect decorated items from the class's attributes.
This class's __get__ returns the wrapped item's __get__ result, which makes it transparent for classmethods and staticmethods.
Attributes:
- wrapped: The decorator that has to be wrapped.
- decorator_info: The decorator info.
- shim: A wrapper function to wrap V1 style function.
class DataLimitRequest(BaseModel):
    """Request for setting data limit."""

    # Required nested model; its own constraints apply to the payload.
    limit: DataLimit = Field(description="Data limit configuration")
Request for setting data limit.
class ErrorResponse(BaseModel):
    """
    Error response structure.
    Per OpenAPI: 404 and 400 responses
    """

    # Both fields are required strings.
    code: str = Field(description="Error code")
    message: str = Field(description="Error message")
Error response structure. Per OpenAPI: 404 and 400 responses
class ExperimentalMetrics(BaseModel):
    """
    Experimental metrics data structure.
    Per OpenAPI: /experimental/server/metrics endpoint
    """

    # Server-wide part of the payload.
    server: ServerExperimentalMetric = Field(description="Server metrics")
    # Per-key part of the payload; aliased to the camelCase key "accessKeys".
    access_keys: list[AccessKeyMetric] = Field(
        alias="accessKeys",
        description="Access key metrics",
    )
Experimental metrics data structure. Per OpenAPI: /experimental/server/metrics endpoint
class HostnameRequest(BaseModel):
    """Request for changing hostname."""

    # Required: no default is declared.
    hostname: str = Field(description="New hostname or IP address")
Request for changing hostname.
class MetricsEnabledRequest(BaseModel):
    """Request for enabling/disabling metrics."""

    # Required flag; aliased to the camelCase key "metricsEnabled".
    metrics_enabled: bool = Field(
        alias="metricsEnabled",
        description="Enable or disable metrics",
    )
Request for enabling/disabling metrics.
class MetricsStatusResponse(BaseModel):
    """Response for /metrics/enabled endpoint."""

    # Required flag; aliased to the camelCase key "metricsEnabled".
    metrics_enabled: bool = Field(
        alias="metricsEnabled",
        description="Current metrics status",
    )
Response for /metrics/enabled endpoint.
class PortRequest(BaseModel):
    """Request for changing default port."""

    # Exclusive bounds: valid ports are 1..65535.
    port: int = Field(gt=0, lt=65536, description="New default port")
Request for changing default port.
class Server(BaseModel):
    """
    Server information.
    Per OpenAPI: /server endpoint schema
    """

    # Required fields. Those with ``alias`` use the camelCase name given there.
    name: str = Field(description="Server name")
    server_id: str = Field(alias="serverId", description="Unique server identifier")
    metrics_enabled: bool = Field(
        alias="metricsEnabled",
        description="Metrics sharing status",
    )
    created_timestamp_ms: int = Field(
        alias="createdTimestampMs",
        description="Creation timestamp in milliseconds",
    )
    version: str = Field(description="Server version")
    # Exclusive bounds: valid ports are 1..65535.
    port_for_new_access_keys: int = Field(
        alias="portForNewAccessKeys",
        gt=0,
        lt=65536,
        description="Default port for new keys",
    )

    # Optional fields: default to None when absent.
    hostname_for_access_keys: Optional[str] = Field(
        default=None,
        alias="hostnameForAccessKeys",
        description="Hostname for access keys",
    )
    access_key_data_limit: Optional[DataLimit] = Field(
        default=None,
        alias="accessKeyDataLimit",
        description="Global data limit for access keys",
    )
Server information. Per OpenAPI: /server endpoint schema
class ServerMetrics(BaseModel):
    """
    Server metrics data for data transferred per access key.
    Per OpenAPI: /metrics/transfer endpoint
    """

    # String keys mapped to integer values; aliased to "bytesTransferredByUserId".
    bytes_transferred_by_user_id: dict[str, int] = Field(
        alias="bytesTransferredByUserId",
        description="Data transferred by each access key ID",
    )
Server metrics data for data transferred per access key. Per OpenAPI: /metrics/transfer endpoint
class ServerNameRequest(BaseModel):
    """Request for renaming server."""

    # Required: no default is declared.
    name: str = Field(description="New server name")
Request for renaming server.