Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions aggregator/aggregator.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,8 @@ pyroscope_url = "http://pyroscope:4040"
address = ":50051"

[storage]
type = "dynamodb"

[storage.dynamoDB]
region = "us-east-1"
endpoint = "http://aggregator-dynamodb:8000" # Local DynamoDB endpoint for development
commitVerificationRecordTableName = "commit_verification_records"
finalizedFeedTableName = "aggregated_reports"
checkpointTableName = "checkpoint_records"
type = "postgres"
connectionURL = "postgres://aggregator:aggregator@aggregator-db:5432/aggregator?sslmode=disable"

[apiKeys]
[apiKeys.clients.dev-api-key-verifier-1]
Expand Down Expand Up @@ -62,7 +56,7 @@ pyroscope_url = "http://pyroscope:4040"

# Default rate limits (applied to all callers unless overridden)
[rateLimiting.defaultLimits]
"/chainlink_ccv.v1.CCVData/GetMessagesSince" = { limit_per_minute = 90 }
"/chainlink_ccv.v1.CCVData/GetMessagesSince" = { limit_per_minute = 1200 }

# Group-based rate limits (more restrictive than defaults for specific groups)
[rateLimiting.groupLimits.verifiers]
Expand Down
92 changes: 92 additions & 0 deletions aggregator/migrations/postgres/00001_create_all_tables.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
-- +goose Up
CREATE SEQUENCE IF NOT EXISTS commit_verification_records_seq_num_seq;
CREATE SEQUENCE IF NOT EXISTS commit_aggregated_reports_seq_num_seq;
CREATE TABLE IF NOT EXISTS commit_verification_records (
id BIGSERIAL PRIMARY KEY,
seq_num BIGINT NOT NULL DEFAULT nextval('commit_verification_records_seq_num_seq'),
message_id TEXT NOT NULL,
committee_id TEXT NOT NULL,
participant_id TEXT NOT NULL DEFAULT '',
signer_address TEXT NOT NULL,
source_chain_selector TEXT NOT NULL,
dest_chain_selector TEXT NOT NULL,
onramp_address TEXT NOT NULL,
offramp_address TEXT NOT NULL,
signature_r BYTEA NOT NULL DEFAULT '',
signature_s BYTEA NOT NULL DEFAULT '',
ccv_node_data BYTEA NOT NULL,
verification_timestamp BIGINT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT unique_verification UNIQUE (message_id, committee_id, signer_address, verification_timestamp)
);

CREATE TABLE IF NOT EXISTS commit_aggregated_reports (
id BIGSERIAL PRIMARY KEY,
seq_num BIGINT NOT NULL DEFAULT nextval('commit_aggregated_reports_seq_num_seq'),
message_id TEXT NOT NULL,
committee_id TEXT NOT NULL,
verification_record_ids BIGINT[] NOT NULL,
report_data BYTEA NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT unique_aggregated_report_sequence UNIQUE (message_id, committee_id, verification_record_ids)
);

CREATE TABLE IF NOT EXISTS block_checkpoints (
id BIGSERIAL PRIMARY KEY,
client_id TEXT NOT NULL,
chain_selector TEXT NOT NULL,
finalized_block_height TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

CONSTRAINT unique_client_chain UNIQUE (client_id, chain_selector)
);

-- +goose StatementBegin
CREATE OR REPLACE FUNCTION trigger_set_timestamp()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- +goose StatementEnd

CREATE TRIGGER set_timestamp_block_checkpoints
BEFORE UPDATE ON block_checkpoints
FOR EACH ROW
EXECUTE FUNCTION trigger_set_timestamp();

-- Used by all "latest record" queries
CREATE INDEX IF NOT EXISTS idx_verification_latest ON commit_verification_records(message_id, committee_id, signer_address, seq_num DESC);
-- Used by batchGetVerificationRecords
CREATE INDEX IF NOT EXISTS idx_verification_by_id ON commit_verification_records(id);
-- Used by GetCCVData and aggregated report queries
CREATE INDEX IF NOT EXISTS idx_aggregated_latest ON commit_aggregated_reports(message_id, committee_id, seq_num DESC);
-- Used by QueryAggregatedReports with time range
CREATE INDEX IF NOT EXISTS idx_aggregated_reports_time_query ON commit_aggregated_reports(committee_id, created_at, message_id, seq_num DESC);

-- Used by Checkpoint APIs
CREATE INDEX IF NOT EXISTS idx_block_checkpoints_client_id ON block_checkpoints(client_id);
CREATE INDEX IF NOT EXISTS idx_block_checkpoints_chain_selector ON block_checkpoints(chain_selector);
CREATE INDEX IF NOT EXISTS idx_block_checkpoints_updated_at ON block_checkpoints(updated_at);



