Skip to content

Commit

Permalink
Merge pull request #89 from ourzora/try-async-parser
Browse files Browse the repository at this point in the history
Make everything async for Metadata
  • Loading branch information
mattmalec authored Sep 21, 2023
2 parents 622958f + c81d9bf commit c24b372
Show file tree
Hide file tree
Showing 80 changed files with 2,639 additions and 969 deletions.
4 changes: 4 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## v0.2.2

- Go deep on making things as async as they possibly can

## v0.2.1

- Add async support for custom adapters
Expand Down
2 changes: 1 addition & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Getting Started

Documentation for version: **v0.2.1**
Documentation for version: **v0.2.2**

## Overview

Expand Down
2 changes: 1 addition & 1 deletion offchain/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from offchain.metadata import (
from offchain.metadata import ( # type: ignore[attr-defined]
get_token_metadata,
Metadata,
MetadataFetcher,
Expand Down
2 changes: 1 addition & 1 deletion offchain/base/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class StringEnum(str, Enum):
def __repr__(self) -> str:
return str(self.value)

def __str__(self):
def __str__(self): # type: ignore[no-untyped-def]
return str(self.value)

@classmethod
Expand Down
12 changes: 6 additions & 6 deletions offchain/concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
MAX_PROCS = (multiprocessing.cpu_count() * 2) + 1


def parallelize_with_threads(*args: Sequence[Callable]) -> Sequence[Any]:
def parallelize_with_threads(*args: Sequence[Callable]) -> Sequence[Any]: # type: ignore[type-arg] # noqa: E501
"""Parallelize a set of functions with a threadpool.
Good for network calls, less for for number crunching.
Expand All @@ -17,12 +17,12 @@ def parallelize_with_threads(*args: Sequence[Callable]) -> Sequence[Any]:
n_tasks = len(args)
logger.debug("Starting tasks", extra={"num_tasks": n_tasks})
with ThreadPoolExecutor(max_workers=min(n_tasks, MAX_PROCS)) as pool:
futures = [pool.submit(fn) for fn in args]
futures = [pool.submit(fn) for fn in args] # type: ignore[arg-type, var-annotated] # noqa: E501
res = [f.result() for f in futures]
return res


def parmap(fn: Callable, args: list) -> list:
def parmap(fn: Callable, args: list) -> list: # type: ignore[type-arg]
"""Run a map in parallel safely
Args:
Expand All @@ -35,11 +35,11 @@ def parmap(fn: Callable, args: list) -> list:
Note: explicitly using a map to generate function rather than a list comprehension to prevent
a subtle variable shadowing bug that can occur with code like this:
>>> parallelize_with_threads(*[lambda: fn(arg) for arg in args])
"""
return list(parallelize_with_threads(*map(lambda i: lambda: fn(i), args)))
""" # noqa: E501
return list(parallelize_with_threads(*map(lambda i: lambda: fn(i), args))) # type: ignore[arg-type] # noqa: E501


def batched_parmap(fn: Callable, args: list, batch_size: int = 10) -> list:
def batched_parmap(fn: Callable, args: list, batch_size: int = 10) -> list: # type: ignore[type-arg] # noqa: E501
results = []
i, j = 0, 0
while i < len(args):
Expand Down
2 changes: 1 addition & 1 deletion offchain/logger/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

log_handler = logging.StreamHandler()
log_handler.setFormatter(
jsonlogger.JsonFormatter(
jsonlogger.JsonFormatter( # type: ignore[no-untyped-call]
rename_fields={"levelname": "severity"},
fmt="%(name)s %(threadName) %(message)s '%(asctime)s %(levelname)",
)
Expand Down
12 changes: 6 additions & 6 deletions offchain/metadata/adapters/arweave.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ class ARWeaveAdapter(HTTPAdapter):
key (str, optional): optional key to send with request
secret (str, optional): optional secret to send with request
timeout (int): request timeout in seconds. Defaults to 10 seconds.
"""
""" # noqa: E501

def __init__(
def __init__( # type: ignore[no-untyped-def]
self,
host_prefixes: Optional[list[str]] = None,
key: Optional[str] = None,
Expand Down Expand Up @@ -59,7 +59,7 @@ def parse_ar_url(self, url: str) -> str:
url = new_url
return url

async def gen_send(self, url: str, sess: httpx.AsyncClient(), *args, **kwargs) -> httpx.Response:
async def gen_send(self, url: str, sess: httpx.AsyncClient(), *args, **kwargs) -> httpx.Response: # type: ignore[no-untyped-def, valid-type] # noqa: E501
"""Format and send async request to ARWeave host.
Args:
Expand All @@ -69,9 +69,9 @@ async def gen_send(self, url: str, sess: httpx.AsyncClient(), *args, **kwargs) -
Returns:
httpx.Response: response from ARWeave host.
"""
return await sess.get(self.parse_ar_url(url), timeout=self.timeout, follow_redirects=True)
return await sess.get(self.parse_ar_url(url), timeout=self.timeout, follow_redirects=True) # type: ignore[no-any-return] # noqa: E501

def send(self, request: PreparedRequest, *args, **kwargs) -> Response:
def send(self, request: PreparedRequest, *args, **kwargs) -> Response: # type: ignore[no-untyped-def] # noqa: E501
"""Format and send request to ARWeave host.
Args:
Expand All @@ -80,6 +80,6 @@ def send(self, request: PreparedRequest, *args, **kwargs) -> Response:
Returns:
Response: response from ARWeave host.
"""
request.url = self.parse_ar_url(request.url)
request.url = self.parse_ar_url(request.url) # type: ignore[arg-type]
kwargs["timeout"] = self.timeout
return super().send(request, *args, **kwargs)
20 changes: 10 additions & 10 deletions offchain/metadata/adapters/base_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
class BaseAdapter(RequestsBaseAdapter):
"""Base Adapter inheriting from requests BaseAdapter"""

def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def]
super().__init__()

async def gen_send(self, url: str, *args, **kwargs) -> httpx.Response:
async def gen_send(self, url: str, *args, **kwargs) -> httpx.Response: # type: ignore[no-untyped-def] # noqa: E501
"""Format and send async request to url host.
Args:
Expand All @@ -28,18 +28,18 @@ async def gen_send(self, url: str, *args, **kwargs) -> httpx.Response:
class HTTPAdapter(RequestsHTTPAdapter):
"""HTTP Adapter inheriting from requests HTTPAdapter"""

def __init__(
def __init__( # type: ignore[no-untyped-def]
self,
pool_connections: int = ...,
pool_maxsize: int = ...,
max_retries: Union[Retry, int, None] = ...,
pool_block: bool = ...,
pool_connections: int = ..., # type: ignore[assignment]
pool_maxsize: int = ..., # type: ignore[assignment]
max_retries: Union[Retry, int, None] = ..., # type: ignore[assignment]
pool_block: bool = ..., # type: ignore[assignment]
*args,
**kwargs
) -> None:
super().__init__(pool_connections, pool_maxsize, max_retries, pool_block)

async def gen_send(self, url: str, sess: httpx.AsyncClient(), *args, **kwargs) -> httpx.Response:
async def gen_send(self, url: str, sess: httpx.AsyncClient(), *args, **kwargs) -> httpx.Response: # type: ignore[no-untyped-def, valid-type] # noqa: E501
"""Format and send async request to url host.
Args:
Expand All @@ -48,7 +48,7 @@ async def gen_send(self, url: str, sess: httpx.AsyncClient(), *args, **kwargs) -
Returns:
httpx.Response: response from host.
"""
return await sess.get(url, follow_redirects=True)
return await sess.get(url, follow_redirects=True) # type: ignore[no-any-return]


Adapter = Union[BaseAdapter, HTTPAdapter]
Expand All @@ -59,4 +59,4 @@ class AdapterConfig:
adapter_cls: Type[Adapter]
mount_prefixes: list[str]
host_prefixes: Optional[list[str]] = None
kwargs: dict = field(default_factory=dict)
kwargs: dict = field(default_factory=dict) # type: ignore[type-arg]
20 changes: 10 additions & 10 deletions offchain/metadata/adapters/data_uri.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from offchain.metadata.registries.adapter_registry import AdapterRegistry


def decode_data_url(data_url):
def decode_data_url(data_url): # type: ignore[no-untyped-def]
data_parts = data_url.split(",")
data = data_parts[1]

Expand All @@ -24,10 +24,10 @@ def decode_data_url(data_url):
class DataURIAdapter(BaseAdapter):
"""Provides an interface for Requests sessions to handle data uris."""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def]
super().__init__(*args, **kwargs) # type: ignore[no-untyped-call]

async def gen_send(self, url: str, *args, **kwargs) -> httpx.Response:
async def gen_send(self, url: str, *args, **kwargs) -> httpx.Response: # type: ignore[no-untyped-def] # noqa: E501
"""Handle async data uri request.
Args:
Expand All @@ -38,12 +38,12 @@ async def gen_send(self, url: str, *args, **kwargs) -> httpx.Response:
"""
response = httpx.Response(
status_code=200,
text=decode_data_url(url),
text=decode_data_url(url), # type: ignore[no-untyped-call]
request=httpx.Request(method="GET", url=url),
)
return response

def send(self, request: PreparedRequest, *args, **kwargs):
def send(self, request: PreparedRequest, *args, **kwargs): # type: ignore[no-untyped-def] # noqa: E501
"""Handle data uri request.
Args:
Expand All @@ -54,10 +54,10 @@ def send(self, request: PreparedRequest, *args, **kwargs):
"""
newResponse = Response()
newResponse.request = request
newResponse.url = request.url
newResponse.connection = self
newResponse.url = request.url # type: ignore[assignment]
newResponse.connection = self # type: ignore[attr-defined]
try:
response = urlopen(request.url)
response = urlopen(request.url) # type: ignore[arg-type]
newResponse.status_code = 200
newResponse.headers = response.headers
newResponse.raw = response
Expand All @@ -66,5 +66,5 @@ def send(self, request: PreparedRequest, *args, **kwargs):
finally:
return newResponse

def close(self):
def close(self): # type: ignore[no-untyped-def]
self.response.close()
20 changes: 10 additions & 10 deletions offchain/metadata/adapters/ipfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ def build_request_url(gateway: str, request_url: str) -> str:
if parsed_url.host != "ipfs":
host = parsed_url.host
# Remove duplicate slashes
if url.endswith("/") and host.startswith("/"):
host = host[1:]
url += host
if url.endswith("/") and host.startswith("/"): # type: ignore[union-attr]
host = host[1:] # type: ignore[index]
url += host # type: ignore[operator]
if parsed_url.path is not None:
path = parsed_url.path
# Remove duplicate slashes
if url.endswith("/") and path.startswith("/"):
path = path[1:]
url += path
# Handle "https://" prefixed urls that have "/ipfs/" in the path
elif parsed_url.scheme == "https" and "ipfs" in parsed_url.path:
elif parsed_url.scheme == "https" and "ipfs" in parsed_url.path: # type: ignore[operator] # noqa: E501
url = f"{gateway}"
if parsed_url.path is not None:
path = parsed_url.path
Expand All @@ -60,9 +60,9 @@ class IPFSAdapter(HTTPAdapter):
key (str, optional): optional key to send with request
secret (str, optional): optional secret to send with request
timeout (int): request timeout in seconds. Defaults to 10 seconds.
"""
""" # noqa: E501

def __init__(
def __init__( # type: ignore[no-untyped-def]
self,
host_prefixes: Optional[list[str]] = None,
key: Optional[str] = None,
Expand Down Expand Up @@ -96,7 +96,7 @@ def make_request_url(self, request_url: str, gateway: Optional[str] = None) -> s
gateway = gateway or random.choice(self.host_prefixes)
return build_request_url(gateway=gateway, request_url=request_url)

async def gen_send(self, url: str, sess: httpx.AsyncClient(), *args, **kwargs) -> httpx.Response:
async def gen_send(self, url: str, sess: httpx.AsyncClient(), *args, **kwargs) -> httpx.Response: # type: ignore[no-untyped-def, valid-type] # noqa: E501
"""Format and send async request to IPFS host.
Args:
Expand All @@ -106,9 +106,9 @@ async def gen_send(self, url: str, sess: httpx.AsyncClient(), *args, **kwargs) -
Returns:
httpx.Response: response from IPFS host.
"""
return await sess.get(self.make_request_url(url), timeout=self.timeout, follow_redirects=True)
return await sess.get(self.make_request_url(url), timeout=self.timeout, follow_redirects=True) # type: ignore[no-any-return] # noqa: E501

def send(self, request: PreparedRequest, *args, **kwargs) -> Response:
def send(self, request: PreparedRequest, *args, **kwargs) -> Response: # type: ignore[no-untyped-def] # noqa: E501
"""For IPFS hashes, query pinata cloud gateway
Args:
Expand All @@ -117,7 +117,7 @@ def send(self, request: PreparedRequest, *args, **kwargs) -> Response:
Returns:
Response: response from IPFS Gateway
"""
request.url = self.make_request_url(request.url)
request.url = self.make_request_url(request.url) # type: ignore[arg-type]

kwargs["timeout"] = self.timeout
return super().send(request, *args, **kwargs)
21 changes: 16 additions & 5 deletions offchain/metadata/fetchers/base_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,23 @@ def __init__(
) -> None:
pass

def set_timeout(self, new_timeout: int):
def set_timeout(self, new_timeout: int): # type: ignore[no-untyped-def]
"""Setter function for timeout
Args:
new_timeout (int): new request timeout in seconds.
"""
pass

def set_max_retries(self, new_max_retries: int):
def set_max_retries(self, new_max_retries: int): # type: ignore[no-untyped-def]
"""Setter function for max retries
Args:
new_max_retries (int): new maximum number of request retries.
"""
pass

def register_adapter(self, adapter: Adapter, url_prefix: str):
def register_adapter(self, adapter: Adapter, url_prefix: str): # type: ignore[no-untyped-def] # noqa: E501
"""Register an adapter to a url prefix.
Args:
Expand All @@ -59,7 +59,18 @@ def fetch_mime_type_and_size(self, uri: str) -> tuple[str, int]:
"""
pass

def fetch_content(self, uri: str) -> Union[dict, str]:
async def gen_fetch_mime_type_and_size(self, uri: str) -> tuple[str, int]:
"""Fetch the mime type and size of the content at a given uri.
Args:
uri (str): uri from which to fetch content mime type and size.
Returns:
tuple[str, int]: mime type and size
"""
pass

def fetch_content(self, uri: str) -> Union[dict, str]: # type: ignore[type-arg]
"""Fetch the content at a given uri
Args:
Expand All @@ -70,7 +81,7 @@ def fetch_content(self, uri: str) -> Union[dict, str]:
"""
pass

async def gen_fetch_content(self, uri: str) -> Union[dict, str]:
async def gen_fetch_content(self, uri: str) -> Union[dict, str]: # type: ignore[type-arg] # noqa: E501
"""Async fetch the content at a given uri
Args:
Expand Down
Loading

0 comments on commit c24b372

Please sign in to comment.