Skip to content

Commit abc1a09

Browse files
committed
Made the use of BulkTransactionsClient easily overridable
1 parent e93cfd3 commit abc1a09

File tree

1 file changed

+44
-42
lines changed

1 file changed

+44
-42
lines changed

Diff for: stac_fastapi/sqlalchemy/transactions.py

+44-42
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,48 @@
2121
logger = logging.getLogger(__name__)
2222

2323

24+
@attr.s
25+
class BulkTransactionsClient(BaseBulkTransactionsClient):
26+
"""Postgres bulk transactions."""
27+
28+
session: Session = attr.ib(default=attr.Factory(Session.create_from_env))
29+
debug: bool = attr.ib(default=False)
30+
item_table: Type[database.Item] = attr.ib(default=database.Item)
31+
item_serializer: Type[serializers.Serializer] = attr.ib(
32+
default=serializers.ItemSerializer
33+
)
34+
35+
def __attrs_post_init__(self):
36+
"""Create sqlalchemy engine."""
37+
self.engine = self.session.writer.cached_engine
38+
39+
def _preprocess_item(self, item: stac_types.Item) -> stac_types.Item:
40+
"""Preprocess items to match data model.
41+
42+
# TODO: dedup with GetterDict logic (ref #58)
43+
"""
44+
db_model = self.item_serializer.stac_to_db(item)
45+
return self.item_serializer.row_to_dict(db_model)
46+
47+
def bulk_item_insert(
48+
self, items: Items, chunk_size: Optional[int] = None, **kwargs
49+
) -> str:
50+
"""Bulk item insertion using sqlalchemy core.
51+
52+
https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow
53+
"""
54+
# Use items.items because schemas.Items is a model with an items key
55+
processed_items = [self._preprocess_item(item) for item in items]
56+
return_msg = f"Successfully added {len(processed_items)} items."
57+
if chunk_size:
58+
for chunk in self._chunks(processed_items, chunk_size):
59+
self.engine.execute(self.item_table.__table__.insert(), chunk)
60+
return return_msg
61+
62+
self.engine.execute(self.item_table.__table__.insert(), processed_items)
63+
return return_msg
64+
65+
2466
@attr.s
2567
class TransactionsClient(BaseTransactionsClient):
2668
"""Transactions extension specific CRUD operations."""
@@ -34,6 +76,7 @@ class TransactionsClient(BaseTransactionsClient):
3476
collection_serializer: Type[serializers.Serializer] = attr.ib(
3577
default=serializers.CollectionSerializer
3678
)
79+
bulk_client_cls = attr.ib(default=BulkTransactionsClient)
3780

3881
def create_item(
3982
self,
@@ -46,7 +89,7 @@ def create_item(
4689

4790
# If a feature collection is posted
4891
if item["type"] == "FeatureCollection":
49-
bulk_client = BulkTransactionsClient(session=self.session)
92+
bulk_client = self.bulk_client_cls(session=self.session)
5093
bulk_client.bulk_item_insert(items=item["features"])
5194
return None
5295

@@ -158,44 +201,3 @@ def delete_collection(
158201
query.delete()
159202
return self.collection_serializer.db_to_stac(data, base_url=base_url)
160203

161-
162-
@attr.s
163-
class BulkTransactionsClient(BaseBulkTransactionsClient):
164-
"""Postgres bulk transactions."""
165-
166-
session: Session = attr.ib(default=attr.Factory(Session.create_from_env))
167-
debug: bool = attr.ib(default=False)
168-
item_table: Type[database.Item] = attr.ib(default=database.Item)
169-
item_serializer: Type[serializers.Serializer] = attr.ib(
170-
default=serializers.ItemSerializer
171-
)
172-
173-
def __attrs_post_init__(self):
174-
"""Create sqlalchemy engine."""
175-
self.engine = self.session.writer.cached_engine
176-
177-
def _preprocess_item(self, item: stac_types.Item) -> stac_types.Item:
178-
"""Preprocess items to match data model.
179-
180-
# TODO: dedup with GetterDict logic (ref #58)
181-
"""
182-
db_model = self.item_serializer.stac_to_db(item)
183-
return self.item_serializer.row_to_dict(db_model)
184-
185-
def bulk_item_insert(
186-
self, items: Items, chunk_size: Optional[int] = None, **kwargs
187-
) -> str:
188-
"""Bulk item insertion using sqlalchemy core.
189-
190-
https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow
191-
"""
192-
# Use items.items because schemas.Items is a model with an items key
193-
processed_items = [self._preprocess_item(item) for item in items]
194-
return_msg = f"Successfully added {len(processed_items)} items."
195-
if chunk_size:
196-
for chunk in self._chunks(processed_items, chunk_size):
197-
self.engine.execute(self.item_table.__table__.insert(), chunk)
198-
return return_msg
199-
200-
self.engine.execute(self.item_table.__table__.insert(), processed_items)
201-
return return_msg

0 commit comments

Comments
 (0)