Skip to content

Commit ea80482

Browse files
cshiels-ie and claude
committed
[AAP-73135] Flush batches before exceeding Segment's 500KB batch limit
Segment silently drops events from batch POSTs that exceed 500KB and returns HTTP 200, making the loss invisible to on_error callbacks. Fix by tracking accumulated batch size and calling flush() before adding a chunk that would push the batch over 450KB (leaving headroom for the per-event metadata the SDK appends).

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
1 parent 3ecfaf2 commit ea80482

1 file changed

Lines changed: 17 additions & 3 deletions

File tree

metrics_utility/library/storage/segment.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ class StorageSegment:
2121
# including `properties` wrapper, event name, and segment_meta; keep this conservative.
2222
REGULAR_MESSAGE_LIMIT = 24 * 1024
2323

24+
# Segment enforces a 500KB limit per batch POST. We flush before reaching it,
25+
# leaving headroom for per-event metadata the SDK adds (anonymousId, timestamp,
26+
# context, etc.). EVENT_OVERHEAD is a conservative per-event estimate.
27+
BATCH_SIZE_LIMIT = 450 * 1024
28+
EVENT_OVERHEAD = 512
29+
2430
def __init__(self, **settings):
2531
self.debug = settings.get('debug', False)
2632
self.user_id = settings.get('user_id', 'unknown')
@@ -130,7 +136,6 @@ def put(self, artifact_name, *, filename=None, fileobj=None, dict=None, event_na
130136
# Configure Segment client
131137
analytics.write_key = self.write_key
132138
analytics.debug = self.debug
133-
analytics.sync_mode = True
134139

135140
max_size = self.REGULAR_MESSAGE_LIMIT
136141
chunks = self._split_into_chunks(dict, max_size)
@@ -145,9 +150,17 @@ def put(self, artifact_name, *, filename=None, fileobj=None, dict=None, event_na
145150
segment_meta = {}
146151
message_id = segment_meta.get('message_id', None)
147152

148-
# Send each chunk
153+
# Send each chunk, flushing before the batch would exceed Segment's 500KB limit
154+
batch_bytes = 0
149155
for i, chunk in enumerate(chunks, 1):
150156
chunk_size = self._calculate_size(chunk)
157+
event_size = chunk_size + self.EVENT_OVERHEAD
158+
159+
if batch_bytes + event_size > self.BATCH_SIZE_LIMIT and batch_bytes > 0:
160+
if self.debug:
161+
print(f'Flushing batch at {batch_bytes} bytes before adding chunk {i}', file=sys.stderr)
162+
analytics.flush()
163+
batch_bytes = 0
151164

152165
# chunk hash = sha256(message hash + chunk index)
153166
if message_id:
@@ -174,8 +187,9 @@ def put(self, artifact_name, *, filename=None, fileobj=None, dict=None, event_na
174187
},
175188
**segment_meta,
176189
)
190+
batch_bytes += event_size
177191

178-
# Flush to ensure all events are sent
192+
# Flush remaining queued events
179193
analytics.flush()
180194

181195
return chunks

0 commit comments

Comments (0)