-- +goose Down
DROP INDEX IF EXISTS idx_block_checkpoints_updated_at;
DROP INDEX IF EXISTS idx_block_checkpoints_chain_selector;
DROP INDEX IF EXISTS idx_block_checkpoints_client_id;
DROP INDEX IF EXISTS idx_aggregated_latest;
DROP INDEX IF EXISTS idx_verification_by_id;
DROP INDEX IF EXISTS idx_verification_latest;
DROP INDEX IF EXISTS idx_aggregated_reports_time_query;

DROP TABLE IF EXISTS block_checkpoints;
DROP TABLE IF EXISTS commit_aggregated_reports;
DROP TABLE IF EXISTS commit_verification_records;

DROP SEQUENCE IF EXISTS commit_verification_records_seq_num_seq;
DROP SEQUENCE IF EXISTS commit_aggregated_reports_seq_num_seq;
DROP SEQUENCE IF EXISTS commit_verification_records_seq_num_seq;
DROP SEQUENCE IF EXISTS commit_aggregated_reports_seq_num_seq;
25 changes: 2 additions & 23 deletions aggregator/pkg/aggregation/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package aggregation

import (
"context"
"math"
"time"

"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/common"
Expand Down Expand Up @@ -54,18 +53,6 @@ func (c *CommitReportAggregator) metrics(ctx context.Context) common.AggregatorM
return scope.AugmentMetrics(ctx, c.monitoring.Metrics())
}

func normalizeTimestampToSeconds(timestamp int64) int64 {
if timestamp <= 0 {
return timestamp
}
digits := int(math.Log10(float64(timestamp))) + 1
if digits > 10 {
divisor := int64(math.Pow10(digits - 10))
return timestamp / divisor
}
return timestamp
}

