-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathmessages.go
More file actions
150 lines (133 loc) · 3.84 KB
/
messages.go
File metadata and controls
150 lines (133 loc) · 3.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package scheduler
import (
"context"
"fmt"
"go.uber.org/fx"
"gorm.io/gorm"
"github.com/storacha/piri/pkg/config/dynamic"
"github.com/storacha/piri/pkg/pdp/chainsched"
"github.com/storacha/piri/pkg/pdp/ethereum"
"github.com/storacha/piri/pkg/pdp/scheduler"
"github.com/storacha/piri/pkg/pdp/service"
"github.com/storacha/piri/pkg/pdp/smartcontracts"
"github.com/storacha/piri/pkg/pdp/tasks"
"github.com/storacha/piri/pkg/wallet"
)
// MessageModule is the fx module named "scheduler-messages". It provides the
// SenderETH/SendTaskETH pair (exposed as ethereum.Sender and as a member of the
// "scheduler_tasks" group, respectively) and invokes the watcher start functions.
var MessageModule = fx.Module("scheduler-messages",
fx.Provide(
// The sender and its send task are constructed together to break a circular dependency:
// - SenderETH (implements ethereum.Sender) depends on SendTaskETH.
// - SendTaskETH is registered as a scheduler task.
// - Other tasks (InitProvingPeriodTask, NextProvingPeriodTask, ProveTask) depend on ethereum.Sender (SenderETH).
// - Fx needs every task created before it can build the engine, but it cannot create tasks that depend on
// Sender until Sender exists, and Sender cannot exist until SendTaskETH exists.
// ProvideSenderETHPair therefore builds both at once; the two providers below expose each half.
ProvideSenderETHPair,
fx.Annotate(
ProvideSenderFromPair,
fx.As(new(ethereum.Sender)),
),
fx.Annotate(
ProvideSendTaskFromPair,
fx.ResultTags(`group:"scheduler_tasks"`),
fx.As(new(scheduler.TaskInterface)),
),
),
// NB: these functions are invoked rather than provided because they either return no injectable type
// or nothing depends on the value they return.
fx.Invoke(
StartWatcherMessageEth,
StartWatcherCreate,
StartWatcherRootAdd,
StartWatcherProviderRegister,
),
)
// SenderETHParams is the fx parameter object consumed by ProvideSenderETHPair.
type SenderETHParams struct {
fx.In
// DB is the gorm handle injected under the fx name "engine_db".
DB *gorm.DB `name:"engine_db"`
Client service.EthClient
Wallet wallet.Wallet
// Registry is passed to tasks.WithGasConfig when building the sender.
Registry *dynamic.Registry
}
// SenderETHPair bundles the sender and its send task so that both are created
// by a single constructor call and can then be handed to fx individually via
// ProvideSenderFromPair and ProvideSendTaskFromPair.
type SenderETHPair struct {
Sender *tasks.SenderETH
SendTask *tasks.SendTaskETH
}
// ProvideSenderETHPair builds the SenderETH and its SendTaskETH with a single
// call to tasks.NewSenderETH and returns them bundled in a SenderETHPair.
//
// The gas configuration option is derived from params.Registry via
// tasks.WithGasConfig. If construction fails, the returned pair is nil and the
// error wraps the one reported by tasks.NewSenderETH.
func ProvideSenderETHPair(params SenderETHParams) (*SenderETHPair, error) {
	sender, sendTask, err := tasks.NewSenderETH(
		params.Client,
		params.Wallet,
		params.DB,
		tasks.WithGasConfig(params.Registry),
	)
	if err != nil {
		// Do not hand back a pair built from values that accompanied an error.
		return nil, fmt.Errorf("creating eth sender: %w", err)
	}
	return &SenderETHPair{
		Sender:   sender,
		SendTask: sendTask,
	}, nil
}
// ProvideSenderFromPair returns the Sender half of the given pair so that it
// can be registered with fx as a standalone value.
func ProvideSenderFromPair(pair *SenderETHPair) *tasks.SenderETH {
	sender := pair.Sender
	return sender
}
// ProvideSendTaskFromPair returns the SendTask half of the given pair so that
// it can be registered with fx as a standalone value.
func ProvideSendTaskFromPair(pair *SenderETHPair) *tasks.SendTaskETH {
	sendTask := pair.SendTask
	return sendTask
}
// WatcherMessageEthParams is the fx parameter object consumed by
// StartWatcherMessageEth; its fields are forwarded to tasks.NewMessageWatcherEth.
type WatcherMessageEthParams struct {
fx.In
// DB is the gorm handle injected under the fx name "engine_db".
DB *gorm.DB `name:"engine_db"`
Client service.EthClient
Scheduler *chainsched.Scheduler
}
// StartWatcherMessageEth creates a MessageWatcherEth from the injected
// database, chain scheduler and Ethereum client, and ties it to the fx
// lifecycle: the watcher's Start is called in the OnStart hook and its Stop
// (with the hook's context) in the OnStop hook.
//
// It returns the watcher, or a wrapped error if the watcher cannot be created.
func StartWatcherMessageEth(
	lc fx.Lifecycle,
	params WatcherMessageEthParams,
) (*tasks.MessageWatcherEth, error) {
	watcher, err := tasks.NewMessageWatcherEth(params.DB, params.Scheduler, params.Client)
	if err != nil {
		return nil, fmt.Errorf("creating message watcher: %w", err)
	}

	// Start takes no context and reports no error, so the hook always succeeds.
	onStart := func(_ context.Context) error {
		watcher.Start()
		return nil
	}
	onStop := func(stopCtx context.Context) error {
		return watcher.Stop(stopCtx)
	}

	lc.Append(fx.Hook{
		OnStart: onStart,
		OnStop:  onStop,
	})
	return watcher, nil
}
// WatcherCreateParams is the fx parameter object consumed by
// StartWatcherCreate; its fields are forwarded to tasks.NewWatcherCreate.
type WatcherCreateParams struct {
fx.In
// DB is the gorm handle injected under the fx name "engine_db".
DB *gorm.DB `name:"engine_db"`
Verifier smartcontracts.Verifier
Scheduler *chainsched.Scheduler
ServiceView smartcontracts.Service
}
// StartWatcherCreate forwards the injected database, verifier, chain scheduler
// and service view to tasks.NewWatcherCreate and returns whatever error that
// call reports. It is run through fx.Invoke for its side effect only.
func StartWatcherCreate(params WatcherCreateParams) error {
	db, verifier := params.DB, params.Verifier
	sched, serviceView := params.Scheduler, params.ServiceView
	return tasks.NewWatcherCreate(db, verifier, sched, serviceView)
}
// WatcherRootAddParams is the fx parameter object consumed by
// StartWatcherRootAdd; its fields are forwarded to tasks.NewWatcherRootAdd.
type WatcherRootAddParams struct {
fx.In
// DB is the gorm handle injected under the fx name "engine_db".
DB *gorm.DB `name:"engine_db"`
Verifier smartcontracts.Verifier
Scheduler *chainsched.Scheduler
}
// StartWatcherRootAdd forwards the injected database, chain scheduler and
// verifier to tasks.NewWatcherRootAdd and returns whatever error that call
// reports. It is run through fx.Invoke for its side effect only.
func StartWatcherRootAdd(params WatcherRootAddParams) error {
	db, sched, verifier := params.DB, params.Scheduler, params.Verifier
	return tasks.NewWatcherRootAdd(db, sched, verifier)
}
// WatcherProviderRegisterParams is the fx parameter object consumed by
// StartWatcherProviderRegister.
type WatcherProviderRegisterParams struct {
fx.In
// DB is the gorm handle injected under the fx name "engine_db".
DB *gorm.DB `name:"engine_db"`
Scheduler *chainsched.Scheduler
// Registry is used only for its Address(), which is handed to the watcher.
Registry smartcontracts.Registry
}
// StartWatcherProviderRegister forwards the injected database, chain scheduler
// and the registry's address to tasks.NewWatcherProviderRegister and returns
// whatever error that call reports. It is run through fx.Invoke for its side
// effect only.
func StartWatcherProviderRegister(params WatcherProviderRegisterParams) error {
	db, sched := params.DB, params.Scheduler
	registryAddr := params.Registry.Address()
	return tasks.NewWatcherProviderRegister(db, sched, registryAddr)
}