Skip to content

Commit a0799d0

Browse files
authored
update reader timeout setting in subscription (#1387)
1 parent 722fc61 commit a0799d0

File tree

2 files changed

+10
-15
lines changed

2 files changed

+10
-15
lines changed

hstream/src/HStream/Server/Core/Subscription.hs

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,9 @@ doSubInit ServerContext{..} subId = do
352352
-- create a ldReader for rereading unacked records
353353
Log.info $ "Create a reader for " <> Log.build subId
354354
reader <- S.newLDReader scLDClient maxReadLogs (Just ldReaderBufferSize)
355-
S.readerSetTimeout reader 10 -- 10 milliseconds
355+
-- reader reads the data and delivers it immediately, otherwise it waits up to 1s
356+
S.readerSetTimeout reader 1000 -- 1 seconds
357+
S.readerSetWaitOnlyWhenNoData reader
356358
ldReader <- newMVar reader
357359

358360
trimCkpWorker <- startCompactedWorker (60 * 1000000){- 60s -} $ do
@@ -732,11 +734,12 @@ sendRecords ServerContext{..} subState subCtx@SubscribeContext {..} = do
732734
<> ", logId=" <> Log.build logId <> ", batchId=" <> Log.build batchId
733735
dataRecord <- withMVar subLdReader $ \reader -> do
734736
S.readerStartReading reader logId batchId batchId
735-
readLoop reader 3
736-
if length dataRecord /= 1
737+
S.readerRead reader 1
738+
if null dataRecord
737739
then do
738-
Log.fatal $ "read error on `readerRead`. Expect 1 record but got " <> Log.build (length dataRecord)
739-
<> ", logId " <> Log.build logId <> ", batchId " <> Log.build batchId
740+
Log.info $ "sub resend reader read empty records from log "
741+
<> Log.build logId <> ", batchId " <> Log.build batchId
742+
<> ", retry next time"
740743
else do
741744
(_, _, _, ReceivedRecord{..}) <- decodeRecordBatch $ head dataRecord
742745
let batchRecords@BatchedRecord{..} = fromJust receivedRecordRecord
@@ -753,16 +756,6 @@ sendRecords ServerContext{..} subState subCtx@SubscribeContext {..} = do
753756
resendBatch = mkBatchedRecord batchedRecordCompressionType batchedRecordPublishTime (fromIntegral $ V.length records) records
754757
resendRecords = ReceivedRecord ids (Just resendBatch)
755758
void $ sendReceivedRecords logId batchId resendRecordIds resendRecords True
756-
where
757-
readLoop reader n
758-
| n == 0 = return []
759-
| otherwise = do
760-
res <- S.readerRead reader 1
761-
if null res
762-
then do
763-
Log.warning $ "reader read got empty result, logId " <> Log.build logId <> ", batchId " <> Log.build batchId <> ", retry."
764-
readLoop reader (n - 1)
765-
else return res
766759

767760
filterUnackedRecordIds recordIds ackedRanges windowLowerBound =
768761
flip V.filter recordIds $ \recordId ->

hstream/test/HStream/RunSQLSpec.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ viewSpecAround = provideRunTest setup clean
110110
clean api (source1, source2, viewName, qName1, qName2) = do
111111
runTerminateSql api $ "TERMINATE QUERY " <> qName1 <> ";"
112112
runTerminateSql api $ "TERMINATE QUERY " <> qName2 <> ";"
113+
-- FIXME: wait the query terminated
114+
threadDelay 10000000
113115
runDropSql api $ "DROP VIEW " <> viewName <> " IF EXISTS;"
114116
runDropSql api $ "DROP STREAM " <> source2 <> " IF EXISTS;"
115117
runDropSql api $ "DROP STREAM " <> source1 <> " IF EXISTS;"

0 commit comments

Comments
 (0)