Skip to content

Commit dd42f16

Browse files
authored
v2 Retrieval Client (Layr-Labs#953)
1 parent 100eb3e commit dd42f16

File tree

1 file changed

+205
-0
lines changed

1 file changed

+205
-0
lines changed

api/clients/retrieval_client_v2.go

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
package clients
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
grpcnode "github.com/Layr-Labs/eigenda/api/grpc/node/v2"
9+
"github.com/Layr-Labs/eigenda/core"
10+
corev2 "github.com/Layr-Labs/eigenda/core/v2"
11+
"github.com/Layr-Labs/eigenda/encoding"
12+
"github.com/Layr-Labs/eigensdk-go/logging"
13+
"google.golang.org/grpc"
14+
"google.golang.org/grpc/credentials/insecure"
15+
16+
"github.com/gammazero/workerpool"
17+
)
18+
19+
// RetrievalClientV2 is an object that can retrieve blobs from the DA nodes.
20+
// To retrieve a blob from the relay, use RelayClient instead.
21+
type RetrievalClientV2 interface {
22+
// GetBlob downloads chunks of a blob from operator network and reconstructs the blob.
23+
GetBlob(ctx context.Context, blobHeader corev2.BlobHeader, referenceBlockNumber uint64, quorumID core.QuorumID) ([]byte, error)
24+
}
25+
26+
type retrievalClientV2 struct {
27+
logger logging.Logger
28+
ethClient core.Reader
29+
indexedChainState core.IndexedChainState
30+
verifier encoding.Verifier
31+
numConnections int
32+
}
33+
34+
// NewRetrievalClientV2 creates a new retrieval client.
35+
func NewRetrievalClientV2(
36+
logger logging.Logger,
37+
ethClient core.Reader,
38+
chainState core.IndexedChainState,
39+
verifier encoding.Verifier,
40+
numConnections int,
41+
) RetrievalClientV2 {
42+
return &retrievalClientV2{
43+
logger: logger.With("component", "RetrievalClient"),
44+
ethClient: ethClient,
45+
indexedChainState: chainState,
46+
verifier: verifier,
47+
numConnections: numConnections,
48+
}
49+
}
50+
51+
func (r *retrievalClientV2) GetBlob(ctx context.Context, blobHeader corev2.BlobHeader, referenceBlockNumber uint64, quorumID core.QuorumID) ([]byte, error) {
52+
blobKey, err := blobHeader.BlobKey()
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
commitmentBatch := []encoding.BlobCommitments{blobHeader.BlobCommitments}
58+
err = r.verifier.VerifyCommitEquivalenceBatch(commitmentBatch)
59+
if err != nil {
60+
return nil, err
61+
}
62+
63+
indexedOperatorState, err := r.indexedChainState.GetIndexedOperatorState(ctx, uint(referenceBlockNumber), []core.QuorumID{quorumID})
64+
if err != nil {
65+
return nil, err
66+
}
67+
operators, ok := indexedOperatorState.Operators[quorumID]
68+
if !ok {
69+
return nil, fmt.Errorf("no quorum with ID: %d", quorumID)
70+
}
71+
72+
blobVersions, err := r.ethClient.GetAllVersionedBlobParams(ctx)
73+
if err != nil {
74+
return nil, err
75+
}
76+
77+
blobParam, ok := blobVersions[blobHeader.BlobVersion]
78+
if !ok {
79+
return nil, fmt.Errorf("invalid blob version %d", blobHeader.BlobVersion)
80+
}
81+
82+
encodingParams, err := blobHeader.GetEncodingParams(blobParam)
83+
if err != nil {
84+
return nil, err
85+
}
86+
87+
assignments, err := corev2.GetAssignments(indexedOperatorState.OperatorState, blobParam, quorumID)
88+
if err != nil {
89+
return nil, errors.New("failed to get assignments")
90+
}
91+
92+
// Fetch chunks from all operators
93+
chunksChan := make(chan RetrievedChunks, len(operators))
94+
pool := workerpool.New(r.numConnections)
95+
for opID := range operators {
96+
opID := opID
97+
opInfo := indexedOperatorState.IndexedOperators[opID]
98+
pool.Submit(func() {
99+
r.getChunksFromOperator(ctx, opID, opInfo, blobKey, quorumID, chunksChan)
100+
})
101+
}
102+
103+
var chunks []*encoding.Frame
104+
var indices []encoding.ChunkNumber
105+
// TODO(ian-shim): if we gathered enough chunks, cancel remaining RPC calls
106+
for i := 0; i < len(operators); i++ {
107+
reply := <-chunksChan
108+
if reply.Err != nil {
109+
r.logger.Error("failed to get chunks from operator", "operator", reply.OperatorID.Hex(), "err", reply.Err)
110+
continue
111+
}
112+
assignment, ok := assignments[reply.OperatorID]
113+
if !ok {
114+
return nil, fmt.Errorf("no assignment to operator %s", reply.OperatorID.Hex())
115+
}
116+
117+
assignmentIndices := make([]uint, len(assignment.GetIndices()))
118+
for i, index := range assignment.GetIndices() {
119+
assignmentIndices[i] = uint(index)
120+
}
121+
122+
err = r.verifier.VerifyFrames(reply.Chunks, assignmentIndices, blobHeader.BlobCommitments, encodingParams)
123+
if err != nil {
124+
r.logger.Error("failed to verify chunks from operator", "operator", reply.OperatorID.Hex(), "err", err)
125+
continue
126+
} else {
127+
r.logger.Info("verified chunks from operator", "operator", reply.OperatorID.Hex())
128+
}
129+
130+
chunks = append(chunks, reply.Chunks...)
131+
indices = append(indices, assignmentIndices...)
132+
}
133+
134+
return r.verifier.Decode(
135+
chunks,
136+
indices,
137+
encodingParams,
138+
uint64(blobHeader.BlobCommitments.Length)*encoding.BYTES_PER_SYMBOL,
139+
)
140+
}
141+
142+
func (r *retrievalClientV2) getChunksFromOperator(
143+
ctx context.Context,
144+
opID core.OperatorID,
145+
opInfo *core.IndexedOperatorInfo,
146+
blobKey corev2.BlobKey,
147+
quorumID core.QuorumID,
148+
chunksChan chan RetrievedChunks,
149+
) {
150+
conn, err := grpc.NewClient(
151+
core.OperatorSocket(opInfo.Socket).GetRetrievalSocket(),
152+
grpc.WithTransportCredentials(insecure.NewCredentials()),
153+
)
154+
defer func() {
155+
err := conn.Close()
156+
if err != nil {
157+
r.logger.Error("failed to close connection", "err", err)
158+
}
159+
}()
160+
if err != nil {
161+
chunksChan <- RetrievedChunks{
162+
OperatorID: opID,
163+
Err: err,
164+
Chunks: nil,
165+
}
166+
return
167+
}
168+
169+
n := grpcnode.NewRetrievalClient(conn)
170+
request := &grpcnode.GetChunksRequest{
171+
BlobKey: blobKey[:],
172+
QuorumId: uint32(quorumID),
173+
}
174+
175+
reply, err := n.GetChunks(ctx, request)
176+
if err != nil {
177+
chunksChan <- RetrievedChunks{
178+
OperatorID: opID,
179+
Err: err,
180+
Chunks: nil,
181+
}
182+
return
183+
}
184+
185+
chunks := make([]*encoding.Frame, len(reply.GetChunks()))
186+
for i, data := range reply.GetChunks() {
187+
var chunk *encoding.Frame
188+
chunk, err = new(encoding.Frame).DeserializeGnark(data)
189+
if err != nil {
190+
chunksChan <- RetrievedChunks{
191+
OperatorID: opID,
192+
Err: err,
193+
Chunks: nil,
194+
}
195+
return
196+
}
197+
198+
chunks[i] = chunk
199+
}
200+
chunksChan <- RetrievedChunks{
201+
OperatorID: opID,
202+
Err: nil,
203+
Chunks: chunks,
204+
}
205+
}

0 commit comments

Comments
 (0)