From 56ba20c5cf3330db17a77a376b149305306b4197 Mon Sep 17 00:00:00 2001
From: Andrey Atapin
Date: Thu, 4 Jul 2024 12:55:55 +0500
Subject: [PATCH 1/3] Added test coverage for batched_parmap function and
 improved its readability by simplifying the implementation. Also improved
 the performance by utilizing list.extend method instead of `+` concatenation.
 Extension modifies a list in place, instead of creating a new list by `+`

---
 offchain/concurrency.py   | 21 ++++++++++-----------
 tests/test_concurrency.py | 14 ++++++++++++++
 2 files changed, 24 insertions(+), 11 deletions(-)
 create mode 100644 tests/test_concurrency.py

diff --git a/offchain/concurrency.py b/offchain/concurrency.py
index 1cc4c03..23e6489 100644
--- a/offchain/concurrency.py
+++ b/offchain/concurrency.py
@@ -1,9 +1,12 @@
 import multiprocessing
 from concurrent.futures import ThreadPoolExecutor
-from typing import Any, Callable, Sequence
+from typing import Any, Callable, Sequence, TypeVar
 
 from offchain.logger.logging import logger
 
+T = TypeVar("T")
+U = TypeVar("U")
+
 MAX_PROCS = (multiprocessing.cpu_count() * 2) + 1
 
 
@@ -22,7 +25,7 @@ def parallelize_with_threads(*args: Sequence[Callable]) -> Sequence[Any]:  # typ
     return res
 
 
-def parmap(fn: Callable, args: list) -> list:  # type: ignore[type-arg]
+def parmap(fn: Callable, args: list) -> list:
     """Run a map in parallel safely
 
     Args:
@@ -39,15 +42,11 @@ def parmap(fn: Callable, args: list) -> list:  # type: ignore[type-arg]
     return list(parallelize_with_threads(*map(lambda i: lambda: fn(i), args)))  # type: ignore[arg-type]  # noqa: E501
 
 
-def batched_parmap(fn: Callable, args: list, batch_size: int = 10) -> list:  # type: ignore[type-arg]  # noqa: E501
+def batched_parmap(fn: Callable[[T], U], args: list[T], batch_size: int = 10) -> list:  # noqa: E501
     results = []
-    i, j = 0, 0
-    while i < len(args):
-        i, j = i + batch_size, i
-        if len(args) > i:
-            batch = args[j:i]
-        else:
-            batch = args[j:]
+    for i in range(0, len(args), batch_size):
+        batch_end = i + batch_size
+        batch = args[i:batch_end]
         res = parmap(fn, batch)
-        results += res
+        results.extend(res)
     return results
diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py
new file mode 100644
index 0000000..87b2567
--- /dev/null
+++ b/tests/test_concurrency.py
@@ -0,0 +1,14 @@
+import pytest
+
+from offchain.concurrency import batched_parmap
+
+
+@pytest.mark.parametrize("batch_size", range(1, 11))
+def test_batched_parmap(batch_size):
+    def square(x):
+        return x * x
+
+    args = list(range(0, 10))
+    expected = [square(x) for x in args]
+    result = batched_parmap(square, args, batch_size=batch_size)
+    assert result == expected

From d314eaa33352fe07277985f2997c32429e75e66f Mon Sep 17 00:00:00 2001
From: Andrey Atapin
Date: Thu, 4 Jul 2024 13:05:36 +0500
Subject: [PATCH 2/3] returned a removed pragma

---
 offchain/concurrency.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/offchain/concurrency.py b/offchain/concurrency.py
index 23e6489..29dba1d 100644
--- a/offchain/concurrency.py
+++ b/offchain/concurrency.py
@@ -25,7 +25,7 @@ def parallelize_with_threads(*args: Sequence[Callable]) -> Sequence[Any]:  # typ
     return res
 
 
-def parmap(fn: Callable, args: list) -> list:
+def parmap(fn: Callable, args: list) -> list:  # type: ignore[type-arg]
     """Run a map in parallel safely
 
     Args:

From fad6dc11a9776050c44e783dceaa456d321b2ae1 Mon Sep 17 00:00:00 2001
From: Andrey Atapin
Date: Thu, 4 Jul 2024 13:06:48 +0500
Subject: [PATCH 3/3] added a type hint

---
 offchain/concurrency.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/offchain/concurrency.py b/offchain/concurrency.py
index 29dba1d..9597612 100644
--- a/offchain/concurrency.py
+++ b/offchain/concurrency.py
@@ -42,7 +42,7 @@ def parmap(fn: Callable, args: list) -> list:  # type: ignore[type-arg]
     return list(parallelize_with_threads(*map(lambda i: lambda: fn(i), args)))  # type: ignore[arg-type]  # noqa: E501
 
 
-def batched_parmap(fn: Callable[[T], U], args: list[T], batch_size: int = 10) -> list:  # noqa: E501
+def batched_parmap(fn: Callable[[T], U], args: list[T], batch_size: int = 10) -> list[U]:  # noqa: E501
     results = []
     for i in range(0, len(args), batch_size):
         batch_end = i + batch_size