Skip to content
46 changes: 44 additions & 2 deletions openapi_python_sdk/async_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import json
import random
from typing import Any, Dict

import httpx
Expand All @@ -10,8 +12,21 @@ class AsyncClient:
Suitable for use with FastAPI, aiohttp, etc.
"""

def __init__(self, token: str, client: Any = None, timeout: float = 30.0):
def __init__(
self,
token: str,
client: Any = None,
timeout: float = 30.0,
max_retries: int = 0,
backoff_factor: float = 1.0,
retry_on_status: list[int] = None,
):
self.client = client if client is not None else httpx.AsyncClient(timeout=timeout)
self.max_retries = max_retries
self.backoff_factor = backoff_factor
self.retry_on_status = (
retry_on_status if retry_on_status is not None else [429, 502, 503, 504]
)
self.auth_header: str = f"Bearer {token}"
self.headers: Dict[str, str] = {
"Authorization": self.auth_header,
Expand All @@ -30,6 +45,32 @@ async def aclose(self):
"""Manually close the underlying HTTP client (async)."""
await self.client.aclose()

async def _request_with_retry(self, request_fn, *args, **kwargs) -> httpx.Response:
attempts = 0
while True:
try:
resp = await request_fn(*args, **kwargs)
if resp.status_code in self.retry_on_status and attempts < self.max_retries:
attempts += 1
sleep_time = self.backoff_factor * (2 ** attempts) + random.uniform(0, 0.5)
if resp.status_code == 429:
retry_after = resp.headers.get("Retry-After")
if retry_after:
try:
sleep_time = float(retry_after)
except ValueError:
pass
await asyncio.sleep(sleep_time)
continue
return resp
except httpx.RequestError as exc:
if attempts < self.max_retries:
attempts += 1
sleep_time = self.backoff_factor * (2 ** attempts) + random.uniform(0, 0.5)
await asyncio.sleep(sleep_time)
continue
raise exc

async def request(
self,
method: str = "GET",
Expand All @@ -50,7 +91,8 @@ async def request(
url = f"{url}&{query_string}" if "?" in url else f"{url}?{query_string}"
params = None

resp = await self.client.request(
resp = await self._request_with_retry(
self.client.request,
method=method,
url=url,
headers=self.headers,
Expand Down
55 changes: 49 additions & 6 deletions openapi_python_sdk/async_oauth_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import base64
import random
from typing import Any, Dict, List

import httpx
Expand All @@ -12,8 +14,23 @@ class AsyncOauthClient:
Suitable for use with FastAPI, aiohttp, etc.
"""

