pyoutlineapi

PyOutlineAPI: A modern, async-first Python client for the Outline VPN Server API.

Copyright (c) 2025 Denis Rozhnovskiy <pytelemonbot@mail.ru>. All rights reserved.

This software is licensed under the MIT License.

You can find the full license text at:

https://opensource.org/licenses/MIT

Source code repository:

https://github.com/orenlab/pyoutlineapi

  1"""
  2PyOutlineAPI: A modern, async-first Python client for the Outline VPN Server API.
  3
  4Copyright (c) 2025 Denis Rozhnovskiy <pytelemonbot@mail.ru>
  5All rights reserved.
  6
  7This software is licensed under the MIT License.
  8You can find the full license text at:
  9    https://opensource.org/licenses/MIT
 10
 11Source code repository:
 12    https://github.com/orenlab/pyoutlineapi
 13"""
 14
 15from __future__ import annotations
 16
 17import sys
 18from importlib import metadata
 19from typing import Final, TYPE_CHECKING
 20
 21
 22def check_python_version():
 23    if sys.version_info < (3, 10):
 24        raise RuntimeError("PyOutlineAPI requires Python 3.10 or higher")
 25
 26
 27check_python_version()
 28
 29# Core client imports
 30from .client import AsyncOutlineClient
 31from .exceptions import APIError, OutlineError
 32
# Package metadata
try:
    # Ask importlib.metadata for the version of the "pyoutlineapi" distribution.
    __version__: str = metadata.version("pyoutlineapi")
except metadata.PackageNotFoundError:  # Fallback for development
    # The distribution metadata was not found, so use a hard-coded
    # development version string instead.
    __version__ = "0.3.0-dev"

# Author, contact and licence details exposed as module-level constants.
__author__: Final[str] = "Denis Rozhnovskiy"
__email__: Final[str] = "pytelemonbot@mail.ru"
__license__: Final[str] = "MIT"
 42
 43# Type checking imports
 44if TYPE_CHECKING:
 45    from .models import (
 46        AccessKey,
 47        AccessKeyCreateRequest,
 48        AccessKeyList,
 49        AccessKeyNameRequest,
 50        DataLimit,
 51        DataLimitRequest,
 52        ErrorResponse,
 53        ExperimentalMetrics,
 54        HostnameRequest,
 55        MetricsEnabledRequest,
 56        MetricsStatusResponse,
 57        PortRequest,
 58        Server,
 59        ServerMetrics,
 60        ServerNameRequest
 61    )
 62
 63# Runtime imports
 64from .models import (
 65    AccessKey,
 66    AccessKeyCreateRequest,
 67    AccessKeyList,
 68    AccessKeyNameRequest,
 69    DataLimit,
 70    DataLimitRequest,
 71    ErrorResponse,
 72    ExperimentalMetrics,
 73    HostnameRequest,
 74    MetricsEnabledRequest,
 75    MetricsStatusResponse,
 76    PortRequest,
 77    Server,
 78    ServerMetrics,
 79    ServerNameRequest,
 80)
 81
