Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,16 @@ import (
substrateExecutor "github.com/ChainSafe/sygma-relayer/chains/substrate/executor"
substrate_listener "github.com/ChainSafe/sygma-relayer/chains/substrate/listener"
substrate_pallet "github.com/ChainSafe/sygma-relayer/chains/substrate/pallet"
"github.com/ChainSafe/sygma-relayer/metrics"
"github.com/centrifuge/go-substrate-rpc-client/v4/signature"

"github.com/ChainSafe/sygma-relayer/comm/elector"
"github.com/ChainSafe/sygma-relayer/comm/p2p"
"github.com/ChainSafe/sygma-relayer/config"
"github.com/ChainSafe/sygma-relayer/health"
"github.com/ChainSafe/sygma-relayer/jobs"
"github.com/ChainSafe/sygma-relayer/keyshare"
"github.com/ChainSafe/sygma-relayer/metrics"
"github.com/ChainSafe/sygma-relayer/topology"
"github.com/ChainSafe/sygma-relayer/tss"
"github.com/centrifuge/go-substrate-rpc-client/v4/signature"
"github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -132,7 +131,11 @@ func Run() error {
exitLock := &sync.RWMutex{}
defer exitLock.Lock()

mp, err := opentelemetry.InitMetricProvider(context.Background(), configuration.RelayerConfig.OpenTelemetryCollectorURL)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

OTLPResource := opentelemetry.InitResource(fmt.Sprintf("Relayer-%s", configuration.RelayerConfig.Id), configuration.RelayerConfig.Env)
mp, err := opentelemetry.InitMetricProvider(ctx, OTLPResource, configuration.RelayerConfig.OpenTelemetryCollectorURL)
if err != nil {
panic(err)
}
Expand All @@ -146,8 +149,16 @@ func Run() error {
panic(err)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tp, err := opentelemetry.InitTracesProvider(ctx, OTLPResource, configuration.RelayerConfig.OpenTelemetryCollectorURL)
if err != nil {
panic(err)
}
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Error().Msgf("Error shutting down tracer provider: %v", err)
}
}()

