Merge pull request #30 from datalad/itertools

Itertools
mih authored Jun 18, 2024
2 parents 92abc30 + aa7a22c commit 1008a0d
Showing 15 changed files with 1,099 additions and 25 deletions.
62 changes: 52 additions & 10 deletions README.md
@@ -13,19 +13,61 @@ implementations are standalone, and are meant to be equally well usable outside
the DataLad system.

A focus of this library is efficient communication with subprocesses, such as
-Git or git-annex commands, which read and produce data in some format. The
-library provides utilities to integrate such subprocess in Python algorithms,
-for example, to iteratively amend information in JSON-lines formatted data
-streams that are retrieved in arbitrary chunks over a network connection.
+Git or git-annex commands, which read and produce data in some format.

-Here is a simple demo how an iterable with inputs can be fed to the ``cat``
-shell command, while reading its output back as a Python iterable.
+Here is a demo of what can be accomplished with this library. The following
+code queries a remote git-annex repository via a `git annex find` command
+running over an SSH connection in batch-mode. The output in JSON-lines format
+is then itemized and decoded to native Python data types. Both inputs and
+outputs are iterables with meaningful items, even though at a lower level
+information is transmitted as an arbitrarily chunked byte stream.

```py
->>> with iter_subproc(['cat'], inputs=[b'one', b'two', b'three']) as proc:
-...     for chunk in proc:
-...         print(chunk)
-b'onetwothree'
+>>> from more_itertools import intersperse
+>>> from pprint import pprint
+>>> from datasalad.runners import iter_subproc
+>>> from datasalad.itertools import (
+...     itemize,
+...     load_json,
+... )
+
+>>> # a bunch of photos we are interested in
+>>> interesting = [
+...     b'DIY/IMG_20200504_205821.jpg',
+...     b'DIY/IMG_20200505_082136.jpg',
+... ]
+
+>>> # run `git-annex find` on a remote server in a repository
+>>> # that has these photos in the worktree.
+>>> with iter_subproc(
+...     ['ssh', '[email protected]',
+...      'git -C "collections" annex find --json --batch'],
+...     # the remote process is fed the file names,
+...     # and a newline after each one to make git-annex write
+...     # a report in JSON-lines format
+...     inputs=intersperse(b'\n', interesting),
+... ) as remote_annex:
+...     # we loop over the output of the remote process.
+...     # this is originally a byte stream downloaded in arbitrary
+...     # chunks, so we itemize at any newline separator.
+...     # each item is then decoded from JSON-lines format to
+...     # native datatypes
+...     for rec in load_json(itemize(remote_annex, sep=b'\n')):
+...         # for this demo we just pretty-print it
+...         pprint(rec)
+{'backend': 'SHA256E',
+ 'bytesize': '3357612',
+ 'error-messages': [],
+ 'file': 'DIY/IMG_20200504_205821.jpg',
+ 'hashdirlower': '853/12f/',
+ 'hashdirmixed': '65/qp/',
+ 'humansize': '3.36 MB',
+ 'key': 'SHA256E-s3357612--700a52971714c2707c2de975f6015ca14d1a4cdbbf01e43d73951c45cd58c176.jpg',
+ 'keyname': '700a52971714c2707c2de975f6015ca14d1a4cdbbf01e43d73951c45cd58c176.jpg',
+ 'mtime': 'unknown'}
+{'backend': 'SHA256E',
+ 'bytesize': '3284291',
+...
```
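
As a locally runnable variant of the demo above, here is a minimal sketch
(assuming only a POSIX `cat` on the PATH, and made-up input data) that
exercises the same itemize-and-decode pipeline without a remote server:

```py
>>> from datasalad.runners import iter_subproc
>>> from datasalad.itertools import itemize, load_json
>>> # two JSON-lines records, deliberately chunked mid-record
>>> with iter_subproc(
...     ['cat'],
...     inputs=[b'{"file": "a.jpg"}\n{"fi', b'le": "b.jpg"}\n'],
... ) as proc:
...     [rec['file'] for rec in load_json(itemize(proc, sep=b'\n'))]
['a.jpg', 'b.jpg']
```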

## Developing with datasalad
38 changes: 38 additions & 0 deletions datasalad/itertools/__init__.py
@@ -0,0 +1,38 @@
"""Various iterators, e.g., for subprocess pipelining and output processing
.. currentmodule:: datasalad.itertools
.. autosummary::
:toctree: generated
align_pattern
decode_bytes
itemize
load_json
load_json_with_flag
route_out
route_in
"""

__all__ = [
    'align_pattern',
    'decode_bytes',
    'itemize',
    'load_json',
    'load_json_with_flag',
    'StoreOnly',
    'route_in',
    'route_out',
]

from .align_pattern import align_pattern
from .decode_bytes import decode_bytes
from .itemize import itemize
from .load_json import (
    load_json,
    load_json_with_flag,
)
from .reroute import (
    StoreOnly,
    route_in,
    route_out,
)
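
To illustrate how these helpers compose, here is a minimal sketch (with
made-up, in-memory byte chunks) of the itemize-and-decode pattern that the
README demo applies to live subprocess output:

```py
>>> from datasalad.itertools import itemize, load_json
>>> # two JSON-lines records, split across chunks at an awkward spot
>>> chunks = [b'{"key": "one"}\n{"ke', b'y": "two"}\n']
>>> [rec['key'] for rec in load_json(itemize(chunks, sep=b'\n'))]
['one', 'two']
```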
122 changes: 122 additions & 0 deletions datasalad/itertools/align_pattern.py
@@ -0,0 +1,122 @@
"""Function to ensure that a pattern is completely contained in single chunks"""

from __future__ import annotations

import re
from typing import (
    Generator,
    Iterable,
)

# TODO: datalad-next originally also had `str` here. Confirm
S = TypeVar('S', str, bytes, bytearray)


def align_pattern(iterable: Iterable[S], pattern: S) -> Generator[S, None, None]:
"""Yield data chunks that contain a complete pattern, if it is present
``align_pattern`` makes it easy to find a pattern (``str``, ``bytes``,
or ``bytearray``) in data chunks. It joins data-chunks in such a way,
that a simple containment-check (e.g. ``pattern in chunk``) on the chunks
that ``align_pattern`` yields will suffice to determine whether the pattern
is present in the stream yielded by the underlying iterable or not.
To achieve this, ``align_pattern`` will join consecutive chunks to ensures
that the following two assertions hold:
1. Each chunk that is yielded by ``align_pattern`` has at least the length
of the pattern (unless the underlying iterable is exhausted before the
length of the pattern is reached).
2. The pattern is not split between two chunks, i.e. no chunk that is
yielded by ``align_pattern`` ends with a prefix of the pattern (unless
it is the last chunk that the underlying iterable yield).
The pattern might be present multiple times in a yielded data chunk.
Note: the ``pattern`` is compared verbatim to the content in the data
chunks, i.e. no parsing of the ``pattern`` is performed and no regular
expressions or wildcards are supported.
.. code-block:: python
>>> from datasalad.itertools import align_pattern
>>> tuple(align_pattern([b'abcd', b'e', b'fghi'], pattern=b'def'))
(b'abcdefghi',)
>>> # The pattern can be present multiple times in a yielded chunk
>>> tuple(align_pattern([b'abcd', b'e', b'fdefghi'], pattern=b'def'))
(b'abcdefdefghi',)
Use this function if you want to locate a pattern in an input stream. It
allows to use a simple ``in``-check to determine whether the pattern is
present in the yielded result chunks.
The function always yields everything it has fetched from the underlying
iterable. So after a yield it does not cache any data from the underlying
iterable. That means, if the functionality of
``align_pattern`` is no longer required, the underlying iterator can be
used, when ``align_pattern`` has yielded a data chunk.
This allows more efficient processing of the data that remains in the
underlying iterable.
Parameters
----------
iterable: Iterable
An iterable that yields data chunks.
pattern: str | bytes | bytearray
The pattern that should be contained in the chunks. Its type must be
compatible to the type of the elements in ``iterable``.
Yields
-------
bytes | bytearray
data chunks that have at least the size of the pattern and do not end
with a prefix of the pattern. Note that a data chunk might contain the
pattern multiple times.
"""

    # Create a matcher that detects whether a data chunk ends with a
    # proper prefix of the pattern
    if isinstance(pattern, str):
        regex: str | bytes | bytearray = (
            '('
            + '|'.join(
                '.' * (len(pattern) - index - 1) + re.escape(pattern[:index]) + '$'
                for index in range(1, len(pattern))
            )
            + ')'
        )
    else:
        regex = (
            b'('
            + b'|'.join(
                b'.' * (len(pattern) - index - 1) + re.escape(pattern[:index]) + b'$'
                for index in range(1, len(pattern))
            )
            + b')'
        )
    pattern_matcher = re.compile(regex, re.DOTALL)
    pattern_sub = len(pattern) - 1
    # Join data chunks until they are sufficiently long to contain the
    # pattern, i.e. have at least size `len(pattern)`. Continue joining
    # if the chunk ends with a prefix of the pattern.
    current_chunk = None
    for data_chunk in iterable:
        # get the type of current_chunk from the type of this data_chunk
        if current_chunk is None:
            current_chunk = data_chunk
        else:
            current_chunk += data_chunk
        # we type-ignore the next line, because `pattern_matcher`
        # (i.e. `Pattern`) only supports a subtype specification a la
        # `Pattern[str]` from Python 3.9 onwards. For now we need to
        # be compatible with Python 3.8
        if len(current_chunk) >= len(pattern) and not (
            current_chunk[-1] in pattern
            and pattern_matcher.match(current_chunk, len(current_chunk) - pattern_sub)  # type: ignore
        ):
            yield current_chunk
            current_chunk = None

    if current_chunk is not None:
        yield current_chunk
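
A brief usage sketch for the above (with made-up chunks): a containment check
on raw chunks can miss a pattern that straddles chunk boundaries, while the
chunks yielded by `align_pattern` keep it intact:

```py
>>> from datasalad.itertools import align_pattern
>>> # the pattern spans all three chunks
>>> stream = [b'head', b'--SPLIT', b'--tail']
>>> [b'--SPLIT--' in chunk for chunk in stream]
[False, False, False]
>>> [b'--SPLIT--' in chunk for chunk in align_pattern(stream, pattern=b'--SPLIT--')]
[True]
```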
142 changes: 142 additions & 0 deletions datasalad/itertools/decode_bytes.py
@@ -0,0 +1,142 @@
"""Get strings decoded from chunks of bytes"""

from __future__ import annotations

from typing import (
    Generator,
    Iterable,
)

__all__ = ['decode_bytes']


def decode_bytes(
    iterable: Iterable[bytes],
    *,
    encoding: str = 'utf-8',
    backslash_replace: bool = True,
) -> Generator[str, None, None]:
"""Decode bytes in an ``iterable`` into strings
This function decodes ``bytes`` or ``bytearray`` into ``str`` objects,
using the specified encoding. Importantly, the decoding input can
be spread across multiple chunks of heterogeneous sizes, for example
output read from a process or pieces of a download.
Multi-byte encodings that are spread over multiple byte chunks are
supported, and chunks are joined as necessary. For example, the utf-8
encoding for ö is ``b'\\xc3\\xb6'``. If the encoding is split in the
middle because a chunk ends with ``b'\\xc3'`` and the next chunk starts
with ``b'\\xb6'``, a naive decoding approach like the following would fail:
.. code-block:: python
>>> [chunk.decode() for chunk in [b'\\xc3', b'\\xb6']] # doctest: +SKIP
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 1, in <listcomp>
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 0: unexpected end of data
Compared to:
.. code-block:: python
>>> from datasalad.itertools import decode_bytes
>>> tuple(decode_bytes([b'\\xc3', b'\\xb6']))
('ö',)
Input chunks are only joined, if it is necessary to properly decode bytes:
.. code-block:: python
>>> from datasalad.itertools import decode_bytes
>>> tuple(decode_bytes([b'\\xc3', b'\\xb6', b'a']))
('ö', 'a')
If ``backslash_replace`` is ``True``, undecodable bytes will be
replaced with a backslash-substitution. Otherwise,
undecodable bytes will raise a ``UnicodeDecodeError``:
.. code-block:: python
>>> tuple(decode_bytes([b'\\xc3']))
('\\\\xc3',)
>>> tuple(decode_bytes([b'\\xc3'], backslash_replace=False)) # doctest: +SKIP
Traceback (most recent call last):
...
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 1: invalid continuation byte
Backslash-replacement of undecodable bytes is an ambiguous mapping,
because, for example, ``b'\\xc3'`` can already be present in the input.
Parameters
----------
iterable: Iterable[bytes]
Iterable that yields bytes that should be decoded
encoding: str (default: ``'utf-8'``)
Encoding to be used for decoding.
backslash_replace: bool (default: ``True``)
If ``True``, backslash-escapes are used for undecodable bytes. If
``False``, a ``UnicodeDecodeError`` is raised if a byte sequence cannot
be decoded.
Yields
------
str
Decoded strings that are generated by decoding the data yielded by
``iterable`` with the specified ``encoding``
Raises
------
UnicodeDecodeError
If ``backslash_replace`` is ``False`` and the data yielded by
``iterable`` cannot be decoded with the specified ``encoding``
"""

    def handle_decoding_error(
        position: int, exc: UnicodeDecodeError
    ) -> tuple[int, str]:
        """Handle a UnicodeDecodeError"""
        if not backslash_replace:
            # Signal the error to the caller
            raise exc
        # Decode strictly up to the offending bytes, and the offending
        # bytes themselves with backslash-replacement. Decoding starts
        # at `position`; everything before it was already yielded.
        return (
            position + exc.end,
            joined_data[position : position + exc.start].decode(encoding)
            + joined_data[position + exc.start : position + exc.end].decode(
                encoding, errors='backslashreplace'
            ),
        )

    joined_data = b''
    pending_error = None
    position = 0
    for chunk in iterable:
        joined_data += chunk
        while position < len(joined_data):
            try:
                yield joined_data[position:].decode(encoding)
                joined_data = b''
                # reset the scan offset and any pending error together
                # with the buffer
                position = 0
                pending_error = None
            except UnicodeDecodeError as e:
                # If an encoding error occurs, we first check whether it was
                # in the middle of `joined_data` or whether it extends until
                # the end of `joined_data`.
                # If it occurred in the middle of `joined_data`, we replace it
                # with backslash encoding or re-raise the decoding error.
                # If it occurred at the end of `joined_data`, we wait for the
                # next chunk, which might fix the problem.
                if position + e.end == len(joined_data):
                    # Wait for the next chunk, which might fix the problem
                    pending_error = e
                    break
                pending_error = None
                position, string = handle_decoding_error(position, e)
                yield string

    if pending_error:
        # If the last chunk has a decoding error at the end, process it.
        position, string = handle_decoding_error(position, pending_error)
        if string:
            yield string
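
A short composition sketch for the above (with made-up chunks): both a
multi-byte character and the line separator are split across chunk
boundaries, yet decoded lines come out whole (the `rstrip` makes the result
independent of whether `itemize` keeps the separator on each item):

```py
>>> from datasalad.itertools import decode_bytes, itemize
>>> # 'ö' is b'\xc3\xb6' in utf-8, and it is split across chunks here
>>> chunks = [b'r\xc3', b'\xb6sti\nc', b'r\xc3\xaape\n']
>>> [line.rstrip('\n') for line in itemize(decode_bytes(chunks), sep='\n')]
['rösti', 'crêpe']
```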