Skip to content

Commit

Permalink
Merge branch 'main' of github.com:stac-utils/pgstac into main
Browse files Browse the repository at this point in the history
  • Loading branch information
bitner committed Aug 11, 2021
2 parents c9f0f64 + de26c36 commit fc80333
Show file tree
Hide file tree
Showing 30 changed files with 5,032 additions and 1,025 deletions.
5 changes: 4 additions & 1 deletion pgstac.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
BEGIN;
\i sql/001_core.sql
\i sql/001a_jsonutils.sql
\i sql/001b_cursorutils.sql
\i sql/001s_stacutils.sql
\i sql/002_collections.sql
\i sql/003_items.sql
\i sql/004_search.sql
\i sql/999_version.sql
COMMIT;
COMMIT;
29 changes: 28 additions & 1 deletion pypgstac/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pypgstac/pypgstac/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
"""PyPGStac Version."""
__version__ = "0.2.9"
__version__ = "0.3.0"
137 changes: 79 additions & 58 deletions pypgstac/pypgstac/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,36 @@ async def copy(iter: T, table: tables, conn: asyncpg.Connection) -> None:
"""Directly use copy to load data."""
bytes_iter = aiter(iter)
async with conn.transaction():
await conn.copy_to_table(
table,
source=bytes_iter,
columns=["content"],
format="csv",
quote=chr(27),
delimiter=chr(31),
)
await conn.execute(
if table == "collections":
await conn.execute(
"""
CREATE TEMP TABLE pgstactemp (content jsonb)
ON COMMIT DROP;
"""
SELECT backfill_partitions();
"""
)
)
await conn.copy_to_table(
"pgstactemp",
source=bytes_iter,
columns=["content"],
format="csv",
quote=chr(27),
delimiter=chr(31),
)
await conn.execute(
"""
INSERT INTO collections (content)
SELECT content FROM pgstactemp;
"""
)
if table == "items":
await conn.copy_to_table(
"items_staging",
source=bytes_iter,
columns=["content"],
format="csv",
quote=chr(27),
delimiter=chr(31),
)


async def copy_ignore_duplicates(
Expand All @@ -130,56 +147,58 @@ async def copy_ignore_duplicates(
"""Load data first into a temp table to ignore duplicates."""
bytes_iter = aiter(iter)
async with conn.transaction():
await conn.execute(
if table == "collections":
await conn.execute(
"""
CREATE TEMP TABLE pgstactemp (content jsonb)
ON COMMIT DROP;
"""
CREATE TEMP TABLE pgstactemp (content jsonb)
ON COMMIT DROP;
"""
)
await conn.copy_to_table(
"pgstactemp",
source=bytes_iter,
columns=["content"],
format="csv",
quote=chr(27),
delimiter=chr(31),
)
await conn.execute(
)
await conn.copy_to_table(
"pgstactemp",
source=bytes_iter,
columns=["content"],
format="csv",
quote=chr(27),
delimiter=chr(31),
)
await conn.execute(
"""
INSERT INTO collections (content)
SELECT content FROM pgstactemp
ON CONFLICT DO NOTHING;
"""
SELECT make_partitions(
min((content->>'datetime')::timestamptz),
max((content->>'datetime')::timestamptz)
) FROM pgstactemp;
"""
)
await conn.execute(
f"""
INSERT INTO {table} (content)
SELECT content FROM pgstactemp
ON CONFLICT DO NOTHING;
"""
)
)
if table == "items":
await conn.copy_to_table(
"items_staging_ignore",
source=bytes_iter,
columns=["content"],
format="csv",
quote=chr(27),
delimiter=chr(31),
)


async def copy_upsert(iter: T, table: tables, conn: asyncpg.Connection) -> None:
"""Insert data into a temp table to be able merge data."""
bytes_iter = aiter(iter)
async with conn.transaction():
await conn.execute(
"""
CREATE TEMP TABLE pgstactemp (content jsonb)
ON COMMIT DROP;
"""
)
await conn.copy_to_table(
"pgstactemp",
source=bytes_iter,
columns=["content"],
format="csv",
quote=chr(27),
delimiter=chr(31),
)
if table == "collections":
await conn.execute(
"""
CREATE TEMP TABLE pgstactemp (content jsonb)
ON COMMIT DROP;
"""
)
await conn.copy_to_table(
"pgstactemp",
source=bytes_iter,
columns=["content"],
format="csv",
quote=chr(27),
delimiter=chr(31),
)
await conn.execute(
"""
INSERT INTO collections (content)
Expand All @@ -190,11 +209,13 @@ async def copy_upsert(iter: T, table: tables, conn: asyncpg.Connection) -> None:
"""
)
if table == "items":
await conn.execute(
"""
SELECT upsert_item(content)
FROM pgstactemp;
"""
await conn.copy_to_table(
"items_staging_upsert",
source=bytes_iter,
columns=["content"],
format="csv",
quote=chr(27),
delimiter=chr(31),
)


Expand Down
Loading

0 comments on commit fc80333

Please sign in to comment.