From 0c2d5274f8d64578b9c5cae8f534294d8d0164c4 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Fri, 27 Feb 2026 16:55:22 +0300 Subject: [PATCH 1/2] Update sync for active proposals: votes and proposal updates --- CHANGELOG.md | 3 ++ internal/app.go | 2 +- internal/db/message.go | 19 +++++++- internal/db/proposal.go | 2 +- internal/updates/active_proposals.go | 67 +++++++++++++++++----------- internal/updates/votes.go | 3 +- 6 files changed, 66 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c9a142..3dcc34b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Changed +- Update sync for active proposals: votes and proposal updates + ## [0.6.12] - 2026-02-11 ### Added diff --git a/internal/app.go b/internal/app.go index 5b38c0f..018202e 100644 --- a/internal/app.go +++ b/internal/app.go @@ -262,7 +262,7 @@ func (a *Application) initGrpc() error { func (a *Application) initUpdatesWorkers() error { spacesUpdater := updates.NewSpacesUpdater(a.sdk, a.spacesService) proposals := updates.NewProposalsWorker(a.sdk, a.proposalsService, a.cfg.Snapshot.ProposalsCheckInterval) - activeProposals := updates.NewActiveProposalsWorker(a.sdk, a.proposalsService, a.cfg.Snapshot.ProposalsUpdatesInterval) + activeProposals := updates.NewActiveProposalsWorker(a.sdk, a.proposalsService, a.messagesService, a.cfg.Snapshot.ProposalsUpdatesInterval) spaces := updates.NewSpacesWorker(spacesUpdater, a.spacesService, a.cfg.Snapshot.UnknownSpacesCheckInterval) votes := updates.NewVotesWorker(a.sdk, a.votesService, a.proposalsService, a.messagesService, a.cfg.Snapshot.VotesCheckInterval) messages := updates.NewMessagesWorker(a.sdk, a.messagesService, a.cfg.Snapshot.MessagesCheckInterval) diff --git a/internal/db/message.go b/internal/db/message.go index c4a3c8a..490816a 100644 --- a/internal/db/message.go +++ b/internal/db/message.go @@ -99,6 +99,14 @@ func (r *MessageRepo) FindLatestMCI() (int, error) { } func (r *MessageRepo) FindSpacesWithNewVotes(after time.Time) ([]string, error) { + return r.findSpacesWithUpdates(after, VoteMessage) +} + +func (r *MessageRepo) FindSpacesWithProposalsUpdates(after time.Time) ([]string, error) { + return r.findSpacesWithUpdates(after, ProposalMessage) +} + +func (r *MessageRepo) findSpacesWithUpdates(after time.Time, mt MessageType) ([]string, error) { var dummy Message _ = dummy.Space _ = dummy.Timestamp @@ -108,7 +116,7 @@ func (r *MessageRepo) FindSpacesWithNewVotes(after time.Time) ([]string, error) Select("space"). Distinct(). Table("messages"). - Where("type = @type", sql.Named("type", VoteMessage)). + Where("type = @type", sql.Named("type", mt)). Where("space != ''"). Where("timestamp >= @timestamp", sql.Named("timestamp", after)). Pluck("space", &spaces). @@ -218,6 +226,15 @@ func (s *MessageService) FindSpacesWithNewVotes(after time.Time) ([]string, erro return spaces, err } +func (s *MessageService) FindSpacesWithProposalsUpdates(after time.Time) ([]string, error) { + spaces, err := s.repo.FindSpacesWithProposalsUpdates(after) + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil + } + + return spaces, err +} + func (s *MessageService) FindDeleteProposals(limit, offset int) ([]string, error) { ids, err := s.repo.FindDeleteProposals(limit, offset) if errors.Is(err, gorm.ErrRecordNotFound) { diff --git a/internal/db/proposal.go b/internal/db/proposal.go index fca94b5..79735b2 100644 --- a/internal/db/proposal.go +++ b/internal/db/proposal.go @@ -137,7 +137,7 @@ func (r *ProposalRepo) GetProposalIDsForUpdate(spaces []string, interval time.Du Where("deleted_at is null"). Where(r.conn. Where("to_timestamp((snapshot->'start')::double precision) <= now() and to_timestamp((snapshot->'end')::double precision) >= now()"). - Or("updated_at < to_timestamp((snapshot->'end')::double precision) and to_timestamp((snapshot->'end')::double precision) < now()"), + Or("updated_at < to_timestamp((snapshot->'end')::double precision) AND to_timestamp((snapshot->'end')::double precision) < now() AND to_timestamp((snapshot->'end')::double precision) > now() - interval '7 days'"), ). Order(orderBy). Limit(limit) diff --git a/internal/updates/active_proposals.go b/internal/updates/active_proposals.go index c003aed..cc3ff55 100644 --- a/internal/updates/active_proposals.go +++ b/internal/updates/active_proposals.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "time" "github.com/goverland-labs/snapshot-sdk-go/client" @@ -13,21 +14,25 @@ import ( "github.com/goverland-labs/goverland-datasource-snapshot/internal/db" ) -const gap = 30 * time.Minute +const ( + gapMessages = 1 * time.Hour + gapProposalUpdate = 10 * time.Minute +) type ActiveProposalsWorker struct { sdk *snapshot.SDK proposals *db.ProposalService - checkInterval time.Duration + checkInterval time.Duration + unrpocessedSpaces unprocessedSpacesFinder } -func NewActiveProposalsWorker(sdk *snapshot.SDK, proposals *db.ProposalService, checkInterval time.Duration) *ActiveProposalsWorker { +func NewActiveProposalsWorker(sdk *snapshot.SDK, proposals *db.ProposalService, unrpocessedSpaces unprocessedSpacesFinder, checkInterval time.Duration) *ActiveProposalsWorker { return &ActiveProposalsWorker{ - sdk: sdk, - proposals: proposals, - - checkInterval: checkInterval, + sdk: sdk, + proposals: proposals, + unrpocessedSpaces: unrpocessedSpaces, + checkInterval: checkInterval, } } @@ -51,35 +56,45 @@ func (w *ActiveProposalsWorker) Start(ctx context.Context) error { } func (w *ActiveProposalsWorker) loop(ctx context.Context) error { - ids, err := w.proposals.GetProposalIDsForUpdate(nil, gap, proposalsPerRequest, false) + spaces, err := w.unrpocessedSpaces.FindSpacesWithProposalsUpdates(time.Now().Add(-gapMessages)) if err != nil { - return err + return fmt.Errorf("get paces with new votes: %w", err) } - - if len(ids) == 0 { + if len(spaces) == 0 { return nil } - proposals, err := w.fetchProposalsInternal(ctx, []snapshot.ListProposalOption{ - snapshot.ListProposalWithOrderBy("created", client.OrderDirectionAsc), - snapshot.ListProposalWithIDFilter(ids...), - }) + for { + ids, err := w.proposals.GetProposalIDsForUpdate(spaces, gapProposalUpdate, proposalsPerRequest, false) + if err != nil { + return err + } - if err != nil { - return err - } + if len(ids) == 0 { + break + } - log.Info().Int("requested", len(ids)).Int("fetched", len(proposals)).Msg("updated proposals") + proposals, err := w.fetchProposalsInternal(ctx, []snapshot.ListProposalOption{ + snapshot.ListProposalWithOrderBy("created", client.OrderDirectionAsc), + snapshot.ListProposalWithIDFilter(ids...), + }) - if err := w.processProposals(proposals); err != nil { - return err - } + if err != nil { + return err + } + + log.Info().Int("requested", len(ids)).Int("fetched", len(proposals)).Msg("updated proposals") - forDeletion := findProposalIDsForDeletion(ids, proposals) - log.Info().Int("delete", len(forDeletion)).Msg("deleted proposals") + if err := w.processProposals(proposals); err != nil { + return err + } + + forDeletion := findProposalIDsForDeletion(ids, proposals) + log.Info().Int("delete", len(forDeletion)).Msg("deleted proposals") - if err := w.proposals.Delete(forDeletion); err != nil { - return err + if err := w.proposals.Delete(forDeletion); err != nil { + return err + } } return nil diff --git a/internal/updates/votes.go b/internal/updates/votes.go index 4ad1d49..59d52ce 100644 --- a/internal/updates/votes.go +++ b/internal/updates/votes.go @@ -28,6 +28,7 @@ const ( type unprocessedSpacesFinder interface { FindSpacesWithNewVotes(after time.Time) ([]string, error) + FindSpacesWithProposalsUpdates(after time.Time) ([]string, error) } type VoteWorker struct { @@ -156,7 +157,7 @@ func (w *VoteWorker) loopActive(ctx context.Context) error { return nil } - ids, err := w.proposals.GetProposalIDsForUpdate(spaces, gap, proposalsPerRequest, true) + ids, err := w.proposals.GetProposalIDsForUpdate(spaces, 0, proposalsPerRequest, true) if err != nil { return fmt.Errorf("get proposals for votes: %w", err) } From 46d38013102ce0905dc28cb8d334e9164ac91c0c Mon Sep 17 00:00:00 2001 From: Dmitry Date: Mon, 2 Mar 2026 15:42:03 +0300 Subject: [PATCH 2/2] Rollback getting proposals window --- internal/db/proposal.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/db/proposal.go b/internal/db/proposal.go index 79735b2..fca94b5 100644 --- a/internal/db/proposal.go +++ b/internal/db/proposal.go @@ -137,7 +137,7 @@ func (r *ProposalRepo) GetProposalIDsForUpdate(spaces []string, interval time.Du Where("deleted_at is null"). Where(r.conn. Where("to_timestamp((snapshot->'start')::double precision) <= now() and to_timestamp((snapshot->'end')::double precision) >= now()"). - Or("updated_at < to_timestamp((snapshot->'end')::double precision) AND to_timestamp((snapshot->'end')::double precision) < now() AND to_timestamp((snapshot->'end')::double precision) > now() - interval '7 days'"), + Or("updated_at < to_timestamp((snapshot->'end')::double precision) and to_timestamp((snapshot->'end')::double precision) < now()"), ). Order(orderBy). Limit(limit)