chains := []relayer.RelayedChain{}
for _, chainConfig := range configuration.ChainConfigs {
switch chainConfig["type"] {
Expand Down
7 changes: 3 additions & 4 deletions chains/evm/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

type BatchProposalExecutor interface {
Execute(msgs []*message.Message) error
Execute(ctx context.Context, msgs []*message.Message) error
}

type EVMChain struct {
Expand Down Expand Up @@ -53,13 +53,12 @@ func NewEVMChain(
}
}

func (c *EVMChain) Write(msgs []*message.Message) error {
err := c.executor.Execute(msgs)
func (c *EVMChain) Write(ctx context.Context, msgs []*message.Message) error {
err := c.executor.Execute(ctx, msgs)
if err != nil {
log.Err(err).Str("domainID", string(c.domainID)).Msgf("error writing messages %+v", msgs)
return err
}

return nil
}

Expand Down
64 changes: 44 additions & 20 deletions chains/evm/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,22 @@ import (
"sync"
"time"

"github.com/binance-chain/tss-lib/common"
"github.com/sourcegraph/conc/pool"

ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p/core/host"
"github.com/rs/zerolog/log"

"github.com/ChainSafe/chainbridge-core/chains/evm/calls/transactor"
"github.com/ChainSafe/chainbridge-core/chains/evm/executor/proposal"
"github.com/ChainSafe/chainbridge-core/relayer/message"
"github.com/ChainSafe/sygma-relayer/chains"
"github.com/ChainSafe/sygma-relayer/comm"
"github.com/ChainSafe/sygma-relayer/tss"
"github.com/ChainSafe/sygma-relayer/tss/signing"
"github.com/binance-chain/tss-lib/common"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p/core/host"
"github.com/rs/zerolog/log"
"github.com/sourcegraph/conc/pool"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
traceapi "go.opentelemetry.io/otel/trace"
)

var (
Expand Down Expand Up @@ -72,26 +74,34 @@ func NewExecutor(
}

// Execute starts a signing process and executes proposals when signature is generated
func (e *Executor) Execute(msgs []*message.Message) error {
func (e *Executor) Execute(ctx context.Context, msgs []*message.Message) error {
e.exitLock.RLock()
defer e.exitLock.RUnlock()
ctxWithSpan, span := otel.Tracer("relayer-sygma").Start(ctx, "relayer.sygma.evm.Execute")
defer span.End()
logger := log.With().Str("dd.trace_id", span.SpanContext().TraceID().String()).Logger()

proposals := make([]*chains.Proposal, 0)
for _, m := range msgs {
logger.Info().Str("msg.id", m.ID()).Msgf("Message to execute %s", m.String())
span.AddEvent("Message to execute received", traceapi.WithAttributes(attribute.String("msg.id", m.ID()), attribute.String("msg.type", string(m.Type))))
prop, err := e.mh.HandleMessage(m)
if err != nil {
return err
return fmt.Errorf("failed to handle message %s with error: %w", m.String(), err)
}
evmProposal := chains.NewProposal(prop.Source, prop.Destination, prop.DepositNonce, prop.ResourceId, prop.Data, prop.Metadata)
isExecuted, err := e.bridge.IsProposalExecuted(evmProposal)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return err
}
if isExecuted {
log.Info().Msgf("Prop %p already executed", prop)
logger.Info().Str("msg.id", m.ID()).Msgf("Message already executed %s", m.String())
span.AddEvent("Message already executed", traceapi.WithAttributes(attribute.String("msg.id", m.ID()), attribute.String("msg.type", string(m.Type))))
continue
}

logger.Info().Str("msg.id", m.ID()).Msgf("Executing message %s", m.String())
span.AddEvent("Executing message", traceapi.WithAttributes(attribute.String("msg.id", m.ID()), attribute.String("msg.type", string(m.Type))))
proposals = append(proposals, evmProposal)
}
if len(proposals) == 0 {
Expand All @@ -100,39 +110,47 @@ func (e *Executor) Execute(msgs []*message.Message) error {

propHash, err := e.bridge.ProposalsHash(proposals)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return err
}

sessionID := e.sessionID(propHash)

span.AddEvent("SessionID created", traceapi.WithAttributes(attribute.String("tss.session.id", sessionID)))

msg := big.NewInt(0)
msg.SetBytes(propHash)
signing, err := signing.NewSigning(
msg,
e.sessionID(propHash),
span.SpanContext().TraceID().String(),
e.host,
e.comm,
e.fetcher)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return err
}

sigChn := make(chan interface{})
executionContext, cancelExecution := context.WithCancel(context.Background())
watchContext, cancelWatch := context.WithCancel(context.Background())
executionContext, cancelExecution := context.WithCancel(ctxWithSpan)
watchContext, cancelWatch := context.WithCancel(ctxWithSpan)
pool := pool.New().WithErrors()
pool.Go(func() error {
err := e.coordinator.Execute(executionContext, signing, sigChn)
if err != nil {
cancelWatch()
}

return err
})
pool.Go(func() error { return e.watchExecution(watchContext, cancelExecution, proposals, sigChn, sessionID) })
return pool.Wait()
}

func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.CancelFunc, proposals []*chains.Proposal, sigChn chan interface{}, sessionID string) error {
ctxWithSpan, span := otel.Tracer("relayer-sygma").Start(ctx, "relayer.sygma.evm.watchExecution")
defer span.End()
logger := log.With().Str("dd.trace_id", span.SpanContext().TraceID().String()).Logger()
ticker := time.NewTicker(executionCheckPeriod)
timeout := time.NewTicker(signingTimeout)
defer ticker.Stop()
Expand All @@ -149,25 +167,28 @@ func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.C
}

signatureData := sigResult.(*common.SignatureData)
hash, err := e.executeProposal(proposals, signatureData)
hash, err := e.executeProposal(ctxWithSpan, proposals, signatureData)
if err != nil {
_ = e.comm.Broadcast(e.host.Peerstore().Peers(), []byte{}, comm.TssFailMsg, sessionID)
_ = e.comm.Broadcast(ctxWithSpan, e.host.Peerstore().Peers(), []byte{}, comm.TssFailMsg, sessionID)
span.SetStatus(codes.Error, fmt.Errorf("executing proposel has failed %w", err).Error())
return err
}

log.Info().Str("SessionID", sessionID).Msgf("Sent proposals execution with hash: %s", hash)
logger.Info().Str("SessionID", sessionID).Msgf("Sent proposals execution with hash: %s", hash)
}
case <-ticker.C:
{
if !e.areProposalsExecuted(proposals, sessionID) {
continue
}

log.Info().Str("SessionID", sessionID).Msgf("Successfully executed proposals")
logger.Info().Str("SessionID", sessionID).Msgf("Successfully executed proposals")
span.AddEvent("Proposals executed", traceapi.WithAttributes(attribute.String("tss.session.id", sessionID)))
return nil
}
case <-timeout.C:
{
span.SetStatus(codes.Error, fmt.Errorf("execution timed out in %s", signingTimeout).Error())
return fmt.Errorf("execution timed out in %s", signingTimeout)
}
case <-ctx.Done():
Expand All @@ -178,7 +199,9 @@ func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.C
}
}

func (e *Executor) executeProposal(proposals []*chains.Proposal, signatureData *common.SignatureData) (*ethCommon.Hash, error) {
func (e *Executor) executeProposal(ctx context.Context, proposals []*chains.Proposal, signatureData *common.SignatureData) (*ethCommon.Hash, error) {
_, span := otel.Tracer("relayer-sygma").Start(ctx, "relayer.sygma.evm.executeProposal")
defer span.End()
sig := []byte{}
sig = append(sig[:], ethCommon.LeftPadBytes(signatureData.R, 32)...)
sig = append(sig[:], ethCommon.LeftPadBytes(signatureData.S, 32)...)
Expand All @@ -195,9 +218,10 @@ func (e *Executor) executeProposal(proposals []*chains.Proposal, signatureData *
GasLimit: gasLimit,
})
if err != nil {
span.SetStatus(codes.Error, err.Error())
return nil, err
}

span.AddEvent("Deposit executed", traceapi.WithAttributes(attribute.String("tx.hash", hash.String())))
return hash, err
}

Expand Down
Loading