# Names exported by ``from pyoutlineapi import *``: the client, the two
# exception classes, and every model imported from ``.models`` above.
__all__: Final[list[str]] = [
    # Client
    "AsyncOutlineClient",
    "OutlineError",
    "APIError",
    # Models
    "AccessKey",
    "AccessKeyCreateRequest",
    "AccessKeyList",
    "AccessKeyNameRequest",
    "DataLimit",
    "DataLimitRequest",
    "ErrorResponse",
    "ExperimentalMetrics",
    "HostnameRequest",
    "MetricsEnabledRequest",
    "MetricsStatusResponse",
    "PortRequest",
    "Server",
    "ServerMetrics",
    "ServerNameRequest",
]
class AsyncOutlineClient:
 123class AsyncOutlineClient:
 124    """
 125    Asynchronous client for the Outline VPN Server API.
 126
 127    Args:
 128        api_url: Base URL for the Outline server API
 129        cert_sha256: SHA-256 fingerprint of the server's TLS certificate
 130        json_format: Return raw JSON instead of Pydantic models
 131        timeout: Request timeout in seconds
 132        retry_attempts: Number of retry attempts connecting to the API
 133        enable_logging: Enable debug logging for API calls
 134        user_agent: Custom user agent string
 135        max_connections: Maximum number of connections in the pool
 136        rate_limit_delay: Minimum delay between requests (seconds)
 137
 138    Examples:
 139        >>> async def main():
 140        ...     async with AsyncOutlineClient(
 141        ...         "https://example.com:1234/secret",
 142        ...         "ab12cd34...",
 143        ...         enable_logging=True
 144        ...     ) as client:
 145        ...         server_info = await client.get_server_info()
 146        ...         print(f"Server: {server_info.name}")
 147        ...
 148        ...     # Or use as context manager factory
 149        ...     async with AsyncOutlineClient.create(
 150        ...         "https://example.com:1234/secret",
 151        ...         "ab12cd34..."
 152        ...     ) as client:
 153        ...         await client.get_server_info()
 154
 155    """
 156
 157    def __init__(
 158            self,
 159            api_url: str,
 160            cert_sha256: str,
 161            *,
 162            json_format: bool = False,
 163            timeout: int = 30,
 164            retry_attempts: int = 3,
 165            enable_logging: bool = False,
 166            user_agent: Optional[str] = None,
 167            max_connections: int = 10,
 168            rate_limit_delay: float = 0.0,
 169    ) -> None:
 170
 171        # Validate api_url
 172        if not api_url or not api_url.strip():
 173            raise ValueError("api_url cannot be empty or whitespace")
 174
 175        # Validate cert_sha256
 176        if not cert_sha256 or not cert_sha256.strip():
 177            raise ValueError("cert_sha256 cannot be empty or whitespace")
 178
 179        # Additional validation for cert_sha256 format (should be hex)
 180        cert_sha256_clean = cert_sha256.strip()
 181        if not all(c in '0123456789abcdefABCDEF' for c in cert_sha256_clean):
 182            raise ValueError("cert_sha256 must contain only hexadecimal characters")
 183
 184        # Check cert_sha256 length (SHA-256 should be 64 hex characters)
 185        if len(cert_sha256_clean) != 64:
 186            raise ValueError("cert_sha256 must be exactly 64 hexadecimal characters (SHA-256)")
 187
 188        self._api_url = api_url.rstrip("/")
 189        self._cert_sha256 = cert_sha256
 190        self._json_format = json_format
 191        self._timeout = aiohttp.ClientTimeout(total=timeout)
 192        self._ssl_context: Optional[Fingerprint] = None
 193        self._session: Optional[aiohttp.ClientSession] = None
 194        self._retry_attempts = retry_attempts
 195        self._enable_logging = enable_logging
 196        self._user_agent = user_agent or f"PyOutlineAPI/0.3.0"
 197        self._max_connections = max_connections
 198        self._rate_limit_delay = rate_limit_delay
 199        self._last_request_time: float = 0.0
 200
 201        # Health check state
 202        self._last_health_check: float = 0.0
 203        self._health_check_interval: float = 300.0  # 5 minutes
 204        self._is_healthy: bool = True
 205
 206        if enable_logging:
 207            self._setup_logging()
 208
 209    @staticmethod
 210    def _setup_logging() -> None:
 211        """Setup logging configuration if not already configured."""
 212        if not logger.handlers:
 213            handler = logging.StreamHandler()
 214            formatter = logging.Formatter(
 215                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
 216            )
 217            handler.setFormatter(formatter)
 218            logger.addHandler(handler)
 219            logger.setLevel(logging.DEBUG)
 220
 221    @classmethod
 222    @asynccontextmanager
 223    async def create(
 224            cls,
 225            api_url: str,
 226            cert_sha256: str,
 227            **kwargs
 228    ) -> AsyncGenerator[AsyncOutlineClient, None]:
 229        """
 230        Factory method that returns an async context manager.
 231
 232        This is the recommended way to create clients for one-off operations.
 233        """
 234        client = cls(api_url, cert_sha256, **kwargs)
 235        async with client:
 236            yield client
 237
 238    async def __aenter__(self) -> AsyncOutlineClient:
 239        """Set up client session."""
 240        headers = {"User-Agent": self._user_agent}
 241
 242        connector = aiohttp.TCPConnector(
 243            ssl=self._get_ssl_context(),
 244            limit=self._max_connections,
 245            limit_per_host=self._max_connections // 2,
 246            enable_cleanup_closed=True,
 247        )
 248
 249        self._session = aiohttp.ClientSession(
 250            timeout=self._timeout,
 251            raise_for_status=False,
 252            connector=connector,
 253            headers=headers,
 254        )
 255
 256        if self._enable_logging:
 257            logger.info(f"Initialized OutlineAPI client for {self._api_url}")
 258
 259        return self
 260
 261    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
 262        """Clean up client session."""
 263        if self._session:
 264            await self._session.close()
 265            self._session = None
 266
 267            if self._enable_logging:
 268                logger.info("OutlineAPI client session closed")
 269
 270    async def _apply_rate_limiting(self) -> None:
 271        """Apply rate limiting if configured."""
 272        if self._rate_limit_delay <= 0:
 273            return
 274
 275        time_since_last = time.time() - self._last_request_time
 276        if time_since_last < self._rate_limit_delay:
 277            delay = self._rate_limit_delay - time_since_last
 278            await asyncio.sleep(delay)
 279
 280        self._last_request_time = time.time()
 281
 282    async def health_check(self, force: bool = False) -> bool:
 283        """
 284        Perform a health check on the Outline server.
 285
 286        Args:
 287            force: Force health check even if recently performed
 288
 289        Returns:
 290            True if server is healthy
 291        """
 292        current_time = time.time()
 293
 294        if not force and (current_time - self._last_health_check) < self._health_check_interval:
 295            return self._is_healthy
 296
 297        try:
 298            await self.get_server_info()
 299            self._is_healthy = True
 300            if self._enable_logging:
 301                logger.info("Health check passed")
 302
 303            return self._is_healthy
 304        except Exception as e:
 305            self._is_healthy = False
 306            if self._enable_logging:
 307                logger.warning(f"Health check failed: {e}")
 308
 309            return self._is_healthy
 310        finally:
 311            self._last_health_check = current_time
 312
 313    @overload
 314    async def _parse_response(
 315            self,
 316            response: ClientResponse,
 317            model: type[BaseModel],
 318            json_format: Literal[True],
 319    ) -> JsonDict:
 320        ...
 321
 322    @overload
 323    async def _parse_response(
 324            self,
 325            response: ClientResponse,
 326            model: type[BaseModel],
 327            json_format: Literal[False],
 328    ) -> BaseModel:
 329        ...
 330
 331    @overload
 332    async def _parse_response(
 333            self, response: ClientResponse, model: type[BaseModel], json_format: bool
 334    ) -> ResponseType:
 335        ...
 336
 337    @ensure_context
 338    async def _parse_response(
 339            self,
 340            response: ClientResponse,
 341            model: type[BaseModel],
 342            json_format: bool = False,
 343    ) -> ResponseType:
 344        """
 345        Parse and validate API response data.
 346
 347        Args:
 348            response: API response to parse
 349            model: Pydantic model for validation
 350            json_format: Whether to return raw JSON
 351
 352        Returns:
 353            Validated response data
 354
 355        Raises:
 356            ValueError: If response validation fails
 357        """
 358        try:
 359            data = await response.json()
 360            validated = model.model_validate(data)
 361            return validated.model_dump(by_alias=True) if json_format else validated
 362        except aiohttp.ContentTypeError as content_error:
 363            raise ValueError("Invalid response format") from content_error
 364        except Exception as exception:
 365            raise ValueError(f"Validation error: {exception}") from exception
 366
 367    @staticmethod
 368    async def _handle_error_response(response: ClientResponse) -> None:
 369        """Handle error responses from the API."""
 370        try:
 371            error_data = await response.json()
 372            error = ErrorResponse.model_validate(error_data)
 373            raise APIError(f"{error.code}: {error.message}", response.status)
 374        except (ValueError, aiohttp.ContentTypeError):
 375            raise APIError(
 376                f"HTTP {response.status}: {response.reason}", response.status
 377            )
 378
 379    @ensure_context
 380    async def _request(
 381            self,
 382            method: str,
 383            endpoint: str,
 384            *,
 385            json: Any = None,
 386            params: Optional[JsonDict] = None,
 387    ) -> Any:
 388        """Make an API request."""
 389        await self._apply_rate_limiting()
 390        url = self._build_url(endpoint)
 391        return await self._make_request(method, url, json, params)
 392
 393    async def _make_request(
 394            self,
 395            method: str,
 396            url: str,
 397            json: Any = None,
 398            params: Optional[JsonDict] = None,
 399    ) -> Any:
 400        """Internal method to execute the actual request with retry logic."""
 401
 402        async def _do_request() -> Any:
 403            if self._enable_logging:
 404                # Don't log sensitive data
 405                safe_url = url.split('?')[0] if '?' in url else url
 406                logger.debug(f"Making {method} request to {safe_url}")
 407
 408            async with self._session.request(
 409                    method,
 410                    url,
 411                    json=json,
 412                    params=params,
 413                    raise_for_status=False,
 414            ) as response:
 415                if self._enable_logging:
 416                    logger.debug(f"Response: {response.status} {response.reason}")
 417
 418                if response.status >= 400:
 419                    await self._handle_error_response(response)
 420
 421                if response.status == 204:
 422                    return True
 423
 424                try:
 425                    # See #b1746e6
 426                    await response.json()
 427                    return response
 428                except Exception as exception:
 429                    raise APIError(
 430                        f"Failed to process response: {exception}", response.status
 431                    )
 432
 433        return await self._retry_request(_do_request, attempts=self._retry_attempts)
 434
 435    @staticmethod
 436    async def _retry_request(
 437            request_func: Callable[[], Awaitable[T]],
 438            *,
 439            attempts: int = DEFAULT_RETRY_ATTEMPTS,
 440            delay: float = DEFAULT_RETRY_DELAY,
 441    ) -> T:
 442        """
 443        Execute request with retry logic.
 444
 445        Args:
 446            request_func: Async function to execute
 447            attempts: Maximum number of retry attempts
 448            delay: Delay between retries in seconds
 449
 450        Returns:
 451            Response from the successful request
 452
 453        Raises:
 454            APIError: If all retry attempts fail
 455        """
 456        last_error = None
 457
 458        for attempt in range(attempts):
 459            try:
 460                return await request_func()
 461            except (aiohttp.ClientError, APIError) as error:
 462                last_error = error
 463
 464                # Don't retry if it's not a retriable error
 465                if isinstance(error, APIError) and (
 466                        error.status_code not in RETRY_STATUS_CODES
 467                ):
 468                    raise
 469
 470                # Don't sleep on the last attempt
 471                if attempt < attempts - 1:
 472                    await asyncio.sleep(delay * (attempt + 1))
 473
 474        raise APIError(
 475            f"Request failed after {attempts} attempts: {last_error}",
 476            getattr(last_error, "status_code", None),
 477        )
 478
 479    def _build_url(self, endpoint: str) -> str:
 480        """Build and validate the full URL for the API request."""
 481        if not isinstance(endpoint, str):
 482            raise ValueError("Endpoint must be a string")
 483
 484        url = f"{self._api_url}/{endpoint.lstrip('/')}"
 485        parsed_url = urlparse(url)
 486
 487        if not all([parsed_url.scheme, parsed_url.netloc]):
 488            raise ValueError(f"Invalid URL: {url}")
 489
 490        return url
 491
 492    def _get_ssl_context(self) -> Optional[Fingerprint]:
 493        """Create an SSL context if a certificate fingerprint is provided."""
 494        if not self._cert_sha256:
 495            return None
 496
 497        try:
 498            return Fingerprint(binascii.unhexlify(self._cert_sha256))
 499        except binascii.Error as validation_error:
 500            raise ValueError(
 501                f"Invalid certificate SHA256: {self._cert_sha256}"
 502            ) from validation_error
 503        except Exception as exception:
 504            raise OutlineError("Failed to create SSL context") from exception
 505
 506    # Server Management Methods
 507
 508    @log_method_call
 509    async def get_server_info(self) -> Union[JsonDict, Server]:
 510        """
 511        Get server information.
 512
 513        Returns:
 514            Server information including name, ID, and configuration.
 515
 516        Examples:
 517            >>> async def main():
 518            ...     async with AsyncOutlineClient(
 519            ...         "https://example.com:1234/secret",
 520            ...         "ab12cd34..."
 521            ...     ) as client:
 522            ...         server = await client.get_server_info()
 523            ...         print(f"Server {server.name} running version {server.version}")
 524        """
 525        response = await self._request("GET", "server")
 526        return await self._parse_response(
 527            response, Server, json_format=self._json_format
 528        )
 529
 530    @log_method_call
 531    async def rename_server(self, name: str) -> bool:
 532        """
 533        Rename the server.
 534
 535        Args:
 536            name: New server name
 537
 538        Returns:
 539            True if successful
 540
 541        Examples:
 542            >>> async def main():
 543            ...     async with AsyncOutlineClient(
 544            ...         "https://example.com:1234/secret",
 545            ...         "ab12cd34..."
 546            ...     ) as client:
 547            ...         success = await client.rename_server("My VPN Server")
 548            ...         if success:
 549            ...             print("Server renamed successfully")
 550        """
 551        request = ServerNameRequest(name=name)
 552        return await self._request(
 553            "PUT", "name", json=request.model_dump(by_alias=True)
 554        )
 555
 556    @log_method_call
 557    async def set_hostname(self, hostname: str) -> bool:
 558        """
 559        Set server hostname for access keys.
 560
 561        Args:
 562            hostname: New hostname or IP address
 563
 564        Returns:
 565            True if successful
 566
 567        Raises:
 568            APIError: If hostname is invalid
 569
 570        Examples:
 571            >>> async def main():
 572            ...     async with AsyncOutlineClient(
 573            ...         "https://example.com:1234/secret",
 574            ...         "ab12cd34..."
 575            ...     ) as client:
 576            ...         await client.set_hostname("vpn.example.com")
 577            ...         # Or use IP address
 578            ...         await client.set_hostname("203.0.113.1")
 579        """
 580        request = HostnameRequest(hostname=hostname)
 581        return await self._request(
 582            "PUT",
 583            "server/hostname-for-access-keys",
 584            json=request.model_dump(by_alias=True),
 585        )
 586
 587    @log_method_call
 588    async def set_default_port(self, port: int) -> bool:
 589        """
 590        Set default port for new access keys.
 591
 592        Args:
 593            port: Port number (1025-65535)
 594
 595        Returns:
 596            True if successful
 597
 598        Raises:
 599            APIError: If port is invalid or in use
 600
 601        Examples:
 602            >>> async def main():
 603            ...     async with AsyncOutlineClient(
 604            ...         "https://example.com:1234/secret",
 605            ...         "ab12cd34..."
 606            ...     ) as client:
 607            ...         await client.set_default_port(8388)
 608        """
 609        if port < MIN_PORT or port > MAX_PORT:
 610            raise ValueError(
 611                f"Privileged ports are not allowed. Use range: {MIN_PORT}-{MAX_PORT}"
 612            )
 613
 614        request = PortRequest(port=port)
 615        return await self._request(
 616            "PUT",
 617            "server/port-for-new-access-keys",
 618            json=request.model_dump(by_alias=True),
 619        )
 620
 621    # Metrics Methods
 622
 623    @log_method_call
 624    async def get_metrics_status(self) -> Union[JsonDict, MetricsStatusResponse]:
 625        """
 626        Get whether metrics collection is enabled.
 627
 628        Returns:
 629            Current metrics collection status
 630
 631        Examples:
 632            >>> async def main():
 633            ...     async with AsyncOutlineClient(
 634            ...         "https://example.com:1234/secret",
 635            ...         "ab12cd34..."
 636            ...     ) as client:
 637            ...         status = await client.get_metrics_status()
 638            ...         if status.metrics_enabled:
 639            ...             print("Metrics collection is enabled")
 640        """
 641        response = await self._request("GET", "metrics/enabled")
 642        return await self._parse_response(
 643            response, MetricsStatusResponse, json_format=self._json_format
 644        )
 645
 646    @log_method_call
 647    async def set_metrics_status(self, enabled: bool) -> bool:
 648        """
 649        Enable or disable metrics collection.
 650
 651        Args:
 652            enabled: Whether to enable metrics
 653
 654        Returns:
 655            True if successful
 656
 657        Examples:
 658            >>> async def main():
 659            ...     async with AsyncOutlineClient(
 660            ...         "https://example.com:1234/secret",
 661            ...         "ab12cd34..."
 662            ...     ) as client:
 663            ...         # Enable metrics
 664            ...         await client.set_metrics_status(True)
 665            ...         # Check new status
 666            ...         status = await client.get_metrics_status()
 667        """
 668        request = MetricsEnabledRequest(metricsEnabled=enabled)
 669        return await self._request(
 670            "PUT", "metrics/enabled", json=request.model_dump(by_alias=True)
 671        )
 672
 673    @log_method_call
 674    async def get_transfer_metrics(self) -> Union[JsonDict, ServerMetrics]:
 675        """
 676        Get transfer metrics for all access keys.
 677
 678        Returns:
 679            Transfer metrics data for each access key
 680
 681        Examples:
 682            >>> async def main():
 683            ...     async with AsyncOutlineClient(
 684            ...         "https://example.com:1234/secret",
 685            ...         "ab12cd34..."
 686            ...     ) as client:
 687            ...         metrics = await client.get_transfer_metrics()
 688            ...         for user_id, bytes_transferred in metrics.bytes_transferred_by_user_id.items():
 689            ...             print(f"User {user_id}: {bytes_transferred / 1024**3:.2f} GB")
 690        """
 691        response = await self._request("GET", "metrics/transfer")
 692        return await self._parse_response(
 693            response, ServerMetrics, json_format=self._json_format
 694        )
 695
 696    @log_method_call
 697    async def get_experimental_metrics(
 698            self, since: str
 699    ) -> Union[JsonDict, ExperimentalMetrics]:
 700        """
 701        Get experimental server metrics.
 702
 703        Args:
 704            since: Required time range filter (e.g., "24h", "7d", "30d", or ISO timestamp)
 705
 706        Returns:
 707            Detailed server and access key metrics
 708
 709        Examples:
 710            >>> async def main():
 711            ...     async with AsyncOutlineClient(
 712            ...         "https://example.com:1234/secret",
 713            ...         "ab12cd34..."
 714            ...     ) as client:
 715            ...         # Get metrics for the last 24 hours
 716            ...         metrics = await client.get_experimental_metrics("24h")
 717            ...         print(f"Server tunnel time: {metrics.server.tunnel_time.seconds}s")
 718            ...         print(f"Server data transferred: {metrics.server.data_transferred.bytes} bytes")
 719            ...
 720            ...         # Get metrics for the last 7 days
 721            ...         metrics = await client.get_experimental_metrics("7d")
 722            ...
 723            ...         # Get metrics since specific timestamp
 724            ...         metrics = await client.get_experimental_metrics("2024-01-01T00:00:00Z")
 725        """
 726        if not since or not since.strip():
 727            raise ValueError("Parameter 'since' is required and cannot be empty")
 728
 729        params = {"since": since}
 730        response = await self._request(
 731            "GET", "experimental/server/metrics", params=params
 732        )
 733        return await self._parse_response(
 734            response, ExperimentalMetrics, json_format=self._json_format
 735        )
 736
 737    # Access Key Management Methods
 738
 739    @log_method_call
 740    async def create_access_key(
 741            self,
 742            *,
 743            name: Optional[str] = None,
 744            password: Optional[str] = None,
 745            port: Optional[int] = None,
 746            method: Optional[str] = None,
 747            limit: Optional[DataLimit] = None,
 748    ) -> Union[JsonDict, AccessKey]:
 749        """
 750        Create a new access key.
 751
 752        Args:
 753            name: Optional key name
 754            password: Optional password
 755            port: Optional port number (1-65535)
 756            method: Optional encryption method
 757            limit: Optional data transfer limit
 758
 759        Returns:
 760            New access key details
 761
 762        Examples:
 763            >>> async def main():
 764            ...     async with AsyncOutlineClient(
 765            ...         "https://example.com:1234/secret",
 766            ...         "ab12cd34..."
 767            ...     ) as client:
 768            ...         # Create basic key
 769            ...         key = await client.create_access_key(name="User 1")
 770            ...
 771            ...         # Create key with data limit
 772            ...         lim = DataLimit(bytes=5 * 1024**3)  # 5 GB
 773            ...         key = await client.create_access_key(
 774            ...             name="Limited User",
 775            ...             port=8388,
 776            ...             limit=lim
 777            ...         )
 778            ...         print(f"Created key: {key.access_url}")
 779        """
 780        request = AccessKeyCreateRequest(
 781            name=name, password=password, port=port, method=method, limit=limit
 782        )
 783        response = await self._request(
 784            "POST",
 785            "access-keys",
 786            json=request.model_dump(exclude_none=True, by_alias=True),
 787        )
 788        return await self._parse_response(
 789            response, AccessKey, json_format=self._json_format
 790        )
 791
 792    @log_method_call
 793    async def create_access_key_with_id(
 794            self,
 795            key_id: str,
 796            *,
 797            name: Optional[str] = None,
 798            password: Optional[str] = None,
 799            port: Optional[int] = None,
 800            method: Optional[str] = None,
 801            limit: Optional[DataLimit] = None,
 802    ) -> Union[JsonDict, AccessKey]:
 803        """
 804        Create a new access key with specific ID.
 805
 806        Args:
 807            key_id: Specific ID for the access key
 808            name: Optional key name
 809            password: Optional password
 810            port: Optional port number (1-65535)
 811            method: Optional encryption method
 812            limit: Optional data transfer limit
 813
 814        Returns:
 815            New access key details
 816
 817        Examples:
 818            >>> async def main():
 819            ...     async with AsyncOutlineClient(
 820            ...         "https://example.com:1234/secret",
 821            ...         "ab12cd34..."
 822            ...     ) as client:
 823            ...         key = await client.create_access_key_with_id(
 824            ...             "my-custom-id",
 825            ...             name="Custom Key"
 826            ...         )
 827        """
 828        request = AccessKeyCreateRequest(
 829            name=name, password=password, port=port, method=method, limit=limit
 830        )
 831        response = await self._request(
 832            "PUT",
 833            f"access-keys/{key_id}",
 834            json=request.model_dump(exclude_none=True, by_alias=True),
 835        )
 836        return await self._parse_response(
 837            response, AccessKey, json_format=self._json_format
 838        )
 839
 840    @log_method_call
 841    async def get_access_keys(self) -> Union[JsonDict, AccessKeyList]:
 842        """
 843        Get all access keys.
 844
 845        Returns:
 846            List of all access keys
 847
 848        Examples:
 849            >>> async def main():
 850            ...     async with AsyncOutlineClient(
 851            ...         "https://example.com:1234/secret",
 852            ...         "ab12cd34..."
 853            ...     ) as client:
 854            ...         keys = await client.get_access_keys()
 855            ...         for key in keys.access_keys:
 856            ...             print(f"Key {key.id}: {key.name or 'unnamed'}")
 857            ...             if key.data_limit:
 858            ...                 print(f"  Limit: {key.data_limit.bytes / 1024**3:.1f} GB")
 859        """
 860        response = await self._request("GET", "access-keys")
 861        return await self._parse_response(
 862            response, AccessKeyList, json_format=self._json_format
 863        )
 864
 865    @log_method_call
 866    async def get_access_key(self, key_id: str) -> Union[JsonDict, AccessKey]:
 867        """
 868        Get specific access key.
 869
 870        Args:
 871            key_id: Access key ID
 872
 873        Returns:
 874            Access key details
 875
 876        Raises:
 877            APIError: If key doesn't exist
 878
 879        Examples:
 880            >>> async def main():
 881            ...     async with AsyncOutlineClient(
 882            ...         "https://example.com:1234/secret",
 883            ...         "ab12cd34..."
 884            ...     ) as client:
 885            ...         key = await client.get_access_key("1")
 886            ...         print(f"Port: {key.port}")
 887            ...         print(f"URL: {key.access_url}")
 888        """
 889        response = await self._request("GET", f"access-keys/{key_id}")
 890        return await self._parse_response(
 891            response, AccessKey, json_format=self._json_format
 892        )
 893
 894    @log_method_call
 895    async def rename_access_key(self, key_id: str, name: str) -> bool:
 896        """
 897        Rename access key.
 898
 899        Args:
 900            key_id: Access key ID
 901            name: New name
 902
 903        Returns:
 904            True if successful
 905
 906        Raises:
 907            APIError: If key doesn't exist
 908
 909        Examples:
 910            >>> async def main():
 911            ...     async with AsyncOutlineClient(
 912            ...         "https://example.com:1234/secret",
 913            ...         "ab12cd34..."
 914            ...     ) as client:
 915            ...         # Rename key
 916            ...         await client.rename_access_key("1", "Alice")
 917            ...
 918            ...         # Verify new name
 919            ...         key = await client.get_access_key("1")
 920            ...         assert key.name == "Alice"
 921        """
 922        request = AccessKeyNameRequest(name=name)
 923        return await self._request(
 924            "PUT", f"access-keys/{key_id}/name", json=request.model_dump(by_alias=True)
 925        )
 926
 927    @log_method_call
 928    async def delete_access_key(self, key_id: str) -> bool:
 929        """
 930        Delete access key.
 931
 932        Args:
 933            key_id: Access key ID
 934
 935        Returns:
 936            True if successful
 937
 938        Raises:
 939            APIError: If key doesn't exist
 940
 941        Examples:
 942            >>> async def main():
 943            ...     async with AsyncOutlineClient(
 944            ...         "https://example.com:1234/secret",
 945            ...         "ab12cd34..."
 946            ...     ) as client:
 947            ...         if await client.delete_access_key("1"):
 948            ...             print("Key deleted")
 949        """
 950        return await self._request("DELETE", f"access-keys/{key_id}")
 951
 952    @log_method_call
 953    async def set_access_key_data_limit(self, key_id: str, bytes_limit: int) -> bool:
 954        """
 955        Set data transfer limit for access key.
 956
 957        Args:
 958            key_id: Access key ID
 959            bytes_limit: Limit in bytes (must be non-negative)
 960
 961        Returns:
 962            True if successful
 963
 964        Raises:
 965            APIError: If key doesn't exist or limit is invalid
 966
 967        Examples:
 968            >>> async def main():
 969            ...     async with AsyncOutlineClient(
 970            ...         "https://example.com:1234/secret",
 971            ...         "ab12cd34..."
 972            ...     ) as client:
 973            ...         # Set 5 GB limit
 974            ...         limit = 5 * 1024**3  # 5 GB in bytes
 975            ...         await client.set_access_key_data_limit("1", limit)
 976            ...
 977            ...         # Verify limit
 978            ...         key = await client.get_access_key("1")
 979            ...         assert key.data_limit and key.data_limit.bytes == limit
 980        """
 981        request = DataLimitRequest(limit=DataLimit(bytes=bytes_limit))
 982        return await self._request(
 983            "PUT",
 984            f"access-keys/{key_id}/data-limit",
 985            json=request.model_dump(by_alias=True),
 986        )
 987
 988    @log_method_call
 989    async def remove_access_key_data_limit(self, key_id: str) -> bool:
 990        """
 991        Remove data transfer limit from access key.
 992
 993        Args:
 994            key_id: Access key ID
 995
 996        Returns:
 997            True if successful
 998
 999        Raises:
1000            APIError: If key doesn't exist
1001
1002        Examples:
1003            >>> async def main():
1004            ...     async with AsyncOutlineClient(
1005            ...         "https://example.com:1234/secret",
1006            ...         "ab12cd34..."
1007            ...     ) as client:
1008            ...         await client.remove_access_key_data_limit("1")
1009        """
1010        return await self._request("DELETE", f"access-keys/{key_id}/data-limit")
1011
1012    # Global Data Limit Methods
1013
1014    @log_method_call
1015    async def set_global_data_limit(self, bytes_limit: int) -> bool:
1016        """
1017        Set global data transfer limit for all access keys.
1018
1019        Args:
1020            bytes_limit: Limit in bytes (must be non-negative)
1021
1022        Returns:
1023            True if successful
1024
1025        Examples:
1026            >>> async def main():
1027            ...     async with AsyncOutlineClient(
1028            ...         "https://example.com:1234/secret",
1029            ...         "ab12cd34..."
1030            ...     ) as client:
1031            ...         # Set 100 GB global limit
1032            ...         await client.set_global_data_limit(100 * 1024**3)
1033        """
1034        request = DataLimitRequest(limit=DataLimit(bytes=bytes_limit))
1035        return await self._request(
1036            "PUT",
1037            "server/access-key-data-limit",
1038            json=request.model_dump(by_alias=True),
1039        )
1040
1041    @log_method_call
1042    async def remove_global_data_limit(self) -> bool:
1043        """
1044        Remove global data transfer limit.
1045
1046        Returns:
1047            True if successful
1048
1049        Examples:
1050            >>> async def main():
1051            ...     async with AsyncOutlineClient(
1052            ...         "https://example.com:1234/secret",
1053            ...         "ab12cd34..."
1054            ...     ) as client:
1055            ...         await client.remove_global_data_limit()
1056        """
1057        return await self._request("DELETE", "server/access-key-data-limit")
1058
1059    # Batch Operations
1060
1061    async def batch_create_access_keys(
1062            self,
1063            keys_config: list[dict[str, Any]],
1064            fail_fast: bool = True
1065    ) -> list[Union[AccessKey, Exception]]:
1066        """
1067        Create multiple access keys in batch.
1068
1069        Args:
1070            keys_config: List of key configurations (same as create_access_key kwargs)
1071            fail_fast: If True, stop on first error. If False, continue and return errors.
1072
1073        Returns:
1074            List of created keys or exceptions
1075
1076        Examples:
1077            >>> async def main():
1078            ...     async with AsyncOutlineClient(
1079            ...         "https://example.com:1234/secret",
1080            ...         "ab12cd34..."
1081            ...     ) as client:
1082            ...         configs = [
1083            ...             {"name": "User1", "limit": DataLimit(bytes=1024**3)},
1084            ...             {"name": "User2", "port": 8388},
1085            ...         ]
1086            ...         res = await client.batch_create_access_keys(configs)
1087        """
1088        results = []
1089
1090        for config in keys_config:
1091            try:
1092                key = await self.create_access_key(**config)
1093                results.append(key)
1094            except Exception as e:
1095                if fail_fast:
1096                    raise
1097                results.append(e)
1098
1099        return results
1100
1101    async def get_server_summary(self, metrics_since: str = "24h") -> dict[str, Any]:
1102        """
1103        Get comprehensive server summary including info, metrics, and key count.
1104
1105        Args:
1106            metrics_since: Time range for experimental metrics (default: "24h")
1107
1108        Returns:
1109            Dictionary with server info, health status, and statistics
1110        """
1111        summary = {}
1112
1113        try:
1114            # Get basic server info
1115            server_info = await self.get_server_info()
1116            summary["server"] = server_info.model_dump() if isinstance(server_info, BaseModel) else server_info
1117
1118            # Get access keys count
1119            keys = await self.get_access_keys()
1120            key_list = keys.access_keys if isinstance(keys, BaseModel) else keys.get("accessKeys", [])
1121            summary["access_keys_count"] = len(key_list)
1122
1123            # Get metrics if available
1124            try:
1125                metrics_status = await self.get_metrics_status()
1126                if (isinstance(metrics_status, BaseModel) and metrics_status.metrics_enabled) or \
1127                        (isinstance(metrics_status, dict) and metrics_status.get("metricsEnabled")):
1128                    transfer_metrics = await self.get_transfer_metrics()
1129                    summary["transfer_metrics"] = transfer_metrics.model_dump() if isinstance(transfer_metrics,
1130                                                                                              BaseModel) else transfer_metrics
1131
1132                    # Try to get experimental metrics
1133                    try:
1134                        experimental_metrics = await self.get_experimental_metrics(metrics_since)
1135                        summary["experimental_metrics"] = experimental_metrics.model_dump() if isinstance(
1136                            experimental_metrics,
1137                            BaseModel) else experimental_metrics
1138                    except Exception:
1139                        summary["experimental_metrics"] = None
1140            except Exception:
1141                summary["transfer_metrics"] = None
1142                summary["experimental_metrics"] = None
1143
1144            summary["healthy"] = True
1145
1146        except Exception as e:
1147            summary["healthy"] = False
1148            summary["error"] = str(e)
1149
1150        return summary
1151
1152    # Utility and management methods
1153
1154    def configure_logging(self, level: str = "INFO", format_string: Optional[str] = None) -> None:
1155        """
1156        Configure logging for the client.
1157
1158        Args:
1159            level: Logging level (DEBUG, INFO, WARNING, ERROR)
1160            format_string: Custom format string for log messages
1161        """
1162        self._enable_logging = True
1163
1164        # Clear existing handlers
1165        logger.handlers.clear()
1166
1167        handler = logging.StreamHandler()
1168        if format_string:
1169            formatter = logging.Formatter(format_string)
1170        else:
1171            formatter = logging.Formatter(
1172                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
1173            )
1174        handler.setFormatter(formatter)
1175        logger.addHandler(handler)
1176        logger.setLevel(getattr(logging, level.upper()))
1177
1178    @property
1179    def is_healthy(self) -> bool:
1180        """Check if the last health check passed."""
1181        return self._is_healthy
1182
1183    @property
1184    def session(self) -> Optional[aiohttp.ClientSession]:
1185        """Access the current client session."""
1186        return self._session
1187
1188    @property
1189    def api_url(self) -> str:
1190        """Get the API URL (without sensitive parts)."""
1191        from urllib.parse import urlparse
1192        parsed = urlparse(self._api_url)
1193        return f"{parsed.scheme}://{parsed.netloc}"
1194
1195    def __repr__(self) -> str:
1196        """String representation of the client."""
1197        status = "connected" if self._session and not self._session.closed else "disconnected"
1198        return f"AsyncOutlineClient(url={self.api_url}, status={status})"

