Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[python] Min-sizing for dataframes/arrays [no merge] #3189

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 54 additions & 28 deletions apis/python/src/tiledbsoma/_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,10 @@ def create(
)

(slot_core_current_domain, saturated_cd) = _fill_out_slot_soma_domain(
slot_soma_domain, index_column_name, pa_field.type, dtype
slot_soma_domain, False, index_column_name, pa_field.type, dtype
)
(slot_core_max_domain, saturated_md) = _fill_out_slot_soma_domain(
None, index_column_name, pa_field.type, dtype
None, True, index_column_name, pa_field.type, dtype
)

extent = _find_extent_for_domain(
Expand Down Expand Up @@ -824,6 +824,7 @@ def _canonicalize_schema(

def _fill_out_slot_soma_domain(
slot_domain: AxisDomain,
is_max_domain: bool,
index_column_name: str,
pa_type: pa.DataType,
dtype: Any,
Expand Down Expand Up @@ -873,17 +874,30 @@ def _fill_out_slot_soma_domain(
# will (and must) ignore these when creating the TileDB schema.
slot_domain = "", ""
elif np.issubdtype(dtype, NPInteger):
iinfo = np.iinfo(cast(NPInteger, dtype))
slot_domain = iinfo.min, iinfo.max - 1
# Here the slot_domain isn't specified by the user; we're setting it.
# The SOMA spec disallows negative soma_joinid.
if index_column_name == SOMA_JOINID:
slot_domain = (0, 2**63 - 2)
saturated_range = True
if is_max_domain or not NEW_SHAPE_FEATURE_FLAG_ENABLED:
# Core max domain is immutable. If unspecified, it should be as big
# as possible since it can never be resized.
iinfo = np.iinfo(cast(NPInteger, dtype))
slot_domain = iinfo.min, iinfo.max - 1
# Here the slot_domain isn't specified by the user; we're setting it.
# The SOMA spec disallows negative soma_joinid.
if index_column_name == SOMA_JOINID:
slot_domain = (0, 2**63 - 2)
saturated_range = True
else:
# Core current domain is mutable but not shrinkable. If unspecified,
# it should be as small as possible since it can only be grown, not shrunk.
#
# Also: core current domain can't be (0, -1): only (0, 0). Yes, this
# does mean that "smallest" current domain has shape 1 not 0.
slot_domain = 0, 0
elif np.issubdtype(dtype, NPFloating):
finfo = np.finfo(cast(NPFloating, dtype))
slot_domain = finfo.min, finfo.max
saturated_range = True
if is_max_domain or not NEW_SHAPE_FEATURE_FLAG_ENABLED:
finfo = np.finfo(cast(NPFloating, dtype))
slot_domain = finfo.min, finfo.max
saturated_range = True
else:
slot_domain = 0.0, 0.0

# The `iinfo.min+1` is necessary as of tiledb core 2.15 / tiledb-py 0.21.1 since
# `iinfo.min` maps to `NaT` (not a time), resulting in
Expand All @@ -895,25 +909,37 @@ def _fill_out_slot_soma_domain(
# expanded to multiple of tile extent exceeds max value representable by domain type. Reduce
# domain max by 1 tile extent to allow for expansion.
elif dtype == "datetime64[s]":
iinfo = np.iinfo(cast(NPInteger, np.int64))
slot_domain = np.datetime64(iinfo.min + 1, "s"), np.datetime64(
iinfo.max - 1000000, "s"
)
if is_max_domain or not NEW_SHAPE_FEATURE_FLAG_ENABLED:
iinfo = np.iinfo(cast(NPInteger, np.int64))
slot_domain = np.datetime64(iinfo.min + 1, "s"), np.datetime64(
iinfo.max - 1000000, "s"
)
else:
slot_domain = np.datetime64(0, "s"), np.datetime64(0, "s")
elif dtype == "datetime64[ms]":
iinfo = np.iinfo(cast(NPInteger, np.int64))
slot_domain = np.datetime64(iinfo.min + 1, "ms"), np.datetime64(
iinfo.max - 1000000, "ms"
)
if is_max_domain or not NEW_SHAPE_FEATURE_FLAG_ENABLED:
iinfo = np.iinfo(cast(NPInteger, np.int64))
slot_domain = np.datetime64(iinfo.min + 1, "ms"), np.datetime64(
iinfo.max - 1000000, "ms"
)
else:
slot_domain = np.datetime64(0, "ms"), np.datetime64(0, "ms")
elif dtype == "datetime64[us]":
iinfo = np.iinfo(cast(NPInteger, np.int64))
slot_domain = np.datetime64(iinfo.min + 1, "us"), np.datetime64(
iinfo.max - 1000000, "us"
)
if is_max_domain or not NEW_SHAPE_FEATURE_FLAG_ENABLED:
iinfo = np.iinfo(cast(NPInteger, np.int64))
slot_domain = np.datetime64(iinfo.min + 1, "us"), np.datetime64(
iinfo.max - 1000000, "us"
)
else:
slot_domain = np.datetime64(0, "us"), np.datetime64(0, "us")
elif dtype == "datetime64[ns]":
iinfo = np.iinfo(cast(NPInteger, np.int64))
slot_domain = np.datetime64(iinfo.min + 1, "ns"), np.datetime64(
iinfo.max - 1000000, "ns"
)
if is_max_domain or not NEW_SHAPE_FEATURE_FLAG_ENABLED:
iinfo = np.iinfo(cast(NPInteger, np.int64))
slot_domain = np.datetime64(iinfo.min + 1, "ns"), np.datetime64(
iinfo.max - 1000000, "ns"
)
else:
slot_domain = np.datetime64(0, "ns"), np.datetime64(0, "ns")

else:
raise TypeError(f"Unsupported dtype {dtype}")
Expand Down
3 changes: 2 additions & 1 deletion apis/python/src/tiledbsoma/_dense_nd_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ def create(
if dim_shape == 0:
raise ValueError("DenseNDArray shape slots must be at least 1")
if dim_shape is None:
dim_shape = dim_capacity
# XXX COMMENT dim_shape = dim_capacity
dim_shape = 1

index_column_data[pa_field.name] = [
0,
Expand Down
4 changes: 2 additions & 2 deletions apis/python/src/tiledbsoma/_point_cloud_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,10 @@ def create(
)

(slot_core_current_domain, saturated_cd) = _fill_out_slot_soma_domain(
slot_soma_domain, index_column_name, pa_field.type, dtype
slot_soma_domain, False, index_column_name, pa_field.type, dtype
)
(slot_core_max_domain, saturated_md) = _fill_out_slot_soma_domain(
None, index_column_name, pa_field.type, dtype
None, True, index_column_name, pa_field.type, dtype
)

extent = _find_extent_for_domain(
Expand Down
3 changes: 2 additions & 1 deletion apis/python/src/tiledbsoma/_sparse_nd_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ def create(
if dim_shape == 0:
raise ValueError("SparseNDArray shape slots must be at least 1")
if dim_shape is None:
dim_shape = dim_capacity
# XXX dim_shape = dim_capacity
dim_shape = 1

index_column_data[pa_field.name] = [
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ def from_isolated_dataframe(
next_soma_joinid += 1
return cls(data=data, field_name=index_field_name)

def get_shape(self) -> int:
    """Return the axis shape implied by this mapping: one past the largest
    soma_joinid present in ``self.data``, or 0 when no ids are mapped."""
    joinids = self.data.values()
    # Guard clause: an empty mapping implies a zero-size axis.
    # (Explicit len() check rather than truthiness, since the concrete
    # container type of ``data`` is not visible here — TODO confirm dict.)
    if len(joinids) == 0:
        return 0
    return max(joinids) + 1

def to_json(self) -> str:
return json.dumps(self, default=attrs.asdict, sort_keys=True, indent=4)

Expand Down Expand Up @@ -490,20 +496,15 @@ def get_obs_shape(self) -> int:
"""Reports the new obs shape which the experiment will need to be
resized to in order to accommodate the data contained within the
registration."""
if len(self.obs_axis.data.values()) == 0:
return 0
return 1 + max(self.obs_axis.data.values())
return self.obs_axis.get_shape()

def get_var_shapes(self) -> Dict[str, int]:
"""Reports the new var shapes, one per measurement, which the experiment
will need to be resized to in order to accommodate the data contained
within the registration."""
retval: Dict[str, int] = {}
for key, axis in self.var_axes.items():
if len(axis.data.values()) == 0:
retval[key] = 0
else:
retval[key] = 1 + max(axis.data.values())
retval[key] = axis.get_shape()
return retval

def to_json(self) -> str:
Expand Down
6 changes: 6 additions & 0 deletions apis/python/src/tiledbsoma/io/_registration/id_mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ def is_identity(self) -> bool:
return False
return True

def get_shape(self) -> int:
    """Return the axis shape implied by this id mapping: one past the
    largest mapped soma_joinid, or 0 when the mapping is empty."""
    # Explicit len() comparison, not ``if not self.data``: ``data`` may be
    # an array-like whose truthiness is ambiguous — TODO confirm type.
    empty = len(self.data) == 0
    return 0 if empty else max(self.data) + 1

@classmethod
def identity(cls, n: int) -> Self:
"""This maps 0-up input-file offsets to 0-up soma_joinid values. This is
Expand Down
22 changes: 21 additions & 1 deletion apis/python/src/tiledbsoma/io/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
NotCreateableError,
SOMAError,
)
from .._flags import NEW_SHAPE_FEATURE_FLAG_ENABLED
from .._soma_array import SOMAArray
from .._soma_object import AnySOMAObject, SOMAObject
from .._tdb_handles import RawHandle
Expand Down Expand Up @@ -1164,6 +1165,7 @@ def _write_dataframe(
df,
df_uri,
id_column_name,
shape=axis_mapping.get_shape(),
ingestion_params=ingestion_params,
additional_metadata=additional_metadata,
original_index_metadata=original_index_metadata,
Expand All @@ -1177,6 +1179,7 @@ def _write_dataframe_impl(
df_uri: str,
id_column_name: Optional[str],
*,
shape: int,
ingestion_params: IngestionParams,
additional_metadata: AdditionalMetadata = None,
original_index_metadata: OriginalIndexMetadata = None,
Expand All @@ -1203,9 +1206,13 @@ def _write_dataframe_impl(
arrow_table = _extract_new_values_for_append(df_uri, arrow_table, context)

try:
domain = None
if NEW_SHAPE_FEATURE_FLAG_ENABLED:
domain = ((0, shape - 1),)
soma_df = DataFrame.create(
df_uri,
schema=arrow_table.schema,
domain=domain,
platform_config=platform_config,
context=context,
)
Expand Down Expand Up @@ -1304,8 +1311,19 @@ def _create_from_matrix(
logging.log_io(None, f"START WRITING {uri}")

try:
shape: Sequence[Union[int, None]] = ()
# A SparseNDArray must be appendable in soma.io.
shape = [None for _ in matrix.shape] if cls.is_sparse else matrix.shape
if NEW_SHAPE_FEATURE_FLAG_ENABLED:
# Instead of
# shape = tuple(int(e) for e in matrix.shape)
# we consult the registration mapping. This is important
# in the case when multiple H5ADs/AnnDatas are being
# ingested to an experiment which doesn't pre-exist.
shape = (axis_0_mapping.get_shape(), axis_1_mapping.get_shape())
elif cls.is_sparse:
shape = tuple(None for _ in matrix.shape)
else:
shape = matrix.shape
soma_ndarray = cls.create(
uri,
type=pa.from_numpy_dtype(matrix.dtype),
Expand Down Expand Up @@ -2711,6 +2729,7 @@ def _ingest_uns_1d_string_array(
df,
df_uri,
None,
shape=df.shape[0],
ingestion_params=ingestion_params,
platform_config=platform_config,
context=context,
Expand Down Expand Up @@ -2756,6 +2775,7 @@ def _ingest_uns_2d_string_array(
df,
df_uri,
None,
shape=df.shape[0],
ingestion_params=ingestion_params,
additional_metadata=additional_metadata,
platform_config=platform_config,
Expand Down
20 changes: 11 additions & 9 deletions apis/python/src/tiledbsoma/io/shaping.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,18 @@ def resize_experiment(
output_handle=output_handle,
)

# Do an early check on the nvars keys vs the experiment's
measurement names. This isn't a can-do status for the experiment;
# it's a failure of the user's arguments.
# Extra user-provided keys not relevant to the experiment are ignored. This
# is important for the case when a new measurement, which is registered from
# AnnData/H5AD inputs, is registered and is about to be created but does not
# exist just yet in the experiment storage.
#
# If the user hasn't provided a key -- e.g. a from-anndata-append-with-resize
# on one measurement while the experiment's other measurements aren't being
# updated -- then we need to find those other measurements' var-shapes.
with tiledbsoma.Experiment.open(uri) as exp:
arg_keys = sorted(nvars.keys())
ms_keys = sorted(exp.ms.keys())
if arg_keys != ms_keys:
raise ValueError(
f"resize_experiment: provided nvar keys {arg_keys} do not match experiment keys {ms_keys}"
)
for ms_key in exp.ms.keys():
if ms_key not in nvars.keys():
nvars[ms_key] = exp.ms[ms_key].var._maybe_soma_joinid_shape or 1

ok = _treewalk(
uri,
Expand Down
5 changes: 5 additions & 0 deletions apis/python/tests/test_basic_anndata_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -1336,6 +1336,11 @@ def test_nan_append(conftest_pbmc_small, dtype, nans, new_obs_ids):
var_field_name="var_id",
)

if tiledbsoma._flags.NEW_SHAPE_FEATURE_FLAG_ENABLED:
nobs = rd.get_obs_shape()
nvars = rd.get_var_shapes()
tiledbsoma.io.resize_experiment(SOMA_URI, nobs=nobs, nvars=nvars)

# Append the second anndata object
tiledbsoma.io.from_anndata(
experiment_uri=SOMA_URI,
Expand Down
2 changes: 1 addition & 1 deletion apis/python/tests/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def create_and_populate_dataframe(path: str) -> soma.DataFrame:
]
)

with soma.DataFrame.create(path, schema=arrow_schema) as df:
with soma.DataFrame.create(path, schema=arrow_schema, domain=[[0, 999]]) as df:
pydict = {}
pydict["soma_joinid"] = [0, 1, 2, 3, 4]
pydict["foo"] = [10, 20, 30, 40, 50]
Expand Down
Loading