Skip to content

Commit af8c447

Browse files
committed
fix potential cursor issue
omit unrelates lints
1 parent d8b155d commit af8c447

File tree

1 file changed

+6
-4
lines changed

1 file changed

+6
-4
lines changed

server/data_stream.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22
from collections import defaultdict
33

4+
45
from atproto import AtUri, CAR, firehose_models, FirehoseSubscribeReposClient, models, parse_subscribe_repos_message
56
from atproto.exceptions import FirehoseError
67

@@ -69,9 +70,6 @@ def _run(name, operations_callback, stream_stop_event=None):
6970

7071
client = FirehoseSubscribeReposClient(params)
7172

72-
if not state:
73-
SubscriptionState.create(service=name, cursor=0)
74-
7573
def on_message_handler(message: firehose_models.MessageFrame) -> None:
7674
# stop on next message if requested
7775
if stream_stop_event and stream_stop_event.is_set():
@@ -86,7 +84,11 @@ def on_message_handler(message: firehose_models.MessageFrame) -> None:
8684
if commit.seq % 1000 == 0: # lower value could lead to performance issues
8785
logger.debug(f'Updated cursor for {name} to {commit.seq}')
8886
client.update_params(models.ComAtprotoSyncSubscribeRepos.Params(cursor=commit.seq))
89-
SubscriptionState.update(cursor=commit.seq).where(SubscriptionState.service == name).execute()
87+
SubscriptionState.insert(service=name, cursor=commit.seq).on_conflict(
88+
conflict_target=(SubscriptionState.service,),
89+
action='UPDATE',
90+
update={SubscriptionState.cursor: commit.seq},
91+
).execute()
9092

9193
if not commit.blocks:
9294
return

0 commit comments

Comments
 (0)