Skip to content

Commit

Permalink
Merge pull request #1738 from dandi/streaming-upload
Browse files Browse the repository at this point in the history
Clean up and improve the performance of manifest file creation
  • Loading branch information
brianhelba authored Nov 9, 2023
2 parents f31367b + 1296df7 commit cf99632
Showing 1 changed file with 46 additions and 54 deletions.
100 changes: 46 additions & 54 deletions dandiapi/api/manifests.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from contextlib import contextmanager
import os
import tempfile
from typing import IO, Any, Generator, Iterable
from urllib.parse import urlparse, urlunparse

from django.conf import settings
Expand All @@ -12,7 +12,7 @@
from dandiapi.api.storage import create_s3_storage


def s3_url(path: str):
def _s3_url(path: str) -> str:
"""Turn an object path into a fully qualified S3 URL."""
storage = create_s3_storage(settings.DANDI_DANDISETS_BUCKET_NAME)
signed_url = storage.url(path)
Expand All @@ -22,14 +22,14 @@ def s3_url(path: str):
return s3_url


def _manifests_path(version: Version):
def _manifests_path(version: Version) -> str:
return (
f'{settings.DANDI_DANDISETS_BUCKET_PREFIX}'
f'dandisets/{version.dandiset.identifier}/{version.version}'
)


def manifest_location(version: Version):
def manifest_location(version: Version) -> list[str]:
"""Calculate the manifestLocation field for a Version."""
if version.version == 'draft':
return [
Expand All @@ -38,87 +38,81 @@ def manifest_location(version: Version):
f'/versions/draft/assets/'
)
]
return [s3_url(assets_yaml_path(version))]
return [_s3_url(_assets_yaml_path(version))]


def dandiset_jsonld_path(version: Version):
def _dandiset_jsonld_path(version: Version) -> str:
return f'{_manifests_path(version)}/dandiset.jsonld'


def assets_jsonld_path(version: Version):
def _assets_jsonld_path(version: Version) -> str:
return f'{_manifests_path(version)}/assets.jsonld'


def dandiset_yaml_path(version: Version):
def _dandiset_yaml_path(version: Version) -> str:
return f'{_manifests_path(version)}/dandiset.yaml'


def assets_yaml_path(version: Version):
def _assets_yaml_path(version: Version) -> str:
return f'{_manifests_path(version)}/assets.yaml'


def collection_jsonld_path(version: Version):
def _collection_jsonld_path(version: Version) -> str:
return f'{_manifests_path(version)}/collection.jsonld'


@contextmanager
def streaming_file_upload(path: str, mode: str = 'w'):
temp_file_name = None

try:
with tempfile.NamedTemporaryFile(mode=mode, delete=False) as outfile:
temp_file_name = outfile.name
yield outfile
def _streaming_file_upload(path: str) -> Generator[IO[bytes], None, None]:
with tempfile.NamedTemporaryFile(mode='r+b') as outfile:
yield outfile
outfile.seek(0)

# Piggyback on the AssetBlob storage since we want to store manifests in the same bucket
storage = AssetBlob.blob.field.storage
with open(temp_file_name, 'rb') as temp_file:
storage._save(path, File(temp_file))
finally:
if temp_file_name:
os.remove(temp_file_name)
storage._save(path, File(outfile))


def write_dandiset_jsonld(version: Version):
with streaming_file_upload(dandiset_jsonld_path(version)) as stream:
stream.write(JSONRenderer().render(version.metadata).decode())
def _yaml_dump_sequence_from_generator(stream: IO[bytes], generator: Iterable[Any]) -> None:
for obj in generator:
for i, line in enumerate(
yaml.dump(
obj, encoding='utf-8', Dumper=yaml.CSafeDumper, allow_unicode=True
).splitlines()
):
stream.write(b'- ' if i == 0 else b' ')
stream.write(line)
stream.write(b'\n')


def write_assets_jsonld(version: Version):
def write_dandiset_jsonld(version: Version) -> None:
with _streaming_file_upload(_dandiset_jsonld_path(version)) as stream:
stream.write(JSONRenderer().render(version.metadata))


def write_assets_jsonld(version: Version) -> None:
# Use full metadata when writing externally
assets_metadata = (
asset.full_metadata for asset in version.assets.select_related('blob', 'zarr').iterator()
)
with streaming_file_upload(assets_jsonld_path(version)) as stream:
stream.write('[')
with _streaming_file_upload(_assets_jsonld_path(version)) as stream:
stream.write(b'[')
for i, obj in enumerate(assets_metadata):
if i > 0:
stream.write(',')
stream.write(JSONRenderer().render(obj).decode())

stream.write(']')
stream.write(b',')
stream.write(JSONRenderer().render(obj))

stream.write(b']')

def write_dandiset_yaml(version: Version):
with streaming_file_upload(dandiset_yaml_path(version)) as stream:
yaml.dump(version.metadata, stream, Dumper=yaml.CSafeDumper, allow_unicode=True)


def _yaml_dump_sequence_from_generator(stream, generator):
for obj in generator:
for i, line in enumerate(
yaml.dump(obj, Dumper=yaml.CSafeDumper, allow_unicode=True).splitlines()
):
if i == 0:
prefix = '- '
else:
prefix = ' '

stream.write(f'{prefix}{line}\n')
def write_dandiset_yaml(version: Version) -> None:
with _streaming_file_upload(_dandiset_yaml_path(version)) as stream:
yaml.dump(
version.metadata, stream, encoding='utf-8', Dumper=yaml.CSafeDumper, allow_unicode=True
)


def write_assets_yaml(version: Version):
with streaming_file_upload(assets_yaml_path(version)) as stream:
def write_assets_yaml(version: Version) -> None:
with _streaming_file_upload(_assets_yaml_path(version)) as stream:
_yaml_dump_sequence_from_generator(
stream,
# Use full metadata when writing externally
Expand All @@ -131,21 +125,19 @@ def write_assets_yaml(version: Version):
)


def write_collection_jsonld(version: Version):
def write_collection_jsonld(version: Version) -> None:
asset_ids = [
Asset.dandi_asset_id(asset_id)
for asset_id in version.assets.values_list('asset_id', flat=True)
]
with streaming_file_upload(collection_jsonld_path(version)) as stream:
with _streaming_file_upload(_collection_jsonld_path(version)) as stream:
stream.write(
JSONRenderer()
.render(
JSONRenderer().render(
{
'@context': version.metadata['@context'],
'id': version.metadata['id'],
'@type': 'prov:Collection',
'hasMember': asset_ids,
},
)
.decode()
)

0 comments on commit cf99632

Please sign in to comment.