Skip to content

Commit 75a3f7c

Browse files
committed
fix(embedded/store): ensure watchers get notified when indexing is up-to-date
Signed-off-by: Jeronimo Irazabal <[email protected]>
1 parent b47c390 commit 75a3f7c

File tree

1 file changed

+6
-4
lines changed

1 file changed

+6
-4
lines changed

embedded/store/immustore.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -581,9 +581,6 @@ const (
581581
)
582582

583583
func (s *ImmuStore) notify(nType NotificationType, formattedMessage string, args ...interface{}) {
584-
s.mutex.Lock()
585-
defer s.mutex.Unlock()
586-
587584
if time.Since(s.lastNotification) > NotificationWindow {
588585
switch nType {
589586
case Info:
@@ -611,13 +608,16 @@ func (s *ImmuStore) indexer() {
611608
s.wCenter.DoneUpto(s.index.Ts())
612609
}
613610

614-
txsToIndex := s.TxCount() - s.index.Ts()
611+
s.mutex.Lock()
612+
txsToIndex := s.committedTxID - s.index.Ts()
615613

616614
if txsToIndex == 0 {
617615
s.notify(Info, "All transactions successfully indexed at '%s'", s.path)
616+
s.mutex.Unlock()
618617
s.indexCond.Wait()
619618
} else {
620619
s.notify(Info, "%d transaction/s to be indexed at '%s'", txsToIndex, s.path)
620+
s.mutex.Unlock()
621621
}
622622

623623
err := s.indexSince(s.index.Ts()+1, 10)
@@ -626,7 +626,9 @@ func (s *ImmuStore) indexer() {
626626
return
627627
}
628628
if err != nil {
629+
s.mutex.Lock()
629630
s.notify(Error, "Indexing at '%s' was stopped due to error: %v", s.path, err)
631+
s.mutex.Unlock()
630632
s.indexErr = err
631633
s.indexCond.L.Unlock()
632634
return

0 commit comments

Comments
 (0)