CQL Search (#38)
* Bump version to 0.3.0

* Add option to skip the incremental migration check in the test scripts

* Add SQL script to bypass pypgstac and directly reset/load the pgstac SQL and sample data

* Add SQL to use CQL for search and a cursor for paging through results (see the sketch after this list)

* Split test files to match the location of the code each section tests. Modify test data so ids are recognizable as test data and records carry attributes for exercising more filters

* Adjust the layout of the items table to allow better use of partition pruning and of the start/end date and updated properties

* Fix issue with quoting in SQL generation; don't pregenerate all cursors
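
The CQL search and cursor paging land as SQL functions, so a client can drive them directly over a database connection. The sketch below shows one way that might look with asyncpg; the search(jsonb) call, the CQL-JSON filter shape, and the "next" token field are assumptions based on this commit's description rather than a documented API, and the DSN and collection id are placeholders.

import asyncio
import json

import asyncpg

DSN = "postgresql://pgstac:pgstac@localhost:5432/pgstac"  # placeholder


async def main() -> None:
    conn = await asyncpg.connect(DSN)
    try:
        query = {
            "collections": ["test-collection"],  # placeholder id
            # CQL-JSON style filter; the exact operator spelling is an assumption.
            "filter": {"gte": [{"property": "eo:cloud_cover"}, 50]},
            "limit": 10,
        }
        # First page: search() is assumed to take a jsonb query and return a
        # FeatureCollection-like jsonb document.
        raw = await conn.fetchval("SELECT search($1::jsonb);", json.dumps(query))
        page = json.loads(raw)
        print(f"first page: {len(page.get('features', []))} features")

        # Cursor paging: feed the returned token back into the next query.
        # Exactly how the token round-trips is an assumption; check the pgstac tests.
        token = page.get("next")
        if token:
            query["token"] = token
            raw = await conn.fetchval("SELECT search($1::jsonb);", json.dumps(query))
            page = json.loads(raw)
            print(f"next page: {len(page.get('features', []))} features")
    finally:
        await conn.close()


if __name__ == "__main__":
    asyncio.run(main())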
bitner authored Aug 11, 2021
1 parent b353a2e commit de26c36
Showing 30 changed files with 5,032 additions and 1,025 deletions.
5 changes: 4 additions & 1 deletion pgstac.sql
@@ -1,7 +1,10 @@
 BEGIN;
 \i sql/001_core.sql
+\i sql/001a_jsonutils.sql
+\i sql/001b_cursorutils.sql
+\i sql/001s_stacutils.sql
 \i sql/002_collections.sql
 \i sql/003_items.sql
 \i sql/004_search.sql
 \i sql/999_version.sql
-COMMIT;
+COMMIT;
29 changes: 28 additions & 1 deletion pypgstac/poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pypgstac/pypgstac/__init__.py
@@ -1,2 +1,2 @@
 """PyPGStac Version."""
-__version__ = "0.2.9"
+__version__ = "0.3.0"
137 changes: 79 additions & 58 deletions pypgstac/pypgstac/load.py
@@ -109,19 +109,36 @@ async def copy(iter: T, table: tables, conn: asyncpg.Connection) -> None:
     """Directly use copy to load data."""
     bytes_iter = aiter(iter)
     async with conn.transaction():
-        await conn.copy_to_table(
-            table,
-            source=bytes_iter,
-            columns=["content"],
-            format="csv",
-            quote=chr(27),
-            delimiter=chr(31),
-        )
-        await conn.execute(
-            """
-            SELECT backfill_partitions();
-            """
-        )
+        if table == "collections":
+            await conn.execute(
+                """
+                CREATE TEMP TABLE pgstactemp (content jsonb)
+                ON COMMIT DROP;
+                """
+            )
+            await conn.copy_to_table(
+                "pgstactemp",
+                source=bytes_iter,
+                columns=["content"],
+                format="csv",
+                quote=chr(27),
+                delimiter=chr(31),
+            )
+            await conn.execute(
+                """
+                INSERT INTO collections (content)
+                SELECT content FROM pgstactemp;
+                """
+            )
+        if table == "items":
+            await conn.copy_to_table(
+                "items_staging",
+                source=bytes_iter,
+                columns=["content"],
+                format="csv",
+                quote=chr(27),
+                delimiter=chr(31),
+            )


 async def copy_ignore_duplicates(
@@ -130,56 +147,58 @@ async def copy_ignore_duplicates(
     """Load data first into a temp table to ignore duplicates."""
     bytes_iter = aiter(iter)
     async with conn.transaction():
-        await conn.execute(
-            """
-            CREATE TEMP TABLE pgstactemp (content jsonb)
-            ON COMMIT DROP;
-            """
-        )
-        await conn.copy_to_table(
-            "pgstactemp",
-            source=bytes_iter,
-            columns=["content"],
-            format="csv",
-            quote=chr(27),
-            delimiter=chr(31),
-        )
-        await conn.execute(
-            """
-            SELECT make_partitions(
-                min((content->>'datetime')::timestamptz),
-                max((content->>'datetime')::timestamptz)
-            ) FROM pgstactemp;
-            """
-        )
-        await conn.execute(
-            f"""
-            INSERT INTO {table} (content)
-            SELECT content FROM pgstactemp
-            ON CONFLICT DO NOTHING;
-            """
-        )
+        if table == "collections":
+            await conn.execute(
+                """
+                CREATE TEMP TABLE pgstactemp (content jsonb)
+                ON COMMIT DROP;
+                """
+            )
+            await conn.copy_to_table(
+                "pgstactemp",
+                source=bytes_iter,
+                columns=["content"],
+                format="csv",
+                quote=chr(27),
+                delimiter=chr(31),
+            )
+            await conn.execute(
+                """
+                INSERT INTO collections (content)
+                SELECT content FROM pgstactemp
+                ON CONFLICT DO NOTHING;
+                """
+            )
+        if table == "items":
+            await conn.copy_to_table(
+                "items_staging_ignore",
+                source=bytes_iter,
+                columns=["content"],
+                format="csv",
+                quote=chr(27),
+                delimiter=chr(31),
+            )


 async def copy_upsert(iter: T, table: tables, conn: asyncpg.Connection) -> None:
     """Insert data into a temp table to be able merge data."""
     bytes_iter = aiter(iter)
     async with conn.transaction():
-        await conn.execute(
-            """
-            CREATE TEMP TABLE pgstactemp (content jsonb)
-            ON COMMIT DROP;
-            """
-        )
-        await conn.copy_to_table(
-            "pgstactemp",
-            source=bytes_iter,
-            columns=["content"],
-            format="csv",
-            quote=chr(27),
-            delimiter=chr(31),
-        )
         if table == "collections":
+            await conn.execute(
+                """
+                CREATE TEMP TABLE pgstactemp (content jsonb)
+                ON COMMIT DROP;
+                """
+            )
+            await conn.copy_to_table(
+                "pgstactemp",
+                source=bytes_iter,
+                columns=["content"],
+                format="csv",
+                quote=chr(27),
+                delimiter=chr(31),
+            )
             await conn.execute(
                 """
                 INSERT INTO collections (content)
@@ -190,11 +209,13 @@ async def copy_upsert(iter: T, table: tables, conn: asyncpg.Connection) -> None:
             """
         )
         if table == "items":
-            await conn.execute(
-                """
-                SELECT upsert_item(content)
-                FROM pgstactemp;
-                """
-            )
+            await conn.copy_to_table(
+                "items_staging_upsert",
+                source=bytes_iter,
+                columns=["content"],
+                format="csv",
+                quote=chr(27),
+                delimiter=chr(31),
+            )


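Per the diff above, copy() now routes items through the items_staging table instead of copying into items directly, and collections through a temp table. A minimal, hand-rolled sketch of driving that coroutine is below; the DSN and file name are placeholders, the assumption that pgstac moves staged rows on into the partitioned items table is not verified here, and the project's pypgstac CLI remains the usual entry point for loading.

import asyncio

import asyncpg

from pypgstac.load import copy  # the coroutine changed in this diff

DSN = "postgresql://pgstac:pgstac@localhost:5432/pgstac"  # placeholder


async def ndjson_rows(path: str):
    # copy() streams CSV rows with a single jsonb column and uses chr(27)/chr(31)
    # as quote/delimiter, so plain JSON lines can pass through as long as they
    # avoid those control characters.
    with open(path, "rb") as f:
        for line in f:
            yield line.rstrip(b"\n") + b"\n"


async def main() -> None:
    conn = await asyncpg.connect(DSN)
    try:
        # Per this commit, item rows are copied into items_staging; pgstac is
        # assumed to move them into the partitioned items table from there.
        await copy(ndjson_rows("items.ndjson"), "items", conn)
    finally:
        await conn.close()


if __name__ == "__main__":
    asyncio.run(main())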
