Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
masih committed Apr 30, 2024
1 parent b7771db commit 994fe50
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 67 deletions.
4 changes: 2 additions & 2 deletions chain/events/filter/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
}

func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address,
keysWithCodec map[string][]types.ActorEventBlock, excludeReverted bool) (EventFilter, error) {
keysWithCodec map[string][]types.ActorEventBlock, _ bool) (EventFilter, error) {
m.mu.Lock()
if m.currentHeight == 0 {
// sync in progress, we haven't had an Apply
Expand Down Expand Up @@ -402,7 +402,7 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
}

if m.EventIndex != nil && requiresHistoricEvents {
if err := m.EventIndex.prefillFilter(ctx, f, excludeReverted); err != nil {
if err := m.EventIndex.prefillFilter(ctx, f); err != nil {
return nil, xerrors.Errorf("pre-fill historic events: %w", err)
}
}
Expand Down
34 changes: 15 additions & 19 deletions chain/events/filter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ const (
revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?`
restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter=? AND event_index=? AND message_cid=? AND message_index=?`

createIndexEventEmitter = `CREATE INDEX IF NOT EXISTS event_emitter ON event (emitter)`
createIndexEventEmitter = `CREATE INDEX IF NOT EXISTS event_emitter ON event (emitter)`
createIndexEventTipsetKeyCid = `CREATE INDEX IF NOT EXISTS event_tipset_key_cid ON event (tipset_key_cid);`
createIndexEventHeight = `CREATE INDEX IF NOT EXISTS event_height ON event (height);`
createIndexEventReverted = `CREATE INDEX IF NOT EXISTS event_reverted ON event (reverted);`
Expand Down Expand Up @@ -295,11 +295,12 @@ func (ei *EventIndex) migrateToVersion3(ctx context.Context) error {
}
defer func() { _ = tx.Rollback() }()

// create index on event.emitter_addr.
_, err = tx.ExecContext(ctx, "CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)")
if err != nil {
return xerrors.Errorf("create index event_emitter_addr: %w", err)
}
// The original work on schema v3 included an index on, then, emitter_addr.
// Successive work to index events by emitter actor ID instead required the index on the column
// to be recreated on a new column called emitter. Therefore, the index creation is done as part of
// migrateToVersion4.
//
// For further context, see: https://github.com/filecoin-project/lotus/pull/11723#discussion_r1526295815

// original v3 migration introduced an index:
// CREATE INDEX IF NOT EXISTS event_entry_key_index ON event_entry (key)
Expand Down Expand Up @@ -370,14 +371,11 @@ func (ei *EventIndex) migrateToVersion4(ctx context.Context) error {
return xerrors.Errorf("commit transaction: %w", err)
}

ei.vacuumDBAndCheckpointWAL(ctx)

log.Infof("Successfully migrated event index from version 3 to version 4 in %s", time.Since(now))
return nil
}


// migrateToVersion4 migrates the schema from version 3 to version 4: indexing events by emitter actor ID.
// migrateToVersion5 migrates the schema from version 3 to version 4: indexing events by emitter actor ID.
// This migration replaces the emitter_addr column in event table with a new column called `emitter`, which stores
// the emitter's actor ID.
func (ei *EventIndex) migrateToVersion5(ctx context.Context, chainStore *store.ChainStore) error {
Expand Down Expand Up @@ -505,7 +503,7 @@ func (ei *EventIndex) migrateToVersion5(ctx context.Context, chainStore *store.C
}

// Increment the schema version in _meta table to 4.
if _, err = tx.Exec("INSERT OR IGNORE INTO _meta (version) VALUES (4)"); err != nil {
if _, err = tx.Exec("INSERT OR IGNORE INTO _meta (version) VALUES (5)"); err != nil {
return xerrors.Errorf("increment _meta version: %w", err)
}

Expand All @@ -515,7 +513,7 @@ func (ei *EventIndex) migrateToVersion5(ctx context.Context, chainStore *store.C

ei.vacuumDBAndCheckpointWAL(ctx)

log.Infof("successfully migrated event index from version 3 to version 4 in %s", time.Since(now))
log.Infof("Successfully migrated event index from version 4 to version 5 in %s", time.Since(now))
return nil
}

Expand Down Expand Up @@ -756,7 +754,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
}

// prefillFilter fills a filter's collection of events from the historic index.
func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, excludeReverted bool) error {
func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter) error {
var (
clauses, joins []string
values []any
Expand Down Expand Up @@ -792,14 +790,12 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude
values = append(values, emitter)
}
clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")")
// Explicitly exclude reverted events, since at least one emitter is present and reverts cannot be considered.
excludeReverted = true
}

if excludeReverted {
clauses = append(clauses, "event.reverted=?")
values = append(values, false)
}
// Always exclude reverted events when prefilling. See:
// - https://github.com/filecoin-project/lotus/issues/11770
clauses = append(clauses, "event.reverted=?")
values = append(values, false)

if len(f.keysWithCodec) > 0 {
join := 0
Expand Down
8 changes: 4 additions & 4 deletions chain/events/filter/index_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ func TestMigration_V3ToV4Sample1(t *testing.T) {
require.NoError(t, err)
tree, err := state.NewStateTree(cst, network)
require.NoError(t, err)
//for _, addr := range addrs {
// _, err = tree.RegisterNewAddress(addr)
// require.NoError(t, err)
//}
for _, addr := range addrs {
_, err = tree.RegisterNewAddress(addr)
require.NoError(t, err)
}
_, err = tree.Flush(ctx)
require.NoError(t, err)

Expand Down
24 changes: 2 additions & 22 deletions chain/events/filter/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func TestEventIndexPrefillFilter(t *testing.T) {
for _, tc := range testCases {
tc := tc // appease lint
t.Run(tc.name, func(t *testing.T) {
if err := ei.prefillFilter(context.Background(), tc.filter, false); err != nil {
if err := ei.prefillFilter(context.Background(), tc.filter); err != nil {
require.NoError(t, err, "prefill filter events")
}

Expand Down Expand Up @@ -681,14 +681,6 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
te: events14000,
want: noCollectedEvents,
},
}

exclusiveTestCases := []struct {
name string
filter *eventFilter
te *TipSetEvents
want []*CollectedEvent
}{
{
name: "nomatch tipset min height",
filter: &eventFilter{
Expand Down Expand Up @@ -929,19 +921,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
for _, tc := range inclusiveTestCases {
tc := tc // appease lint
t.Run(tc.name, func(t *testing.T) {
if err := ei.prefillFilter(context.Background(), tc.filter, false); err != nil {
require.NoError(t, err, "prefill filter events")
}

coll := tc.filter.TakeCollectedEvents(context.Background())
require.ElementsMatch(t, coll, tc.want, tc.name)
})
}

for _, tc := range exclusiveTestCases {
tc := tc // appease lint
t.Run(tc.name, func(t *testing.T) {
if err := ei.prefillFilter(context.Background(), tc.filter, true); err != nil {
if err := ei.prefillFilter(context.Background(), tc.filter); err != nil {
require.NoError(t, err, "prefill filter events")
}

Expand Down
2 changes: 1 addition & 1 deletion chain/state/statetree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestResolveCache(t *testing.T) {
nonId := address.NewForTestGetter()()
id, _ := address.NewIDAddress(1000)

st.LookupIDFun = func(a address.Address) (address.Address, error) {
st.lookupIDFun = func(a address.Address) (address.Address, error) {
if a == nonId {
return id, nil
}
Expand Down
18 changes: 0 additions & 18 deletions chain/stmgr/stmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,21 +587,3 @@ func (sm *StateManager) GetRandomnessDigestFromTickets(ctx context.Context, rand

return r.GetChainRandomness(ctx, randEpoch)
}

func (sm *StateManager) LookupActorID(ctx context.Context, addr address.Address, ts *types.TipSet) (abi.ActorID, error) {
var idAddr address.Address
if addr.Protocol() == address.ID {
idAddr = addr // already an ID address
} else {
var err error
addr, err = sm.LookupID(ctx, addr, ts)
if err != nil {
return 0, xerrors.Errorf("state manager lookup id: %w", err)
}
}
actor, err := address.IDFromAddress(idAddr)
if err != nil {
return 0, xerrors.Errorf("resolve actor id: id from addr: %w", err)
}
return abi.ActorID(actor), nil
}
2 changes: 1 addition & 1 deletion node/modules/actorevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func EventFilterManager(cfg config.EventsConfig) func(helpers.MetricsCtx, repo.L
resolverCacheExpiry = time.Minute
resolverCacheNilTipSet = false
)
actorResolver := filter.NewCachedActorResolver(sm.LookupActorID, resolverCacheSize, resolverCacheExpiry, resolverCacheNilTipSet)
actorResolver := filter.NewCachedActorResolver(sm.LookupID, resolverCacheSize, resolverCacheExpiry, resolverCacheNilTipSet)

// Enable indexing of actor events
var eventIndex *filter.EventIndex
Expand Down

0 comments on commit 994fe50

Please sign in to comment.