diff --git a/server/data_stream.py b/server/data_stream.py index bf2c868..8562bec 100644 --- a/server/data_stream.py +++ b/server/data_stream.py @@ -1,6 +1,7 @@ import logging from collections import defaultdict + from atproto import AtUri, CAR, firehose_models, FirehoseSubscribeReposClient, models, parse_subscribe_repos_message from atproto.exceptions import FirehoseError @@ -69,9 +70,6 @@ def _run(name, operations_callback, stream_stop_event=None): client = FirehoseSubscribeReposClient(params) - if not state: - SubscriptionState.create(service=name, cursor=0) - def on_message_handler(message: firehose_models.MessageFrame) -> None: # stop on next message if requested if stream_stop_event and stream_stop_event.is_set(): @@ -86,7 +84,11 @@ def on_message_handler(message: firehose_models.MessageFrame) -> None: if commit.seq % 1000 == 0: # lower value could lead to performance issues logger.debug(f'Updated cursor for {name} to {commit.seq}') client.update_params(models.ComAtprotoSyncSubscribeRepos.Params(cursor=commit.seq)) - SubscriptionState.update(cursor=commit.seq).where(SubscriptionState.service == name).execute() + SubscriptionState.insert(service=name, cursor=commit.seq).on_conflict( + conflict_target=(SubscriptionState.service,), + action='UPDATE', + update={SubscriptionState.cursor: commit.seq}, + ).execute() if not commit.blocks: return