func deduplicateVerificationsByParticipant(verifications []*model.CommitVerificationRecord) []*model.CommitVerificationRecord {
if len(verifications) <= 1 {
return verifications
Expand Down Expand Up @@ -111,22 +98,14 @@ func (c *CommitReportAggregator) checkAggregationAndSubmitComplete(ctx context.C
lggr.Infow("Deduplicated verifications", "original", len(verifications), "deduplicated", len(dedupedVerifications))
}

var mostRecentTimestamp int64
for _, verification := range dedupedVerifications {
verificationTimestamp := normalizeTimestampToSeconds(verification.GetTimestamp())
lggr.Debugw("Processing verification", "rawTimestamp", verification.GetTimestamp(), "normalizedTimestamp", verificationTimestamp)
if verificationTimestamp > mostRecentTimestamp {
mostRecentTimestamp = verificationTimestamp
}
}

aggregatedReport := &model.CommitAggregatedReport{
MessageID: messageID,
CommitteeID: committeeID,
Verifications: dedupedVerifications,
Timestamp: mostRecentTimestamp,
}

mostRecentTimestamp := aggregatedReport.GetMostRecentVerificationTimestamp()

lggr.Debugw("Aggregated report created", "timestamp", mostRecentTimestamp, "verificationCount", len(dedupedVerifications))

quorumMet, err := c.quorum.CheckQuorum(ctx, aggregatedReport)
Expand Down
2 changes: 1 addition & 1 deletion aggregator/pkg/common/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type CommitVerificationStore interface {

type CommitVerificationAggregatedStore interface {
// QueryAggregatedReports retrieves all aggregated reports within a specific time range.
QueryAggregatedReports(ctx context.Context, start, end int64, committeeID string, token *string) (*model.PaginatedAggregatedReports, error)
QueryAggregatedReports(ctx context.Context, start int64, committeeID string, token *string) (*model.PaginatedAggregatedReports, error)
// GetCCVData retrieves the aggregated CCV data for a specific message ID.
GetCCVData(ctx context.Context, messageID model.MessageID, committeeID string) (*model.CommitAggregatedReport, error)
}
Expand Down
4 changes: 2 additions & 2 deletions aggregator/pkg/handlers/get_ccv_data_for_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ func (h *GetCCVDataForMessageHandler) logger(ctx context.Context) logger.Sugared
}

// Handle processes the get request and retrieves the commit verification data.
func (h *GetCCVDataForMessageHandler) Handle(ctx context.Context, req *pb.GetCCVDataForMessageRequest) (*pb.MessageWithCCVData, error) {
func (h *GetCCVDataForMessageHandler) Handle(ctx context.Context, req *pb.GetVerifierResultForMessageRequest) (*pb.VerifierResult, error) {
committeeID := LoadCommitteeIDFromContext(ctx)
ctx = scope.WithMessageID(ctx, req.MessageId)
ctx = scope.WithCommitteeID(ctx, committeeID)
h.logger(ctx).Infof("Received GetCCVDataForMessageRequest")
h.logger(ctx).Infof("Received GetVerifierResultForMessageRequest")

data, err := h.storage.GetCCVData(ctx, req.MessageId, committeeID)
if err != nil {
Expand Down
26 changes: 7 additions & 19 deletions aggregator/pkg/handlers/get_messages_since.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,8 @@ package handlers

import (
"context"
"fmt"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/auth"
"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/common"
"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/model"
"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/scope"
Expand All @@ -31,26 +26,15 @@ func (h *GetMessagesSinceHandler) logger(ctx context.Context) logger.SugaredLogg

// Handle processes the get request and retrieves the commit verification data since the specified time.
func (h *GetMessagesSinceHandler) Handle(ctx context.Context, req *pb.GetMessagesSinceRequest) (*pb.GetMessagesSinceResponse, error) {
identity, ok := auth.IdentityFromContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "no caller identity in context")
}

if identity.IsAnonymous && h.maxAnonymousGetMessageSinceRange > 0 {
if time.Since(time.Unix(req.Since, 0)) > h.maxAnonymousGetMessageSinceRange {
return nil, status.Error(codes.PermissionDenied, fmt.Sprintf("anonymous access is limited to data from the last %s", h.maxAnonymousGetMessageSinceRange.String()))
}
}

committeeID := LoadCommitteeIDFromContext(ctx)

h.logger(ctx).Tracef("Received GetMessagesSinceRequest")
storage, err := h.storage.QueryAggregatedReports(ctx, req.Since, time.Now().Unix(), committeeID, &req.NextToken)
h.logger(ctx).Tracef("Received GetMessagesSinceRequest, sinceSequence: %d, nextToken: %v", req.SinceSequence, req.NextToken)
storage, err := h.storage.QueryAggregatedReports(ctx, req.SinceSequence, committeeID, &req.NextToken)
if err != nil {
return nil, err
}

records := make([]*pb.MessageWithCCVData, 0, len(storage.Reports))
records := make([]*pb.VerifierResult, 0, len(storage.Reports))
for _, report := range storage.Reports {
ccvData, err := model.MapAggregatedReportToCCVDataProto(report, h.committee)
if err != nil {
Expand All @@ -62,6 +46,10 @@ func (h *GetMessagesSinceHandler) Handle(ctx context.Context, req *pb.GetMessage
h.m.Metrics().RecordMessageSinceNumberOfRecordsReturned(ctx, len(records))
h.logger(ctx).Tracef("Returning %d records for GetMessagesSinceRequest", len(records))

for _, report := range storage.Reports {
h.logger(ctx).Tracef("Report MessageID: %x, Sequence: %d, Verifications: %d", report.MessageID, report.Sequence, len(report.Verifications))
}

if storage.NextPageToken != nil {
return &pb.GetMessagesSinceResponse{
Results: records,
Expand Down
29 changes: 25 additions & 4 deletions aggregator/pkg/model/commit_aggregated_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package model

import (
"encoding/hex"
"math"
"time"

pb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go/v1"
Expand All @@ -13,10 +14,7 @@ type CommitAggregatedReport struct {
MessageID MessageID
CommitteeID CommitteeID
Verifications []*CommitVerificationRecord
// Timestamp represents the latest verification time among all verifications.
// This field is used for idempotency - reports with the same verifications will have
// the same Timestamp, ensuring retries don't create duplicate records.
Timestamp int64
Sequence int64
// WrittenAt represents when the aggregated report was written to storage (in Unix seconds).
// This field is used for ordering in the GetMessagesSince API to return reports
// in the order they were finalized/stored, not the order of individual verifications.
Expand All @@ -28,6 +26,29 @@ type PaginatedAggregatedReports struct {
NextPageToken *string
}

func normalizeTimestampToSeconds(timestamp int64) int64 {
if timestamp <= 0 {
return timestamp
}
digits := int(math.Log10(float64(timestamp))) + 1
if digits > 10 {
divisor := int64(math.Pow10(digits - 10))
return timestamp / divisor
}
return timestamp
}

func (c *CommitAggregatedReport) GetMostRecentVerificationTimestamp() int64 {
var mostRecent int64
for _, v := range c.Verifications {
vTimestampSeconds := normalizeTimestampToSeconds(v.GetTimestamp())
if vTimestampSeconds > mostRecent {
mostRecent = vTimestampSeconds
}
}
return mostRecent
}

func GetAggregatedReportID(messageID MessageID, committeeID CommitteeID) string {
return hex.EncodeToString(messageID) + ":" + committeeID
}
Expand Down
5 changes: 3 additions & 2 deletions aggregator/pkg/model/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ func (q *QuorumConfig) GetDestVerifierAddressBytes() []byte {
type StorageType string

const (
StorageTypeMemory StorageType = "memory"
StorageTypeDynamoDB StorageType = "dynamodb"
StorageTypeMemory StorageType = "memory"
StorageTypePostgreSQL StorageType = "postgres"
StorageTypeDynamoDB StorageType = "dynamodb"
)

type DynamoDBConfig struct {
Expand Down
5 changes: 3 additions & 2 deletions aggregator/pkg/model/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func compareStringCaseInsensitive(a, b string) bool {
return bytes.EqualFold([]byte(a), []byte(b))
}

func MapAggregatedReportToCCVDataProto(report *CommitAggregatedReport, committees map[string]*Committee) (*pb.MessageWithCCVData, error) {
func MapAggregatedReportToCCVDataProto(report *CommitAggregatedReport, committees map[string]*Committee) (*pb.VerifierResult, error) {
participantSignatures := make(map[string]protocol.Data)
for _, verification := range report.Verifications {
if verification.IdentifierSigner == nil {
Expand Down Expand Up @@ -90,11 +90,12 @@ func MapAggregatedReportToCCVDataProto(report *CommitAggregatedReport, committee
return nil, fmt.Errorf("failed to encode signatures: %w", err)
}

return &pb.MessageWithCCVData{
return &pb.VerifierResult{
Message: report.GetMessage(),
SourceVerifierAddress: report.GetSourceVerifierAddress(),
DestVerifierAddress: quorumConfig.GetDestVerifierAddressBytes(),
CcvData: encodedSignatures,
Timestamp: report.WrittenAt,
Sequence: report.Sequence,
}, nil
}
16 changes: 12 additions & 4 deletions aggregator/pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
// Server represents a gRPC server for the aggregator service.
type Server struct {
pb.UnimplementedAggregatorServer
pb.UnimplementedCCVDataServer
pb.UnimplementedVerifierResultAPIServer

l logger.Logger
config *model.AggregatorConfig
Expand Down Expand Up @@ -75,7 +75,7 @@ func (s *Server) ReadCommitCCVNodeData(ctx context.Context, req *pb.ReadCommitCC
return s.readCommitCCVNodeDataHandler.Handle(ctx, req)
}

func (s *Server) GetCCVDataForMessage(ctx context.Context, req *pb.GetCCVDataForMessageRequest) (*pb.MessageWithCCVData, error) {
func (s *Server) GetVerifierResultForMessage(ctx context.Context, req *pb.GetVerifierResultForMessageRequest) (*pb.VerifierResult, error) {
return s.getCCVDataForMessageHandler.Handle(ctx, req)
}

Expand Down Expand Up @@ -264,8 +264,14 @@ func NewServer(l logger.SugaredLogger, config *model.AggregatorConfig) *Server {
hmacAuthMiddleware := middlewares.NewHMACAuthMiddleware(&config.APIKeys, l)
anonymousAuthMiddleware := middlewares.NewAnonymousAuthMiddleware()

// Initialize rate limiting middleware
rateLimitingMiddleware, err := middlewares.NewRateLimitingMiddlewareFromConfig(config.RateLimiting, config.APIKeys, l)
if err != nil {
l.Fatalf("Failed to initialize rate limiting middleware: %v", err)
}

isCCVDataService := func(ctx context.Context, callMeta interceptors.CallMeta) bool {
return callMeta.Service == pb.CCVData_ServiceDesc.ServiceName
return callMeta.Service == pb.VerifierResultAPI_ServiceDesc.ServiceName
}

aggMonitoring.Metrics().IncrementPendingAggregationsChannelBuffer(context.Background(), 1000) // Pre-increment the buffer size metric
Expand All @@ -290,6 +296,8 @@ func NewServer(l logger.SugaredLogger, config *model.AggregatorConfig) *Server {

// Require authentication for all requests (ensures identity is set)
middlewares.RequireAuthInterceptor,

rateLimitingMiddleware.Intercept,
),
)

Expand All @@ -314,7 +322,7 @@ func NewServer(l logger.SugaredLogger, config *model.AggregatorConfig) *Server {
mu: sync.Mutex{},
}

pb.RegisterCCVDataServer(grpcServer, server)
pb.RegisterVerifierResultAPIServer(grpcServer, server)
pb.RegisterAggregatorServer(grpcServer, server)
reflection.Register(grpcServer)

Expand Down
Loading