Asynchronous client for the Outline VPN Server API.

Arguments:
  • api_url: Base URL for the Outline server API
  • cert_sha256: SHA-256 fingerprint of the server's TLS certificate
  • json_format: Return raw JSON instead of Pydantic models
  • timeout: Request timeout in seconds
  • retry_attempts: Number of retry attempts when connecting to the API
  • enable_logging: Enable debug logging for API calls
  • user_agent: Custom user agent string
  • max_connections: Maximum number of connections in the pool
  • rate_limit_delay: Minimum delay between requests (seconds)
Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34...",
...         enable_logging=True
...     ) as client:
...         server_info = await client.get_server_info()
...         print(f"Server: {server_info.name}")
...
...     # Or use as context manager factory
...     async with AsyncOutlineClient.create(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         await client.get_server_info()
AsyncOutlineClient( api_url: str, cert_sha256: str, *, json_format: bool = False, timeout: int = 30, retry_attempts: int = 3, enable_logging: bool = False, user_agent: Optional[str] = None, max_connections: int = 10, rate_limit_delay: float = 0.0)
157    def __init__(
158            self,
159            api_url: str,
160            cert_sha256: str,
161            *,
162            json_format: bool = False,
163            timeout: int = 30,
164            retry_attempts: int = 3,
165            enable_logging: bool = False,
166            user_agent: Optional[str] = None,
167            max_connections: int = 10,
168            rate_limit_delay: float = 0.0,
169    ) -> None:
170
171        # Validate api_url
172        if not api_url or not api_url.strip():
173            raise ValueError("api_url cannot be empty or whitespace")
174
175        # Validate cert_sha256
176        if not cert_sha256 or not cert_sha256.strip():
177            raise ValueError("cert_sha256 cannot be empty or whitespace")
178
179        # Additional validation for cert_sha256 format (should be hex)
180        cert_sha256_clean = cert_sha256.strip()
181        if not all(c in '0123456789abcdefABCDEF' for c in cert_sha256_clean):
182            raise ValueError("cert_sha256 must contain only hexadecimal characters")
183
184        # Check cert_sha256 length (SHA-256 should be 64 hex characters)
185        if len(cert_sha256_clean) != 64:
186            raise ValueError("cert_sha256 must be exactly 64 hexadecimal characters (SHA-256)")
187
188        self._api_url = api_url.rstrip("/")
189        self._cert_sha256 = cert_sha256
190        self._json_format = json_format
191        self._timeout = aiohttp.ClientTimeout(total=timeout)
192        self._ssl_context: Optional[Fingerprint] = None
193        self._session: Optional[aiohttp.ClientSession] = None
194        self._retry_attempts = retry_attempts
195        self._enable_logging = enable_logging
196        self._user_agent = user_agent or f"PyOutlineAPI/0.3.0"
197        self._max_connections = max_connections
198        self._rate_limit_delay = rate_limit_delay
199        self._last_request_time: float = 0.0
200
201        # Health check state
202        self._last_health_check: float = 0.0
203        self._health_check_interval: float = 300.0  # 5 minutes
204        self._is_healthy: bool = True
205
206        if enable_logging:
207            self._setup_logging()
@classmethod
@asynccontextmanager
def create( cls, api_url: str, cert_sha256: str, **kwargs) -> AsyncGenerator[AsyncOutlineClient, NoneType]:
221    @classmethod
222    @asynccontextmanager
223    async def create(
224            cls,
225            api_url: str,
226            cert_sha256: str,
227            **kwargs
228    ) -> AsyncGenerator[AsyncOutlineClient, None]:
229        """
230        Factory method that returns an async context manager.
231
232        This is the recommended way to create clients for one-off operations.
233        """
234        client = cls(api_url, cert_sha256, **kwargs)
235        async with client:
236            yield client

Factory method that returns an async context manager.

This is the recommended way to create clients for one-off operations.

async def health_check(self, force: bool = False) -> bool:
282    async def health_check(self, force: bool = False) -> bool:
283        """
284        Perform a health check on the Outline server.
285
286        Args:
287            force: Force health check even if recently performed
288
289        Returns:
290            True if server is healthy
291        """
292        current_time = time.time()
293
294        if not force and (current_time - self._last_health_check) < self._health_check_interval:
295            return self._is_healthy
296
297        try:
298            await self.get_server_info()
299            self._is_healthy = True
300            if self._enable_logging:
301                logger.info("Health check passed")
302
303            return self._is_healthy
304        except Exception as e:
305            self._is_healthy = False
306            if self._enable_logging:
307                logger.warning(f"Health check failed: {e}")
308
309            return self._is_healthy
310        finally:
311            self._last_health_check = current_time

Perform a health check on the Outline server.

Arguments:
  • force: Force health check even if recently performed
Returns:

True if server is healthy

@log_method_call
async def get_server_info(self) -> Union[dict[str, Any], Server]:
508    @log_method_call
509    async def get_server_info(self) -> Union[JsonDict, Server]:
510        """
511        Get server information.
512
513        Returns:
514            Server information including name, ID, and configuration.
515
516        Examples:
517            >>> async def main():
518            ...     async with AsyncOutlineClient(
519            ...         "https://example.com:1234/secret",
520            ...         "ab12cd34..."
521            ...     ) as client:
522            ...         server = await client.get_server_info()
523            ...         print(f"Server {server.name} running version {server.version}")
524        """
525        response = await self._request("GET", "server")
526        return await self._parse_response(
527            response, Server, json_format=self._json_format
528        )

Get server information.

Returns:

Server information including name, ID, and configuration.

Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         server = await client.get_server_info()
...         print(f"Server {server.name} running version {server.version}")
@log_method_call
async def rename_server(self, name: str) -> bool:
530    @log_method_call
531    async def rename_server(self, name: str) -> bool:
532        """
533        Rename the server.
534
535        Args:
536            name: New server name
537
538        Returns:
539            True if successful
540
541        Examples:
542            >>> async def main():
543            ...     async with AsyncOutlineClient(
544            ...         "https://example.com:1234/secret",
545            ...         "ab12cd34..."
546            ...     ) as client:
547            ...         success = await client.rename_server("My VPN Server")
548            ...         if success:
549            ...             print("Server renamed successfully")
550        """
551        request = ServerNameRequest(name=name)
552        return await self._request(
553            "PUT", "name", json=request.model_dump(by_alias=True)
554        )

Rename the server.

Arguments:
  • name: New server name
Returns:

True if successful

Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         success = await client.rename_server("My VPN Server")
...         if success:
...             print("Server renamed successfully")
@log_method_call
async def set_hostname(self, hostname: str) -> bool:
556    @log_method_call
557    async def set_hostname(self, hostname: str) -> bool:
558        """
559        Set server hostname for access keys.
560
561        Args:
562            hostname: New hostname or IP address
563
564        Returns:
565            True if successful
566
567        Raises:
568            APIError: If hostname is invalid
569
570        Examples:
571            >>> async def main():
572            ...     async with AsyncOutlineClient(
573            ...         "https://example.com:1234/secret",
574            ...         "ab12cd34..."
575            ...     ) as client:
576            ...         await client.set_hostname("vpn.example.com")
577            ...         # Or use IP address
578            ...         await client.set_hostname("203.0.113.1")
579        """
580        request = HostnameRequest(hostname=hostname)
581        return await self._request(
582            "PUT",
583            "server/hostname-for-access-keys",
584            json=request.model_dump(by_alias=True),
585        )

Set server hostname for access keys.

Arguments:
  • hostname: New hostname or IP address
Returns:

True if successful

Raises:
  • APIError: If hostname is invalid
Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         await client.set_hostname("vpn.example.com")
...         # Or use IP address
...         await client.set_hostname("203.0.113.1")
@log_method_call
async def set_default_port(self, port: int) -> bool:
587    @log_method_call
588    async def set_default_port(self, port: int) -> bool:
589        """
590        Set default port for new access keys.
591
592        Args:
593            port: Port number (1025-65535)
594
595        Returns:
596            True if successful
597
598        Raises:
599            APIError: If port is invalid or in use
600
601        Examples:
602            >>> async def main():
603            ...     async with AsyncOutlineClient(
604            ...         "https://example.com:1234/secret",
605            ...         "ab12cd34..."
606            ...     ) as client:
607            ...         await client.set_default_port(8388)
608        """
609        if port < MIN_PORT or port > MAX_PORT:
610            raise ValueError(
611                f"Privileged ports are not allowed. Use range: {MIN_PORT}-{MAX_PORT}"
612            )
613
614        request = PortRequest(port=port)
615        return await self._request(
616            "PUT",
617            "server/port-for-new-access-keys",
618            json=request.model_dump(by_alias=True),
619        )

Set default port for new access keys.

Arguments:
  • port: Port number (1025-65535)
Returns:

True if successful

Raises:
  • APIError: If port is invalid or in use
Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         await client.set_default_port(8388)
@log_method_call
async def get_metrics_status(self) -> Union[dict[str, Any], MetricsStatusResponse]:
623    @log_method_call
624    async def get_metrics_status(self) -> Union[JsonDict, MetricsStatusResponse]:
625        """
626        Get whether metrics collection is enabled.
627
628        Returns:
629            Current metrics collection status
630
631        Examples:
632            >>> async def main():
633            ...     async with AsyncOutlineClient(
634            ...         "https://example.com:1234/secret",
635            ...         "ab12cd34..."
636            ...     ) as client:
637            ...         status = await client.get_metrics_status()
638            ...         if status.metrics_enabled:
639            ...             print("Metrics collection is enabled")
640        """
641        response = await self._request("GET", "metrics/enabled")
642        return await self._parse_response(
643            response, MetricsStatusResponse, json_format=self._json_format
644        )

Get whether metrics collection is enabled.

Returns:

Current metrics collection status

Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         status = await client.get_metrics_status()
...         if status.metrics_enabled:
...             print("Metrics collection is enabled")
@log_method_call
async def set_metrics_status(self, enabled: bool) -> bool:
646    @log_method_call
647    async def set_metrics_status(self, enabled: bool) -> bool:
648        """
649        Enable or disable metrics collection.
650
651        Args:
652            enabled: Whether to enable metrics
653
654        Returns:
655            True if successful
656
657        Examples:
658            >>> async def main():
659            ...     async with AsyncOutlineClient(
660            ...         "https://example.com:1234/secret",
661            ...         "ab12cd34..."
662            ...     ) as client:
663            ...         # Enable metrics
664            ...         await client.set_metrics_status(True)
665            ...         # Check new status
666            ...         status = await client.get_metrics_status()
667        """
668        request = MetricsEnabledRequest(metricsEnabled=enabled)
669        return await self._request(
670            "PUT", "metrics/enabled", json=request.model_dump(by_alias=True)
671        )

Enable or disable metrics collection.

Arguments:
  • enabled: Whether to enable metrics
Returns:

True if successful

Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         # Enable metrics
...         await client.set_metrics_status(True)
...         # Check new status
...         status = await client.get_metrics_status()
@log_method_call
async def get_transfer_metrics(self) -> Union[dict[str, Any], ServerMetrics]:
673    @log_method_call
674    async def get_transfer_metrics(self) -> Union[JsonDict, ServerMetrics]:
675        """
676        Get transfer metrics for all access keys.
677
678        Returns:
679            Transfer metrics data for each access key
680
681        Examples:
682            >>> async def main():
683            ...     async with AsyncOutlineClient(
684            ...         "https://example.com:1234/secret",
685            ...         "ab12cd34..."
686            ...     ) as client:
687            ...         metrics = await client.get_transfer_metrics()
688            ...         for user_id, bytes_transferred in metrics.bytes_transferred_by_user_id.items():
689            ...             print(f"User {user_id}: {bytes_transferred / 1024**3:.2f} GB")
690        """
691        response = await self._request("GET", "metrics/transfer")
692        return await self._parse_response(
693            response, ServerMetrics, json_format=self._json_format
694        )

Get transfer metrics for all access keys.

Returns:

Transfer metrics data for each access key

Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         metrics = await client.get_transfer_metrics()
...         for user_id, bytes_transferred in metrics.bytes_transferred_by_user_id.items():
...             print(f"User {user_id}: {bytes_transferred / 1024**3:.2f} GB")
@log_method_call
async def get_experimental_metrics( self, since: str) -> Union[dict[str, Any], ExperimentalMetrics]:
696    @log_method_call
697    async def get_experimental_metrics(
698            self, since: str
699    ) -> Union[JsonDict, ExperimentalMetrics]:
700        """
701        Get experimental server metrics.
702
703        Args:
704            since: Required time range filter (e.g., "24h", "7d", "30d", or ISO timestamp)
705
706        Returns:
707            Detailed server and access key metrics
708
709        Examples:
710            >>> async def main():
711            ...     async with AsyncOutlineClient(
712            ...         "https://example.com:1234/secret",
713            ...         "ab12cd34..."
714            ...     ) as client:
715            ...         # Get metrics for the last 24 hours
716            ...         metrics = await client.get_experimental_metrics("24h")
717            ...         print(f"Server tunnel time: {metrics.server.tunnel_time.seconds}s")
718            ...         print(f"Server data transferred: {metrics.server.data_transferred.bytes} bytes")
719            ...
720            ...         # Get metrics for the last 7 days
721            ...         metrics = await client.get_experimental_metrics("7d")
722            ...
723            ...         # Get metrics since specific timestamp
724            ...         metrics = await client.get_experimental_metrics("2024-01-01T00:00:00Z")
725        """
726        if not since or not since.strip():
727            raise ValueError("Parameter 'since' is required and cannot be empty")
728
729        params = {"since": since}
730        response = await self._request(
731            "GET", "experimental/server/metrics", params=params
732        )
733        return await self._parse_response(
734            response, ExperimentalMetrics, json_format=self._json_format
735        )

Get experimental server metrics.

Arguments:
  • since: Required time range filter (e.g., "24h", "7d", "30d", or ISO timestamp)
Returns:

Detailed server and access key metrics

Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         # Get metrics for the last 24 hours
...         metrics = await client.get_experimental_metrics("24h")
...         print(f"Server tunnel time: {metrics.server.tunnel_time.seconds}s")
...         print(f"Server data transferred: {metrics.server.data_transferred.bytes} bytes")
...
...         # Get metrics for the last 7 days
...         metrics = await client.get_experimental_metrics("7d")
...
...         # Get metrics since specific timestamp
...         metrics = await client.get_experimental_metrics("2024-01-01T00:00:00Z")
@log_method_call
async def create_access_key( self, *, name: Optional[str] = None, password: Optional[str] = None, port: Optional[int] = None, method: Optional[str] = None, limit: Optional[DataLimit] = None) -> Union[dict[str, Any], AccessKey]:
739    @log_method_call
740    async def create_access_key(
741            self,
742            *,
743            name: Optional[str] = None,
744            password: Optional[str] = None,
745            port: Optional[int] = None,
746            method: Optional[str] = None,
747            limit: Optional[DataLimit] = None,
748    ) -> Union[JsonDict, AccessKey]:
749        """
750        Create a new access key.
751
752        Args:
753            name: Optional key name
754            password: Optional password
755            port: Optional port number (1-65535)
756            method: Optional encryption method
757            limit: Optional data transfer limit
758
759        Returns:
760            New access key details
761
762        Examples:
763            >>> async def main():
764            ...     async with AsyncOutlineClient(
765            ...         "https://example.com:1234/secret",
766            ...         "ab12cd34..."
767            ...     ) as client:
768            ...         # Create basic key
769            ...         key = await client.create_access_key(name="User 1")
770            ...
771            ...         # Create key with data limit
772            ...         lim = DataLimit(bytes=5 * 1024**3)  # 5 GB
773            ...         key = await client.create_access_key(
774            ...             name="Limited User",
775            ...             port=8388,
776            ...             limit=lim
777            ...         )
778            ...         print(f"Created key: {key.access_url}")
779        """
780        request = AccessKeyCreateRequest(
781            name=name, password=password, port=port, method=method, limit=limit
782        )
783        response = await self._request(
784            "POST",
785            "access-keys",
786            json=request.model_dump(exclude_none=True, by_alias=True),
787        )
788        return await self._parse_response(
789            response, AccessKey, json_format=self._json_format
790        )

Create a new access key.

Arguments:
  • name: Optional key name
  • password: Optional password
  • port: Optional port number (1-65535)
  • method: Optional encryption method
  • limit: Optional data transfer limit
Returns:

New access key details

Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         # Create basic key
...         key = await client.create_access_key(name="User 1")
...
...         # Create key with data limit
...         lim = DataLimit(bytes=5 * 1024**3)  # 5 GB
...         key = await client.create_access_key(
...             name="Limited User",
...             port=8388,
...             limit=lim
...         )
...         print(f"Created key: {key.access_url}")
@log_method_call
async def create_access_key_with_id( self, key_id: str, *, name: Optional[str] = None, password: Optional[str] = None, port: Optional[int] = None, method: Optional[str] = None, limit: Optional[DataLimit] = None) -> Union[dict[str, Any], AccessKey]:
792    @log_method_call
793    async def create_access_key_with_id(
794            self,
795            key_id: str,
796            *,
797            name: Optional[str] = None,
798            password: Optional[str] = None,
799            port: Optional[int] = None,
800            method: Optional[str] = None,
801            limit: Optional[DataLimit] = None,
802    ) -> Union[JsonDict, AccessKey]:
803        """
804        Create a new access key with specific ID.
805
806        Args:
807            key_id: Specific ID for the access key
808            name: Optional key name
809            password: Optional password
810            port: Optional port number (1-65535)
811            method: Optional encryption method
812            limit: Optional data transfer limit
813
814        Returns:
815            New access key details
816
817        Examples:
818            >>> async def main():
819            ...     async with AsyncOutlineClient(
820            ...         "https://example.com:1234/secret",
821            ...         "ab12cd34..."
822            ...     ) as client:
823            ...         key = await client.create_access_key_with_id(
824            ...             "my-custom-id",
825            ...             name="Custom Key"
826            ...         )
827        """
828        request = AccessKeyCreateRequest(
829            name=name, password=password, port=port, method=method, limit=limit
830        )
831        response = await self._request(
832            "PUT",
833            f"access-keys/{key_id}",
834            json=request.model_dump(exclude_none=True, by_alias=True),
835        )
836        return await self._parse_response(
837            response, AccessKey, json_format=self._json_format
838        )

Create a new access key with specific ID.

Arguments:
  • key_id: Specific ID for the access key
  • name: Optional key name
  • password: Optional password
  • port: Optional port number (1-65535)
  • method: Optional encryption method
  • limit: Optional data transfer limit
Returns:

New access key details

Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         key = await client.create_access_key_with_id(
...             "my-custom-id",
...             name="Custom Key"
...         )
@log_method_call
async def get_access_keys(self) -> Union[dict[str, Any], AccessKeyList]:
840    @log_method_call
841    async def get_access_keys(self) -> Union[JsonDict, AccessKeyList]:
842        """
843        Get all access keys.
844
845        Returns:
846            List of all access keys
847
848        Examples:
849            >>> async def main():
850            ...     async with AsyncOutlineClient(
851            ...         "https://example.com:1234/secret",
852            ...         "ab12cd34..."
853            ...     ) as client:
854            ...         keys = await client.get_access_keys()
855            ...         for key in keys.access_keys:
856            ...             print(f"Key {key.id}: {key.name or 'unnamed'}")
857            ...             if key.data_limit:
858            ...                 print(f"  Limit: {key.data_limit.bytes / 1024**3:.1f} GB")
859        """
860        response = await self._request("GET", "access-keys")
861        return await self._parse_response(
862            response, AccessKeyList, json_format=self._json_format
863        )

Get all access keys.

Returns:

List of all access keys

Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         keys = await client.get_access_keys()
...         for key in keys.access_keys:
...             print(f"Key {key.id}: {key.name or 'unnamed'}")
...             if key.data_limit:
...                 print(f"  Limit: {key.data_limit.bytes / 1024**3:.1f} GB")
@log_method_call
async def get_access_key( self, key_id: str) -> Union[dict[str, Any], AccessKey]:
865    @log_method_call
866    async def get_access_key(self, key_id: str) -> Union[JsonDict, AccessKey]:
867        """
868        Get specific access key.
869
870        Args:
871            key_id: Access key ID
872
873        Returns:
874            Access key details
875
876        Raises:
877            APIError: If key doesn't exist
878
879        Examples:
880            >>> async def main():
881            ...     async with AsyncOutlineClient(
882            ...         "https://example.com:1234/secret",
883            ...         "ab12cd34..."
884            ...     ) as client:
885            ...         key = await client.get_access_key("1")
886            ...         print(f"Port: {key.port}")
887            ...         print(f"URL: {key.access_url}")
888        """
889        response = await self._request("GET", f"access-keys/{key_id}")
890        return await self._parse_response(
891            response, AccessKey, json_format=self._json_format
892        )

Get specific access key.

Arguments:
  • key_id: Access key ID
Returns:

Access key details

Raises:
  • APIError: If key doesn't exist
Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         key = await client.get_access_key("1")
...         print(f"Port: {key.port}")
...         print(f"URL: {key.access_url}")
@log_method_call
async def rename_access_key(self, key_id: str, name: str) -> bool:
894    @log_method_call
895    async def rename_access_key(self, key_id: str, name: str) -> bool:
896        """
897        Rename access key.
898
899        Args:
900            key_id: Access key ID
901            name: New name
902
903        Returns:
904            True if successful
905
906        Raises:
907            APIError: If key doesn't exist
908
909        Examples:
910            >>> async def main():
911            ...     async with AsyncOutlineClient(
912            ...         "https://example.com:1234/secret",
913            ...         "ab12cd34..."
914            ...     ) as client:
915            ...         # Rename key
916            ...         await client.rename_access_key("1", "Alice")
917            ...
918            ...         # Verify new name
919            ...         key = await client.get_access_key("1")
920            ...         assert key.name == "Alice"
921        """
922        request = AccessKeyNameRequest(name=name)
923        return await self._request(
924            "PUT", f"access-keys/{key_id}/name", json=request.model_dump(by_alias=True)
925        )

Rename access key.

Arguments:
  • key_id: Access key ID
  • name: New name
Returns:

True if successful

Raises:
  • APIError: If key doesn't exist
Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         # Rename key
...         await client.rename_access_key("1", "Alice")
...
...         # Verify new name
...         key = await client.get_access_key("1")
...         assert key.name == "Alice"
@log_method_call
async def delete_access_key(self, key_id: str) -> bool:
927    @log_method_call
928    async def delete_access_key(self, key_id: str) -> bool:
929        """
930        Delete access key.
931
932        Args:
933            key_id: Access key ID
934
935        Returns:
936            True if successful
937
938        Raises:
939            APIError: If key doesn't exist
940
941        Examples:
942            >>> async def main():
943            ...     async with AsyncOutlineClient(
944            ...         "https://example.com:1234/secret",
945            ...         "ab12cd34..."
946            ...     ) as client:
947            ...         if await client.delete_access_key("1"):
948            ...             print("Key deleted")
949        """
950        return await self._request("DELETE", f"access-keys/{key_id}")

Delete access key.

Arguments:
  • key_id: Access key ID
Returns:

True if successful

Raises:
  • APIError: If key doesn't exist
Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         if await client.delete_access_key("1"):
...             print("Key deleted")
@log_method_call
async def set_access_key_data_limit(self, key_id: str, bytes_limit: int) -> bool:
952    @log_method_call
953    async def set_access_key_data_limit(self, key_id: str, bytes_limit: int) -> bool:
954        """
955        Set data transfer limit for access key.
956
957        Args:
958            key_id: Access key ID
959            bytes_limit: Limit in bytes (must be non-negative)
960
961        Returns:
962            True if successful
963
964        Raises:
965            APIError: If key doesn't exist or limit is invalid
966
967        Examples:
968            >>> async def main():
969            ...     async with AsyncOutlineClient(
970            ...         "https://example.com:1234/secret",
971            ...         "ab12cd34..."
972            ...     ) as client:
973            ...         # Set 5 GB limit
974            ...         limit = 5 * 1024**3  # 5 GB in bytes
975            ...         await client.set_access_key_data_limit("1", limit)
976            ...
977            ...         # Verify limit
978            ...         key = await client.get_access_key("1")
979            ...         assert key.data_limit and key.data_limit.bytes == limit
980        """
981        request = DataLimitRequest(limit=DataLimit(bytes=bytes_limit))
982        return await self._request(
983            "PUT",
984            f"access-keys/{key_id}/data-limit",
985            json=request.model_dump(by_alias=True),
986        )

Set data transfer limit for access key.

Arguments:
  • key_id: Access key ID
  • bytes_limit: Limit in bytes (must be non-negative)
Returns:

True if successful

Raises:
  • APIError: If key doesn't exist or limit is invalid
Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         # Set 5 GB limit
...         limit = 5 * 1024**3  # 5 GB in bytes
...         await client.set_access_key_data_limit("1", limit)
...
...         # Verify limit
...         key = await client.get_access_key("1")
...         assert key.data_limit and key.data_limit.bytes == limit
@log_method_call
async def remove_access_key_data_limit(self, key_id: str) -> bool:
 988    @log_method_call
 989    async def remove_access_key_data_limit(self, key_id: str) -> bool:
 990        """
 991        Remove data transfer limit from access key.
 992
 993        Args:
 994            key_id: Access key ID
 995
 996        Returns:
 997            True if successful
 998
 999        Raises:
1000            APIError: If key doesn't exist
1001
1002        Examples:
1003            >>> async def main():
1004            ...     async with AsyncOutlineClient(
1005            ...         "https://example.com:1234/secret",
1006            ...         "ab12cd34..."
1007            ...     ) as client:
1008            ...         await client.remove_access_key_data_limit("1")
1009        """
1010        return await self._request("DELETE", f"access-keys/{key_id}/data-limit")

Remove data transfer limit from access key.

Arguments:
  • key_id: Access key ID
Returns:

True if successful

Raises:
  • APIError: If key doesn't exist
Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         await client.remove_access_key_data_limit("1")
@log_method_call
async def set_global_data_limit(self, bytes_limit: int) -> bool:
1014    @log_method_call
1015    async def set_global_data_limit(self, bytes_limit: int) -> bool:
1016        """
1017        Set global data transfer limit for all access keys.
1018
1019        Args:
1020            bytes_limit: Limit in bytes (must be non-negative)
1021
1022        Returns:
1023            True if successful
1024
1025        Examples:
1026            >>> async def main():
1027            ...     async with AsyncOutlineClient(
1028            ...         "https://example.com:1234/secret",
1029            ...         "ab12cd34..."
1030            ...     ) as client:
1031            ...         # Set 100 GB global limit
1032            ...         await client.set_global_data_limit(100 * 1024**3)
1033        """
1034        request = DataLimitRequest(limit=DataLimit(bytes=bytes_limit))
1035        return await self._request(
1036            "PUT",
1037            "server/access-key-data-limit",
1038            json=request.model_dump(by_alias=True),
1039        )

Set global data transfer limit for all access keys.

Arguments:
  • bytes_limit: Limit in bytes (must be non-negative)
Returns:

True if successful

Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         # Set 100 GB global limit
...         await client.set_global_data_limit(100 * 1024**3)
@log_method_call
async def remove_global_data_limit(self) -> bool:
1041    @log_method_call
1042    async def remove_global_data_limit(self) -> bool:
1043        """
1044        Remove global data transfer limit.
1045
1046        Returns:
1047            True if successful
1048
1049        Examples:
1050            >>> async def main():
1051            ...     async with AsyncOutlineClient(
1052            ...         "https://example.com:1234/secret",
1053            ...         "ab12cd34..."
1054            ...     ) as client:
1055            ...         await client.remove_global_data_limit()
1056        """
1057        return await self._request("DELETE", "server/access-key-data-limit")

Remove global data transfer limit.

Returns:

True if successful

Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         await client.remove_global_data_limit()
async def batch_create_access_keys( self, keys_config: list[dict[str, typing.Any]], fail_fast: bool = True) -> list[typing.Union[AccessKey, Exception]]:
1061    async def batch_create_access_keys(
1062            self,
1063            keys_config: list[dict[str, Any]],
1064            fail_fast: bool = True
1065    ) -> list[Union[AccessKey, Exception]]:
1066        """
1067        Create multiple access keys in batch.
1068
1069        Args:
1070            keys_config: List of key configurations (same as create_access_key kwargs)
1071            fail_fast: If True, stop on first error. If False, continue and return errors.
1072
1073        Returns:
1074            List of created keys or exceptions
1075
1076        Examples:
1077            >>> async def main():
1078            ...     async with AsyncOutlineClient(
1079            ...         "https://example.com:1234/secret",
1080            ...         "ab12cd34..."
1081            ...     ) as client:
1082            ...         configs = [
1083            ...             {"name": "User1", "limit": DataLimit(bytes=1024**3)},
1084            ...             {"name": "User2", "port": 8388},
1085            ...         ]
1086            ...         res = await client.batch_create_access_keys(configs)
1087        """
1088        results = []
1089
1090        for config in keys_config:
1091            try:
1092                key = await self.create_access_key(**config)
1093                results.append(key)
1094            except Exception as e:
1095                if fail_fast:
1096                    raise
1097                results.append(e)
1098
1099        return results

Create multiple access keys in batch.

Arguments:
  • keys_config: List of key configurations (same as create_access_key kwargs)
  • fail_fast: If True, stop on first error. If False, continue and return errors.
Returns:

List of created keys or exceptions

Examples:
>>> async def main():
...     async with AsyncOutlineClient(
...         "https://example.com:1234/secret",
...         "ab12cd34..."
...     ) as client:
...         configs = [
...             {"name": "User1", "limit": DataLimit(bytes=1024**3)},
...             {"name": "User2", "port": 8388},
...         ]
...         res = await client.batch_create_access_keys(configs)
async def get_server_summary(self, metrics_since: str = '24h') -> dict[str, typing.Any]:
1101    async def get_server_summary(self, metrics_since: str = "24h") -> dict[str, Any]:
1102        """
1103        Get comprehensive server summary including info, metrics, and key count.
1104
1105        Args:
1106            metrics_since: Time range for experimental metrics (default: "24h")
1107
1108        Returns:
1109            Dictionary with server info, health status, and statistics
1110        """
1111        summary = {}
1112
1113        try:
1114            # Get basic server info
1115            server_info = await self.get_server_info()
1116            summary["server"] = server_info.model_dump() if isinstance(server_info, BaseModel) else server_info
1117
1118            # Get access keys count
1119            keys = await self.get_access_keys()
1120            key_list = keys.access_keys if isinstance(keys, BaseModel) else keys.get("accessKeys", [])
1121            summary["access_keys_count"] = len(key_list)
1122
1123            # Get metrics if available
1124            try:
1125                metrics_status = await self.get_metrics_status()
1126                if (isinstance(metrics_status, BaseModel) and metrics_status.metrics_enabled) or \
1127                        (isinstance(metrics_status, dict) and metrics_status.get("metricsEnabled")):
1128                    transfer_metrics = await self.get_transfer_metrics()
1129                    summary["transfer_metrics"] = transfer_metrics.model_dump() if isinstance(transfer_metrics,
1130                                                                                              BaseModel) else transfer_metrics
1131
1132                    # Try to get experimental metrics
1133                    try:
1134                        experimental_metrics = await self.get_experimental_metrics(metrics_since)
1135                        summary["experimental_metrics"] = experimental_metrics.model_dump() if isinstance(
1136                            experimental_metrics,
1137                            BaseModel) else experimental_metrics
1138                    except Exception:
1139                        summary["experimental_metrics"] = None
1140            except Exception:
1141                summary["transfer_metrics"] = None
1142                summary["experimental_metrics"] = None
1143
1144            summary["healthy"] = True
1145
1146        except Exception as e:
1147            summary["healthy"] = False
1148            summary["error"] = str(e)
1149
1150        return summary

Get comprehensive server summary including info, metrics, and key count.

Arguments:
  • metrics_since: Time range for experimental metrics (default: "24h")
Returns:

Dictionary with server info, health status, and statistics

def configure_logging(self, level: str = 'INFO', format_string: Optional[str] = None) -> None:
1154    def configure_logging(self, level: str = "INFO", format_string: Optional[str] = None) -> None:
1155        """
1156        Configure logging for the client.
1157
1158        Args:
1159            level: Logging level (DEBUG, INFO, WARNING, ERROR)
1160            format_string: Custom format string for log messages
1161        """
1162        self._enable_logging = True
1163
1164        # Clear existing handlers
1165        logger.handlers.clear()
1166
1167        handler = logging.StreamHandler()
1168        if format_string:
1169            formatter = logging.Formatter(format_string)
1170        else:
1171            formatter = logging.Formatter(
1172                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
1173            )
1174        handler.setFormatter(formatter)
1175        logger.addHandler(handler)
1176        logger.setLevel(getattr(logging, level.upper()))

Configure logging for the client.

Arguments:
  • level: Logging level (DEBUG, INFO, WARNING, ERROR)
  • format_string: Custom format string for log messages
is_healthy: bool
    @property
    def is_healthy(self) -> bool:
        """
        Check if the last health check passed.

        Returns the stored ``_is_healthy`` flag; reading this property does
        not itself contact the server.
        """
        return self._is_healthy

Check if the last health check passed.

session: Optional[aiohttp.client.ClientSession]
    @property
    def session(self) -> Optional[aiohttp.ClientSession]:
        """
        Access the current client session.

        Returns the stored ``_session`` object, which per the annotation may
        be ``None``.
        """
        return self._session

Access the current client session.

api_url: str
1188    @property
1189    def api_url(self) -> str:
1190        """Get the API URL (without sensitive parts)."""
1191        from urllib.parse import urlparse
1192        parsed = urlparse(self._api_url)
1193        return f"{parsed.scheme}://{parsed.netloc}"

Get the API URL (without sensitive parts).

class OutlineError(builtins.Exception):
class OutlineError(Exception):
    """
    Base exception for Outline client errors.

    ``APIError`` derives from this class, so catching ``OutlineError`` also
    catches API failures.
    """

Base exception for Outline client errors.

class APIError(pyoutlineapi.OutlineError):
class APIError(OutlineError):
    """
    Raised when API requests fail.

    Attributes:
        status_code: HTTP status code associated with the failure, if any
        attempt: Attempt number on which the failure occurred, if any
    """

    def __init__(
        self,
        message: str,
        status_code: Optional[int] = None,
        attempt: Optional[int] = None,
    ) -> None:
        """Store the message on the base exception and keep the extra context."""
        super().__init__(message)
        self.status_code = status_code
        self.attempt = attempt

    def __str__(self) -> str:
        """Return the message, prefixed with the attempt number when one is set."""
        text = super().__str__()
        if self.attempt is None:
            return text
        return f"[Attempt {self.attempt}] {text}"

Raised when API requests fail.

APIError( message: str, status_code: Optional[int] = None, attempt: Optional[int] = None)
    def __init__(
        self,
        message: str,
        status_code: Optional[int] = None,
        attempt: Optional[int] = None,
    ) -> None:
        """
        Initialise the error.

        Args:
            message: Error text passed to the base exception
            status_code: Optional status code stored on the instance
            attempt: Optional attempt number stored on the instance
        """
        super().__init__(message)
        self.status_code = status_code
        self.attempt = attempt
status_code
attempt
class AccessKey(pydantic.main.BaseModel):
class AccessKey(BaseModel):
    """
    Access key details.

    ``access_url`` and ``data_limit`` are populated from the camelCase
    aliases ``accessUrl`` and ``dataLimit``. ``name`` and ``data_limit`` are
    optional and default to ``None``.
    """

    id: str = Field(description="Access key identifier")
    name: Optional[str] = Field(None, description="Access key name")
    password: str = Field(description="Access key password")
    # Valid ports are 1-65535 (exclusive bounds 0 and 65536).
    port: int = Field(gt=0, lt=65536, description="Port number")
    method: str = Field(description="Encryption method")
    access_url: str = Field(alias="accessUrl", description="Complete access URL")
    data_limit: Optional[DataLimit] = Field(
        None, alias="dataLimit", description="Data limit for this key"
    )

Access key details.

id: str
name: Optional[str]
password: str
port: int
method: str
access_url: str
data_limit: Optional[DataLimit]
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class AccessKeyCreateRequest(pydantic.main.BaseModel):
class AccessKeyCreateRequest(BaseModel):
    """
    Request parameters for creating an access key.
    Per OpenAPI: /access-keys POST request body

    Every field is optional and defaults to ``None``.
    """

    name: Optional[str] = Field(None, description="Access key name")
    method: Optional[str] = Field(None, description="Encryption method")
    password: Optional[str] = Field(None, description="Access key password")
    # When given, the port must be in the range 1-65535.
    port: Optional[int] = Field(None, gt=0, lt=65536, description="Port number")
    limit: Optional[DataLimit] = Field(None, description="Data limit for this key")

Request parameters for creating an access key. Per OpenAPI: /access-keys POST request body

name: Optional[str]
method: Optional[str]
password: Optional[str]
port: Optional[int]
limit: Optional[DataLimit]
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class AccessKeyList(pydantic.main.BaseModel):
class AccessKeyList(BaseModel):
    """
    List of access keys.

    The ``access_keys`` field is populated from the ``accessKeys`` alias.
    """

    access_keys: list[AccessKey] = Field(alias="accessKeys")

List of access keys.

access_keys: list[AccessKey]
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class AccessKeyNameRequest(pydantic.main.BaseModel):
class AccessKeyNameRequest(BaseModel):
    """Request for renaming access key; carries the single required ``name`` field."""

    name: str = Field(description="New access key name")

Request for renaming access key.

name: str
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class DataLimit(pydantic.main.BaseModel):
class DataLimit(BaseModel):
    """
    Data transfer limit configuration.

    ``bytes`` must be a non-negative integer.
    """

    bytes: int = Field(ge=0, description="Data limit in bytes")

    # ``@field_validator`` must be the outermost decorator. With
    # ``@classmethod`` on top, the class attribute is a plain classmethod
    # rather than pydantic's validator proxy, so the validator is not
    # registered as written.
    @field_validator("bytes")
    @classmethod
    def validate_bytes(cls, v: int) -> int:
        """
        Reject negative byte counts.

        Args:
            v: Candidate value for ``bytes``

        Returns:
            The value unchanged when it is non-negative

        Raises:
            ValueError: If ``v`` is negative
        """
        if v < 0:
            raise ValueError("bytes must be non-negative")
        return v

Data transfer limit configuration.

bytes: int
@classmethod
@field_validator('bytes')
def validate_bytes(cls, v: int) -> int:
26    @classmethod
27    @field_validator("bytes")
28    def validate_bytes(cls, v: int) -> int:
29        if v < 0:
30            raise ValueError("bytes must be non-negative")
31        return v

Validate the `bytes` field: return the value unchanged when it is non-negative, otherwise raise `ValueError("bytes must be non-negative")`.

Note: the text the documentation generator rendered here was the docstring of pydantic's internal decorator proxy (the object that wraps a classmethod, staticmethod, property or unbound function so decorated items can be detected from the class attributes), not documentation of this validator.
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class DataLimitRequest(pydantic.main.BaseModel):
class DataLimitRequest(BaseModel):
    """Request for setting data limit; wraps a ``DataLimit`` under the ``limit`` field."""

    limit: DataLimit = Field(description="Data limit configuration")

Request for setting data limit.

limit: DataLimit
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ErrorResponse(pydantic.main.BaseModel):
class ErrorResponse(BaseModel):
    """
    Error response structure.
    Per OpenAPI: 404 and 400 responses

    Both ``code`` and ``message`` are required strings.
    """

    code: str = Field(description="Error code")
    message: str = Field(description="Error message")

Error response structure. Per OpenAPI: 404 and 400 responses

code: str
message: str
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ExperimentalMetrics(pydantic.main.BaseModel):
class ExperimentalMetrics(BaseModel):
    """
    Experimental metrics data structure.
    Per OpenAPI: /experimental/server/metrics endpoint

    Combines one server-wide metric object with a list of per-access-key
    metrics; ``access_keys`` is populated from the ``accessKeys`` alias.
    """

    server: ServerExperimentalMetric = Field(description="Server metrics")
    access_keys: list[AccessKeyMetric] = Field(
        alias="accessKeys", description="Access key metrics"
    )

Experimental metrics data structure. Per OpenAPI: /experimental/server/metrics endpoint

server: pyoutlineapi.models.ServerExperimentalMetric
access_keys: list[pyoutlineapi.models.AccessKeyMetric]
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class HostnameRequest(pydantic.main.BaseModel):
class HostnameRequest(BaseModel):
    """Request for changing hostname; carries the single required ``hostname`` field."""

    hostname: str = Field(description="New hostname or IP address")

Request for changing hostname.

hostname: str
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class MetricsEnabledRequest(pydantic.main.BaseModel):
237class MetricsEnabledRequest(BaseModel):
238    """Request for enabling/disabling metrics."""
239
240    metrics_enabled: bool = Field(
241        alias="metricsEnabled", description="Enable or disable metrics"
242    )

Request for enabling/disabling metrics.

metrics_enabled: bool
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to `pydantic.config.ConfigDict`.

class MetricsStatusResponse(pydantic.main.BaseModel):
245class MetricsStatusResponse(BaseModel):
246    """Response for /metrics/enabled endpoint."""
247
248    metrics_enabled: bool = Field(
249        alias="metricsEnabled", description="Current metrics status"
250    )

Response for /metrics/enabled endpoint.

metrics_enabled: bool
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to `pydantic.config.ConfigDict`.

class PortRequest(pydantic.main.BaseModel):
219class PortRequest(BaseModel):
220    """Request for changing default port."""
221
222    port: int = Field(gt=0, lt=65536, description="New default port")

Request for changing default port.

port: int
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to `pydantic.config.ConfigDict`.

class Server(pydantic.main.BaseModel):
163class Server(BaseModel):
164    """
165    Server information.
166    Per OpenAPI: /server endpoint schema
167    """
168
169    name: str = Field(description="Server name")
170    server_id: str = Field(alias="serverId", description="Unique server identifier")
171    metrics_enabled: bool = Field(
172        alias="metricsEnabled", description="Metrics sharing status"
173    )
174    created_timestamp_ms: int = Field(
175        alias="createdTimestampMs", description="Creation timestamp in milliseconds"
176    )
177    version: str = Field(description="Server version")
178    port_for_new_access_keys: int = Field(
179        alias="portForNewAccessKeys",
180        gt=0,
181        lt=65536,
182        description="Default port for new keys",
183    )
184    hostname_for_access_keys: Optional[str] = Field(
185        None, alias="hostnameForAccessKeys", description="Hostname for access keys"
186    )
187    access_key_data_limit: Optional[DataLimit] = Field(
188        None,
189        alias="accessKeyDataLimit",
190        description="Global data limit for access keys",
191    )

Server information. Per OpenAPI: /server endpoint schema

name: str
server_id: str
metrics_enabled: bool
created_timestamp_ms: int
version: str
port_for_new_access_keys: int
hostname_for_access_keys: Optional[str]
access_key_data_limit: Optional[DataLimit]
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to `pydantic.config.ConfigDict`.

class ServerMetrics(pydantic.main.BaseModel):
54class ServerMetrics(BaseModel):
55    """
56    Server metrics data for data transferred per access key.
57    Per OpenAPI: /metrics/transfer endpoint
58    """
59
60    bytes_transferred_by_user_id: dict[str, int] = Field(
61        alias="bytesTransferredByUserId",
62        description="Data transferred by each access key ID",
63    )

Server metrics data for data transferred per access key. Per OpenAPI: /metrics/transfer endpoint

bytes_transferred_by_user_id: dict[str, int]
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to `pydantic.config.ConfigDict`.

class ServerNameRequest(pydantic.main.BaseModel):
207class ServerNameRequest(BaseModel):
208    """Request for renaming server."""
209
210    name: str = Field(description="New server name")

Request for renaming server.

name: str
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to `pydantic.config.ConfigDict`.