def __init__(self, username: str, apikey: str, test: bool = False, client: Any = None, timeout: float = 30.0):
def __init__(
self,
username: str,
apikey: str,
test: bool = False,
client: Any = None,
timeout: float = 30.0,
max_retries: int = 0,
backoff_factor: float = 1.0,
retry_on_status: List[int] = None,
):
self.client = client if client is not None else httpx.AsyncClient(timeout=timeout)
self.max_retries = max_retries
self.backoff_factor = backoff_factor
self.retry_on_status = (
retry_on_status if retry_on_status is not None else [429, 502, 503, 504]
)
self.url: str = TEST_OAUTH_BASE_URL if test else OAUTH_BASE_URL
self.auth_header: str = (
"Basic " + base64.b64encode(f"{username}:{apikey}".encode("utf-8")).decode()
Expand All @@ -35,35 +52,61 @@ async def aclose(self):
"""Manually close the underlying HTTP client (async)."""
await self.client.aclose()

async def _request_with_retry(self, request_fn, *args, **kwargs) -> httpx.Response:
attempts = 0
while True:
try:
resp = await request_fn(*args, **kwargs)
if resp.status_code in self.retry_on_status and attempts < self.max_retries:
attempts += 1
sleep_time = self.backoff_factor * (2 ** attempts) + random.uniform(0, 0.5)
if resp.status_code == 429:
retry_after = resp.headers.get("Retry-After")
if retry_after:
try:
sleep_time = float(retry_after)
except ValueError:
pass
await asyncio.sleep(sleep_time)
continue
return resp
except httpx.RequestError as exc:
if attempts < self.max_retries:
attempts += 1
sleep_time = self.backoff_factor * (2 ** attempts) + random.uniform(0, 0.5)
await asyncio.sleep(sleep_time)
continue
raise exc

async def get_scopes(self, limit: bool = False) -> Dict[str, Any]:
"""Retrieve available scopes for the current user (async)."""
params = {"limit": int(limit)}
url = f"{self.url}/scopes"
resp = await self.client.get(url=url, headers=self.headers, params=params)
resp = await self._request_with_retry(self.client.get, url=url, headers=self.headers, params=params)
return resp.json()

async def create_token(self, scopes: List[str] = [], ttl: int = 0) -> Dict[str, Any]:
"""Create a new bearer token with specified scopes and TTL (async)."""
payload = {"scopes": scopes, "ttl": ttl}
url = f"{self.url}/token"
resp = await self.client.post(url=url, headers=self.headers, json=payload)
resp = await self._request_with_retry(self.client.post, url=url, headers=self.headers, json=payload)
return resp.json()

async def get_token(self, scope: str = None) -> Dict[str, Any]:
"""Retrieve an existing token, optionally filtered by scope (async)."""
params = {"scope": scope or ""}
url = f"{self.url}/token"
resp = await self.client.get(url=url, headers=self.headers, params=params)
resp = await self._request_with_retry(self.client.get, url=url, headers=self.headers, params=params)
return resp.json()

async def delete_token(self, id: str) -> Dict[str, Any]:
"""Revoke/Delete a specific token by ID (async)."""
url = f"{self.url}/token/{id}"
resp = await self.client.delete(url=url, headers=self.headers)
resp = await self._request_with_retry(self.client.delete, url=url, headers=self.headers)
return resp.json()

async def get_counters(self, period: str, date: str) -> Dict[str, Any]:
"""Retrieve usage counters for a specific period and date (async)."""
url = f"{self.url}/counters/{period}/{date}"
resp = await self.client.get(url=url, headers=self.headers)
resp = await self._request_with_retry(self.client.get, url=url, headers=self.headers)
return resp.json()
49 changes: 46 additions & 3 deletions openapi_python_sdk/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import json
import random
import threading
import time
from typing import Any, Dict

import httpx
Expand All @@ -15,10 +17,23 @@ class Client:
Synchronous client for making authenticated requests to Openapi endpoints.
"""

def __init__(self, token: str, client: Any = None, timeout: float = 30.0):
def __init__(
self,
token: str,
client: Any = None,
timeout: float = 30.0,
max_retries: int = 0,
backoff_factor: float = 1.0,
retry_on_status: list[int] = None,
):
self._client = client
self._thread_local = threading.local()
self.timeout = timeout
self.max_retries = max_retries
self.backoff_factor = backoff_factor
self.retry_on_status = (
retry_on_status if retry_on_status is not None else [429, 502, 503, 504]
)
self.auth_header: str = f"Bearer {token}"
self.headers: Dict[str, str] = {
"Authorization": self.auth_header,
Expand Down Expand Up @@ -55,6 +70,32 @@ def close(self):
"""Manually close the underlying HTTP client."""
self.client.close()

def _request_with_retry(self, request_fn, *args, **kwargs) -> httpx.Response:
attempts = 0
while True:
try:
resp = request_fn(*args, **kwargs)
if resp.status_code in self.retry_on_status and attempts < self.max_retries:
attempts += 1
sleep_time = self.backoff_factor * (2 ** attempts) + random.uniform(0, 0.5)
if resp.status_code == 429:
retry_after = resp.headers.get("Retry-After")
if retry_after:
try:
sleep_time = float(retry_after)
except ValueError:
pass
time.sleep(sleep_time)
continue
return resp
except httpx.RequestError as exc:
if attempts < self.max_retries:
attempts += 1
sleep_time = self.backoff_factor * (2 ** attempts) + random.uniform(0, 0.5)
time.sleep(sleep_time)
continue
raise exc

def request(
self,
method: str = "GET",
Expand All @@ -75,13 +116,15 @@ def request(
url = f"{url}&{query_string}" if "?" in url else f"{url}?{query_string}"
params = None

data = self.client.request(
resp = self._request_with_retry(
self.client.request,
method=method,
url=url,
headers=self.headers,
json=payload,
params=params,
).json()
)
data = resp.json()

# Handle cases where the API might return a JSON-encoded string instead of an object
if isinstance(data, str):
Expand Down
60 changes: 54 additions & 6 deletions openapi_python_sdk/oauth_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import base64
import random
import threading
import time
from typing import Any, Dict, List

import httpx
Expand All @@ -13,10 +15,25 @@ class OauthClient:
Synchronous client for handling Openapi authentication and token management.
"""

def __init__(self, username: str, apikey: str, test: bool = False, client: Any = None, timeout: float = 30.0):
def __init__(
self,
username: str,
apikey: str,
test: bool = False,
client: Any = None,
timeout: float = 30.0,
max_retries: int = 0,
backoff_factor: float = 1.0,
retry_on_status: List[int] = None,
):
self._client = client
self._thread_local = threading.local()
self.timeout = timeout
self.max_retries = max_retries
self.backoff_factor = backoff_factor
self.retry_on_status = (
retry_on_status if retry_on_status is not None else [429, 502, 503, 504]
)
self.url: str = TEST_OAUTH_BASE_URL if test else OAUTH_BASE_URL
self.auth_header: str = (
"Basic " + base64.b64encode(f"{username}:{apikey}".encode("utf-8")).decode()
Expand Down Expand Up @@ -55,30 +72,61 @@ def close(self):
"""Manually close the underlying HTTP client."""
self.client.close()

def _request_with_retry(self, request_fn, *args, **kwargs) -> httpx.Response:
attempts = 0
while True:
try:
resp = request_fn(*args, **kwargs)
if resp.status_code in self.retry_on_status and attempts < self.max_retries:
attempts += 1
sleep_time = self.backoff_factor * (2 ** attempts) + random.uniform(0, 0.5)
if resp.status_code == 429:
retry_after = resp.headers.get("Retry-After")
if retry_after:
try:
sleep_time = float(retry_after)
except ValueError:
pass
time.sleep(sleep_time)
continue
return resp
except httpx.RequestError as exc:
if attempts < self.max_retries:
attempts += 1
sleep_time = self.backoff_factor * (2 ** attempts) + random.uniform(0, 0.5)
time.sleep(sleep_time)
continue
raise exc

def get_scopes(self, limit: bool = False) -> Dict[str, Any]:
"""Retrieve available scopes for the current user."""
params = {"limit": int(limit)}
url = f"{self.url}/scopes"
return self.client.get(url=url, headers=self.headers, params=params).json()
resp = self._request_with_retry(self.client.get, url=url, headers=self.headers, params=params)
return resp.json()

def create_token(self, scopes: List[str] = [], ttl: int = 0) -> Dict[str, Any]:
"""Create a new bearer token with specified scopes and TTL."""
payload = {"scopes": scopes, "ttl": ttl}
url = f"{self.url}/token"
return self.client.post(url=url, headers=self.headers, json=payload).json()
resp = self._request_with_retry(self.client.post, url=url, headers=self.headers, json=payload)
return resp.json()

def get_token(self, scope: str = None) -> Dict[str, Any]:
"""Retrieve an existing token, optionally filtered by scope."""
params = {"scope": scope or ""}
url = f"{self.url}/token"
return self.client.get(url=url, headers=self.headers, params=params).json()
resp = self._request_with_retry(self.client.get, url=url, headers=self.headers, params=params)
return resp.json()

def delete_token(self, id: str) -> Dict[str, Any]:
"""Revoke/Delete a specific token by ID."""
url = f"{self.url}/token/{id}"
return self.client.delete(url=url, headers=self.headers).json()
resp = self._request_with_retry(self.client.delete, url=url, headers=self.headers)
return resp.json()

def get_counters(self, period: str, date: str) -> Dict[str, Any]:
"""Retrieve usage counters for a specific period and date."""
url = f"{self.url}/counters/{period}/{date}"
return self.client.get(url=url, headers=self.headers).json()
resp = self._request_with_retry(self.client.get, url=url, headers=self.headers)
return resp.json()
Loading
Loading