Skip to content
This repository was archived by the owner on Sep 8, 2025. It is now read-only.

Commit ed791bb

Browse files
authored
Merge pull request #36 from PowerLoom/feat/migrate-to-onchain-arch
Snapshotter status report structure update and support for http IPFS URL
2 parents ec9d401 + ca35483 commit ed791bb

File tree

13 files changed

+316
-83
lines changed

13 files changed

+316
-83
lines changed

go/caching/caching.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ type DbCache interface {
1515
CheckIfProjectExists(ctx context.Context, projectID string) (bool, error)
1616
StoreProjects(background context.Context, projects []string) error
1717
AddUnfinalizedSnapshotCID(ctx context.Context, msg *datamodel.PayloadCommitMessage) error
18-
AddSnapshotterStatusReport(ctx context.Context, epochId int, projectId string, report *datamodel.SnapshotterStatusReport) error
18+
AddSnapshotterStatusReport(ctx context.Context, epochId int, projectId string, report *datamodel.SnapshotterStatusReport, incrCount bool) error
1919
StoreLastFinalizedEpoch(ctx context.Context, projectID string, epochId int) error
2020
StoreFinalizedSnapshot(ctx context.Context, msg *datamodel.PowerloomSnapshotFinalizedMessage) error
2121
GetFinalizedSnapshotAtEpochID(ctx context.Context, projectID string, epochId int) (*datamodel.PowerloomSnapshotFinalizedMessage, error)

go/caching/redis.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,10 +194,29 @@ func (r *RedisCache) AddUnfinalizedSnapshotCID(ctx context.Context, msg *datamod
194194
}
195195

196196
// AddSnapshotterStatusReport adds the snapshotter's status report to the given project and epoch ID.
197-
func (r *RedisCache) AddSnapshotterStatusReport(ctx context.Context, epochId int, projectId string, report *datamodel.SnapshotterStatusReport) error {
197+
func (r *RedisCache) AddSnapshotterStatusReport(ctx context.Context, epochId int, projectId string, report *datamodel.SnapshotterStatusReport, incrCount bool) error {
198198
key := fmt.Sprintf(redisutils.REDIS_KEY_SNAPSHOTTER_STATUS_REPORT, projectId)
199199

200+
storedReport := new(datamodel.SnapshotterStatusReport)
201+
202+
reportJsonString, err := r.readClient.HGet(ctx, key, strconv.Itoa(epochId)).Result()
203+
if err == nil || reportJsonString != "" {
204+
_ = json.Unmarshal([]byte(reportJsonString), storedReport)
205+
}
206+
200207
if report != nil {
208+
if storedReport.SubmittedSnapshotCid != "" {
209+
report.SubmittedSnapshotCid = storedReport.SubmittedSnapshotCid
210+
}
211+
212+
if storedReport.Reason != "" {
213+
report.Reason = storedReport.Reason
214+
}
215+
216+
if storedReport.FinalizedSnapshotCid != "" {
217+
report.FinalizedSnapshotCid = storedReport.FinalizedSnapshotCid
218+
}
219+
201220
reportJson, err := json.Marshal(report)
202221
if err != nil {
203222
log.WithError(err).Error("failed to marshal snapshotter status report")
@@ -222,9 +241,11 @@ func (r *RedisCache) AddSnapshotterStatusReport(ctx context.Context, epochId int
222241
key = fmt.Sprintf(redisutils.REDIS_KEY_TOTAL_SUCCESSFUL_SNAPSHOT_COUNT, projectId)
223242
}
224243

225-
err := r.writeClient.Incr(ctx, key).Err()
226-
if err != nil {
227-
log.WithError(err).Error("failed to increment total missed snapshot count")
244+
if incrCount {
245+
err = r.writeClient.Incr(ctx, key).Err()
246+
if err != nil {
247+
log.WithError(err).Error("failed to increment total missed snapshot count")
248+
}
228249
}
229250

230251
log.Debug("added snapshotter status report in redis")

go/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/ethereum/go-ethereum v1.11.6
88
github.com/go-playground/validator/v10 v10.14.0
99
github.com/go-redis/redis/v8 v8.11.5
10+
github.com/google/uuid v1.3.0
1011
github.com/hashicorp/go-retryablehttp v0.7.2
1112
github.com/ipfs/go-cid v0.4.1
1213
github.com/ipfs/go-ipfs-api v0.6.0
@@ -35,7 +36,6 @@ require (
3536
github.com/go-playground/locales v0.14.1 // indirect
3637
github.com/go-playground/universal-translator v0.18.1 // indirect
3738
github.com/go-stack/stack v1.8.1 // indirect
38-
github.com/google/uuid v1.3.0 // indirect
3939
github.com/gorilla/websocket v1.4.2 // indirect
4040
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
4141
github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c // indirect

go/goutils/datamodel/data_model.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,12 @@ type SnapshotRelayerPayload struct {
6868
}
6969

7070
type SnapshotterStatusReport struct {
71-
SubmittedSnapshotCid string `json:"submittedSnapshotCid"`
71+
SubmittedSnapshotCid string `json:"submittedSnapshotCid,omitempty"`
72+
SubmittedSnapshot map[string]interface{} `json:"submittedSnapshot,omitempty"`
7273
FinalizedSnapshotCid string `json:"finalizedSnapshotCid"`
74+
FinalizedSnapshot map[string]interface{} `json:"finalizedSnapshot,omitempty"`
7375
State SnapshotSubmissionState `json:"state"`
76+
Reason string `json:"reason"`
7477
}
7578

7679
type UnfinalizedSnapshot struct {
@@ -87,3 +90,11 @@ type SnapshotterIssue struct {
8790
TimeOfReporting string `json:"timeOfReporting"`
8891
Extra string `json:"extra"`
8992
}
93+
94+
type SnapshotSubmittedEventMessage struct {
95+
SnapshotCid string `json:"snapshotCid"`
96+
EpochId int `json:"epochId"`
97+
ProjectId string `json:"projectId"`
98+
BroadcastId string `json:"broadcastId"`
99+
Timestamp int64 `json:"timestamp"`
100+
}

go/goutils/ipfsutils/ipfs_utils.go

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@ import (
44
"bytes"
55
"context"
66
"encoding/json"
7+
"errors"
78
"fmt"
89
"net"
10+
"net/url"
11+
"strings"
912
"time"
1013

1114
shell "github.com/ipfs/go-ipfs-api"
@@ -30,16 +33,16 @@ type IpfsClient struct {
3033
func InitClient(settingsObj *settings.SettingsObj) *IpfsClient {
3134
writeUrl := settingsObj.IpfsConfig.URL
3235

33-
writeUrl, err := ParseMultiAddrURL(writeUrl)
36+
writeUrl, err := ParseURL(writeUrl)
3437
if err != nil {
35-
log.WithError(err).Fatal("failed to parse IPFS write multiaddr URL: ", writeUrl)
38+
log.WithError(err).Fatal("failed to parse IPFS write URL: ", writeUrl)
3639
}
3740

3841
readUrl := settingsObj.IpfsConfig.ReaderURL
3942

40-
readUrl, err = ParseMultiAddrURL(readUrl)
43+
readUrl, err = ParseURL(readUrl)
4144
if err != nil {
42-
log.WithError(err).Fatal("failed to parse IPFS read multiaddr URL: ", readUrl)
45+
log.WithError(err).Fatal("failed to parse IPFS read URL: ", readUrl)
4346
}
4447

4548
ipfsReadHTTPClient := httpclient.GetIPFSReadHTTPClient(settingsObj)
@@ -127,27 +130,28 @@ func (e *UnsupportedMultiaddrError) Error() string {
127130
return fmt.Sprintf("unsupported multiaddr url pattern: %s", e.URL)
128131
}
129132

130-
func ParseMultiAddrURL(url string) (string, error) {
133+
// ParseURL tries to parse a multiaddr URL, if the url is not multiaddr it tries to parse http url.
134+
func ParseURL(ipfsUrl string) (string, error) {
131135
parts := make([]string, 0) // [host,port,scheme]
132136

133-
if multiaddr, err := ma.NewMultiaddr(url); err == nil {
137+
if multiaddr, err := ma.NewMultiaddr(ipfsUrl); err == nil {
134138
addrSplits := ma.Split(multiaddr)
135139

136140
// host and port are required
137141
if len(addrSplits) < 2 {
138-
return "", &UnsupportedMultiaddrError{URL: url}
142+
return "", &UnsupportedMultiaddrError{URL: ipfsUrl}
139143
}
140144

141145
for index, addr := range addrSplits {
142146
component, _ := ma.SplitFirst(addr)
143147
if index == 1 && component.Protocol().Code != ma.P_TCP {
144-
return "", &UnsupportedMultiaddrError{URL: url}
148+
return "", &UnsupportedMultiaddrError{URL: ipfsUrl}
145149
}
146150

147151
// check if scheme is present
148152
if index == 2 {
149153
if component.Protocol().Code != ma.P_HTTP && component.Protocol().Code != ma.P_HTTPS {
150-
return "", &UnsupportedMultiaddrError{URL: url}
154+
return "", &UnsupportedMultiaddrError{URL: ipfsUrl}
151155
}
152156

153157
parts = append(parts, component.Protocol().Name)
@@ -159,23 +163,37 @@ func ParseMultiAddrURL(url string) (string, error) {
159163
}
160164

161165
if len(parts) < 2 {
162-
return "", &UnsupportedMultiaddrError{URL: url}
166+
return "", &UnsupportedMultiaddrError{URL: ipfsUrl}
163167
}
164168

165169
// join host and port
166-
url = net.JoinHostPort(parts[0], parts[1])
170+
ipfsUrl = net.JoinHostPort(parts[0], parts[1])
167171

168172
// add scheme if present
169173
if len(parts) >= 3 {
170-
url = fmt.Sprintf("%s://%s", parts[2], url)
174+
ipfsUrl = fmt.Sprintf("%s://%s", parts[2], ipfsUrl)
171175
} else {
172-
url = fmt.Sprintf("http://%s", url) // default to http if scheme is not present
176+
ipfsUrl = fmt.Sprintf("http://%s", ipfsUrl) // default to http if scheme is not present
173177
}
174178
} else {
175-
return "", err
179+
// parse http url
180+
parsedURL, err := url.ParseRequestURI(ipfsUrl)
181+
if err != nil {
182+
return "", err
183+
}
184+
185+
// check if scheme is http or https
186+
if !strings.EqualFold(parsedURL.Scheme, "http") && !strings.EqualFold(parsedURL.Scheme, "https") {
187+
return "", fmt.Errorf("unsupported scheme: %s", parsedURL.Scheme)
188+
}
189+
190+
// check if host is present
191+
if parsedURL.Host == "" {
192+
return "", errors.New("host is required in url")
193+
}
176194
}
177195

178-
return url, nil
196+
return ipfsUrl, nil
179197
}
180198

181199
func (client *IpfsClient) UploadSnapshotToIPFS(payloadCommit *datamodel.PayloadCommitMessage) error {

go/goutils/settings/settings.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,16 @@ type (
3838
Core struct {
3939
Exchange string `json:"exchange"`
4040
CommitPayloadExchange string `json:"commit_payload_exchange"`
41+
EventDetectorExchange string `json:"event_detector_exchange"`
4142
} `json:"core"`
4243
PayloadCommit struct {
4344
QueueNamePrefix string `json:"queue_name_prefix"`
4445
RoutingKeyPrefix string `json:"routing_key_prefix"`
4546
} `json:"payload_commit"`
47+
EventDetector struct {
48+
QueueNamePrefix string `json:"queue_name_prefix"`
49+
RoutingKeyPrefix string `json:"routing_key_prefix"`
50+
} `json:"event_detector"`
4651
} `json:"setup"`
4752
}
4853

0 commit comments

Comments
 (0)