Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Changed
- Update sync for active proposals: votes and proposal updates

## [0.6.12] - 2026-02-11

### Added
Expand Down
2 changes: 1 addition & 1 deletion internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (a *Application) initGrpc() error {
func (a *Application) initUpdatesWorkers() error {
spacesUpdater := updates.NewSpacesUpdater(a.sdk, a.spacesService)
proposals := updates.NewProposalsWorker(a.sdk, a.proposalsService, a.cfg.Snapshot.ProposalsCheckInterval)
activeProposals := updates.NewActiveProposalsWorker(a.sdk, a.proposalsService, a.cfg.Snapshot.ProposalsUpdatesInterval)
activeProposals := updates.NewActiveProposalsWorker(a.sdk, a.proposalsService, a.messagesService, a.cfg.Snapshot.ProposalsUpdatesInterval)
spaces := updates.NewSpacesWorker(spacesUpdater, a.spacesService, a.cfg.Snapshot.UnknownSpacesCheckInterval)
votes := updates.NewVotesWorker(a.sdk, a.votesService, a.proposalsService, a.messagesService, a.cfg.Snapshot.VotesCheckInterval)
messages := updates.NewMessagesWorker(a.sdk, a.messagesService, a.cfg.Snapshot.MessagesCheckInterval)
Expand Down
19 changes: 18 additions & 1 deletion internal/db/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ func (r *MessageRepo) FindLatestMCI() (int, error) {
}

func (r *MessageRepo) FindSpacesWithNewVotes(after time.Time) ([]string, error) {
return r.findSpacesWithUpdates(after, VoteMessage)
}

func (r *MessageRepo) FindSpacesWithProposalsUpdates(after time.Time) ([]string, error) {
return r.findSpacesWithUpdates(after, ProposalMessage)
}

func (r *MessageRepo) findSpacesWithUpdates(after time.Time, mt MessageType) ([]string, error) {
var dummy Message
_ = dummy.Space
_ = dummy.Timestamp
Expand All @@ -108,7 +116,7 @@ func (r *MessageRepo) FindSpacesWithNewVotes(after time.Time) ([]string, error)
Select("space").
Distinct().
Table("messages").
Where("type = @type", sql.Named("type", VoteMessage)).
Where("type = @type", sql.Named("type", mt)).
Where("space != ''").
Where("timestamp >= @timestamp", sql.Named("timestamp", after)).
Pluck("space", &spaces).
Expand Down Expand Up @@ -218,6 +226,15 @@ func (s *MessageService) FindSpacesWithNewVotes(after time.Time) ([]string, erro
return spaces, err
}

func (s *MessageService) FindSpacesWithProposalsUpdates(after time.Time) ([]string, error) {
spaces, err := s.repo.FindSpacesWithProposalsUpdates(after)
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}

return spaces, err
}

func (s *MessageService) FindDeleteProposals(limit, offset int) ([]string, error) {
ids, err := s.repo.FindDeleteProposals(limit, offset)
if errors.Is(err, gorm.ErrRecordNotFound) {
Expand Down
67 changes: 41 additions & 26 deletions internal/updates/active_proposals.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/goverland-labs/snapshot-sdk-go/client"
Expand All @@ -13,21 +14,25 @@ import (
"github.com/goverland-labs/goverland-datasource-snapshot/internal/db"
)

const gap = 30 * time.Minute
const (
gapMessages = 1 * time.Hour
gapProposalUpdate = 10 * time.Minute
)

type ActiveProposalsWorker struct {
sdk *snapshot.SDK
proposals *db.ProposalService

checkInterval time.Duration
checkInterval time.Duration
unrpocessedSpaces unprocessedSpacesFinder
}

func NewActiveProposalsWorker(sdk *snapshot.SDK, proposals *db.ProposalService, checkInterval time.Duration) *ActiveProposalsWorker {
func NewActiveProposalsWorker(sdk *snapshot.SDK, proposals *db.ProposalService, unrpocessedSpaces unprocessedSpacesFinder, checkInterval time.Duration) *ActiveProposalsWorker {
return &ActiveProposalsWorker{
sdk: sdk,
proposals: proposals,

checkInterval: checkInterval,
sdk: sdk,
proposals: proposals,
unrpocessedSpaces: unrpocessedSpaces,
checkInterval: checkInterval,
Comment on lines +26 to +35
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Field/parameter name unrpocessedSpaces appears to be a misspelling (likely unprocessedSpaces). Since this worker just introduced this dependency, consider correcting the identifier now to avoid propagating the typo across the codebase (and keep it consistent with unprocessedSpacesFinder).

Copilot uses AI. Check for mistakes.
}
}

Expand All @@ -51,35 +56,45 @@ func (w *ActiveProposalsWorker) Start(ctx context.Context) error {
}

func (w *ActiveProposalsWorker) loop(ctx context.Context) error {
ids, err := w.proposals.GetProposalIDsForUpdate(nil, gap, proposalsPerRequest, false)
spaces, err := w.unrpocessedSpaces.FindSpacesWithProposalsUpdates(time.Now().Add(-gapMessages))
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using time.Now().Add(-gapMessages) as a fixed lookback window means proposal updates can be silently missed if this worker is down longer than gapMessages (e.g., restart/outage). Consider tracking a persisted watermark (last processed message timestamp/MCI) or otherwise ensuring the query covers all unprocessed updates.

Suggested change
spaces, err := w.unrpocessedSpaces.FindSpacesWithProposalsUpdates(time.Now().Add(-gapMessages))
spaces, err := w.unrpocessedSpaces.FindSpacesWithProposalsUpdates(time.Time{})

Copilot uses AI. Check for mistakes.
if err != nil {
return err
return fmt.Errorf("get paces with new votes: %w", err)
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error message text is misleading and contains typos: this path is fetching spaces with proposal updates, but the message says "paces" and "new votes". Update the message so logs/errors accurately describe the failure source.

Suggested change
return fmt.Errorf("get paces with new votes: %w", err)
return fmt.Errorf("get spaces with proposal updates: %w", err)

Copilot uses AI. Check for mistakes.
}

if len(ids) == 0 {
if len(spaces) == 0 {
return nil
}

proposals, err := w.fetchProposalsInternal(ctx, []snapshot.ListProposalOption{
snapshot.ListProposalWithOrderBy("created", client.OrderDirectionAsc),
snapshot.ListProposalWithIDFilter(ids...),
})
for {
ids, err := w.proposals.GetProposalIDsForUpdate(spaces, gapProposalUpdate, proposalsPerRequest, false)
if err != nil {
return err
}

if err != nil {
return err
}
if len(ids) == 0 {
break
}

log.Info().Int("requested", len(ids)).Int("fetched", len(proposals)).Msg("updated proposals")
proposals, err := w.fetchProposalsInternal(ctx, []snapshot.ListProposalOption{
snapshot.ListProposalWithOrderBy("created", client.OrderDirectionAsc),
snapshot.ListProposalWithIDFilter(ids...),
})

if err := w.processProposals(proposals); err != nil {
return err
}
if err != nil {
return err
}

log.Info().Int("requested", len(ids)).Int("fetched", len(proposals)).Msg("updated proposals")

forDeletion := findProposalIDsForDeletion(ids, proposals)
log.Info().Int("delete", len(forDeletion)).Msg("deleted proposals")
if err := w.processProposals(proposals); err != nil {
return err
}

forDeletion := findProposalIDsForDeletion(ids, proposals)
log.Info().Int("delete", len(forDeletion)).Msg("deleted proposals")

if err := w.proposals.Delete(forDeletion); err != nil {
return err
if err := w.proposals.Delete(forDeletion); err != nil {
return err
}
}

return nil
Expand Down
3 changes: 2 additions & 1 deletion internal/updates/votes.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (

type unprocessedSpacesFinder interface {
FindSpacesWithNewVotes(after time.Time) ([]string, error)
FindSpacesWithProposalsUpdates(after time.Time) ([]string, error)
}

type VoteWorker struct {
Expand Down Expand Up @@ -156,7 +157,7 @@ func (w *VoteWorker) loopActive(ctx context.Context) error {
return nil
}

ids, err := w.proposals.GetProposalIDsForUpdate(spaces, gap, proposalsPerRequest, true)
ids, err := w.proposals.GetProposalIDsForUpdate(spaces, 0, proposalsPerRequest, true)
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing interval=0 to GetProposalIDsForUpdate makes the updated_at < now() filter effectively match almost all proposals in the selected spaces, which can cause this loop to select random proposals and issue many unnecessary Snapshot API calls (often returning no new votes). Use a non-zero interval (or a dedicated constant for active-vote sync) so only stale proposals are revisited.

Suggested change
ids, err := w.proposals.GetProposalIDsForUpdate(spaces, 0, proposalsPerRequest, true)
ids, err := w.proposals.GetProposalIDsForUpdate(spaces, votesCreatedAtGap, proposalsPerRequest, true)

Copilot uses AI. Check for mistakes.
if err != nil {
return fmt.Errorf("get proposals for votes: %w", err)
}
Expand Down
Loading