Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/application/lps/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,18 @@ func NewApplication(initCtx context.Context, env environment.Environment, timeou
if err != nil {
log.Fatal("Error creating BTC registry:", err)
}

dbRegistry := registry.NewDatabaseRegistry(dbConnection)
rootstockRegistry, err := registry.NewRootstockRegistry(env, rskClient, walletFactory, timeouts)
if err != nil {
log.Fatal("Error creating Rootstock registry:", err)
}

messagingRegistry := registry.NewMessagingRegistry(initCtx, env, rskClient, btcConnection, externalClients)
liquidityProvider := registry.NewLiquidityProvider(dbRegistry, rootstockRegistry, btcRegistry, messagingRegistry)
mutexes := environment.NewApplicationMutexes()

useCaseRegistry := registry.NewUseCaseRegistry(env, rootstockRegistry, btcRegistry, dbRegistry, liquidityProvider, messagingRegistry, mutexes)
useCaseRegistry, err := registry.NewUseCaseRegistry(env, rootstockRegistry, btcRegistry, dbRegistry, liquidityProvider, messagingRegistry, mutexes)
if err != nil {
log.Fatal("Error creating use case registry:", err)
}
watcherRegistry := registry.NewWatcherRegistry(env, useCaseRegistry, rootstockRegistry, btcRegistry, liquidityProvider, messagingRegistry, watcher.NewApplicationTickers(), timeouts)
return &Application{
env: env,
Expand Down
1 change: 1 addition & 0 deletions docker-compose/local/docker-compose.lps.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ services:
- BTC_RELEASE_WATCHER_START_BLOCK
- BTC_RELEASE_WATCHER_PAGE_SIZE
- BTC_RELEASE_CHECK_TIMEOUT
- REBALANCE_STRATEGY
ports:
- "8080:8080"
volumes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestPegoutBridgeWatcher_Start(t *testing.T) {
bridge := &mocks.BridgeMock{}
bridge.On("GetAddress").Return(test.AnyAddress)
mutexes := environment.NewApplicationMutexes()
bridgeUseCase := pegout.NewBridgePegoutUseCase(pegoutRepository, providerMock, rskWallet, blockchain.RskContracts{Bridge: bridge}, mutexes.RskWalletMutex())
bridgeUseCase := pegout.NewBridgePegoutUseCase(pegoutRepository, providerMock, rskWallet, blockchain.RskContracts{Bridge: bridge}, mutexes.RskWalletMutex(), pegout.AllAtOnce)
getUseCase := w.NewGetWatchedPegoutQuoteUseCase(pegoutRepository)
bridgeWatcher := watcher.NewPegoutBridgeWatcher(getUseCase, bridgeUseCase, ticker)
resetMocks := func() {
Expand Down
1 change: 1 addition & 0 deletions internal/configuration/environment/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ type PegoutEnv struct {
DepositCacheStartBlock uint64 `env:"PEGOUT_DEPOSIT_CACHE_START_BLOCK"`
BtcReleaseWatcherStartBlock uint64 `env:"BTC_RELEASE_WATCHER_START_BLOCK"`
BtcReleaseWatcherPageSize uint64 `env:"BTC_RELEASE_WATCHER_PAGE_SIZE"`
RebalanceStrategy string `env:"REBALANCE_STRATEGY" validate:"required,oneof=ALL_AT_ONCE UTXO_SPLIT"`
}

type CaptchaEnv struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func setUpEnv(t *testing.T) {
"BTC_RELEASE_WATCHER_START_BLOCK": "1",
"USE_SEGWIT_FEDERATION": "true",
"ALLOWED_ORIGINS": "http://example.com,http://example2.com",
"REBALANCE_STRATEGY": "ALL_AT_ONCE",
}
const envFilePath = "../../../sample-config.env"
envFile, err := os.ReadFile(envFilePath)
Expand Down
9 changes: 7 additions & 2 deletions internal/configuration/registry/usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ func NewUseCaseRegistry(
liquidityProvider *dataproviders.LocalLiquidityProvider,
messaging *Messaging,
mutexes entities.ApplicationMutexes,
) *UseCaseRegistry {
) (*UseCaseRegistry, error) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there is no chance this returns an error anymore you should update the signature

rebalanceStrategy, err := pegout.ParseRebalanceStrategy(env.Pegout.RebalanceStrategy)
if err != nil {
return nil, err
}
return &UseCaseRegistry{
summariesUseCase: reports.NewSummariesUseCase(
databaseRegistry.PeginRepository,
Expand Down Expand Up @@ -259,6 +263,7 @@ func NewUseCaseRegistry(
rskRegistry.Wallet,
rskRegistry.Contracts,
mutexes.RskWalletMutex(),
rebalanceStrategy,
),
peginStatusUseCase: pegin.NewStatusUseCase(databaseRegistry.PeginRepository),
pegoutStatusUseCase: pegout.NewStatusUseCase(databaseRegistry.PegoutRepository),
Expand Down Expand Up @@ -348,7 +353,7 @@ func NewUseCaseRegistry(
messaging.Rpc,
utils.Scale,
),
}
}, nil
}

func (registry *UseCaseRegistry) GetPeginQuoteUseCase() *pegin.GetQuoteUseCase {
Expand Down
5 changes: 3 additions & 2 deletions internal/configuration/registry/usecase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestNewUseCaseRegistry(t *testing.T) {
BridgeAddress: "0x0000000000000000000000000000000001000006",
},
Btc: environment.BtcEnv{Network: "testnet"},
Pegout: environment.PegoutEnv{RebalanceStrategy: "ALL_AT_ONCE"},
}

client := &mocks.DbClientBindingMock{}
Expand All @@ -55,8 +56,8 @@ func TestNewUseCaseRegistry(t *testing.T) {
lp := registry.NewLiquidityProvider(dbRegistry, rskRegistry, btcRegistry, messagingRegistry)
mutexes := environment.NewApplicationMutexes()

useCaseRegistry := registry.NewUseCaseRegistry(env, rskRegistry, btcRegistry, dbRegistry, lp, messagingRegistry, mutexes)

useCaseRegistry, err := registry.NewUseCaseRegistry(env, rskRegistry, btcRegistry, dbRegistry, lp, messagingRegistry, mutexes)
require.NoError(t, err)
require.NotNil(t, useCaseRegistry)
value := reflect.ValueOf(useCaseRegistry).Elem()
for i := 0; i < value.NumField(); i++ {
Expand Down
4 changes: 3 additions & 1 deletion internal/configuration/registry/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestNewWatcherRegistry(t *testing.T) {
BridgeAddress: "0x0000000000000000000000000000000001000006",
},
Btc: environment.BtcEnv{Network: "testnet"},
Pegout: environment.PegoutEnv{RebalanceStrategy: "ALL_AT_ONCE"},
}

client := &mocks.DbClientBindingMock{}
Expand All @@ -53,7 +54,8 @@ func TestNewWatcherRegistry(t *testing.T) {
messagingRegistry := registry.NewMessagingRegistry(context.Background(), environment.Environment{}, rskClient, connection, registry.ExternalRpc{})
lp := registry.NewLiquidityProvider(dbRegistry, rskRegistry, btcRegistry, messagingRegistry)
mutexes := environment.NewApplicationMutexes()
useCaseRegistry := registry.NewUseCaseRegistry(env, rskRegistry, btcRegistry, dbRegistry, lp, messagingRegistry, mutexes)
useCaseRegistry, err := registry.NewUseCaseRegistry(env, rskRegistry, btcRegistry, dbRegistry, lp, messagingRegistry, mutexes)
require.NoError(t, err)

watcherRegistry := registry.NewWatcherRegistry(env, useCaseRegistry, rskRegistry, btcRegistry, lp, messagingRegistry, watcher.NewApplicationTickers(), environment.DefaultTimeouts())

Expand Down
103 changes: 98 additions & 5 deletions internal/usecases/pegout/bridge_pegout.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pegout
import (
"context"
"errors"
"fmt"
"sync"

"github.com/rsksmart/liquidity-provider-server/internal/entities"
Expand All @@ -20,12 +21,31 @@ const (
BridgeConversionGasPrice = 60000000
)

type RebalanceStrategy string

const (
AllAtOnce RebalanceStrategy = "ALL_AT_ONCE"
UtxoSplit RebalanceStrategy = "UTXO_SPLIT"
)

func ParseRebalanceStrategy(s string) (RebalanceStrategy, error) {
switch s {
case string(AllAtOnce):
return AllAtOnce, nil
case string(UtxoSplit):
return UtxoSplit, nil
default:
return "", fmt.Errorf("unknown rebalance strategy: %q", s)
}
}

type BridgePegoutUseCase struct {
quoteRepository quote.PegoutQuoteRepository
pegoutProvider liquidity_provider.PegoutLiquidityProvider
rskWallet blockchain.RootstockWallet
contracts blockchain.RskContracts
rskWalletMutex sync.Locker
strategy RebalanceStrategy
}

func NewBridgePegoutUseCase(
Expand All @@ -34,19 +54,21 @@ func NewBridgePegoutUseCase(
rskWallet blockchain.RootstockWallet,
contracts blockchain.RskContracts,
rskWalletMutex sync.Locker,
strategy RebalanceStrategy,
) *BridgePegoutUseCase {
return &BridgePegoutUseCase{
quoteRepository: quoteRepository,
pegoutProvider: pegoutProvider,
rskWallet: rskWallet,
contracts: contracts,
rskWalletMutex: rskWalletMutex,
strategy: strategy,
}
}

func (useCase *BridgePegoutUseCase) Run(ctx context.Context, watchedQuotes ...quote.WatchedPegoutQuote) error {
var err error
var balance, totalValue *entities.Wei
var totalValue *entities.Wei

totalValue, err = useCase.calculateTotalToPegout(watchedQuotes)
if err != nil {
Expand All @@ -66,23 +88,94 @@ func (useCase *BridgePegoutUseCase) Run(ctx context.Context, watchedQuotes ...qu
useCase.rskWalletMutex.Lock()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the mutex should be passed to the handler. Otherwise if you call the function from somewhere else or scan the file without context it can throw false positive alerts for possible race conditions

defer useCase.rskWalletMutex.Unlock()

requiredBalance := new(entities.Wei).Add(totalValue, entities.NewWei(BridgeConversionGasLimit*BridgeConversionGasPrice))
if balance, err = useCase.rskWallet.GetBalance(ctx); err != nil {
switch useCase.strategy {
case UtxoSplit:
return useCase.runUtxoSplit(ctx, totalValue, pegoutConfig, watchedQuotes)
default:
return useCase.runAllAtOnce(ctx, totalValue, watchedQuotes)
}
}

func (useCase *BridgePegoutUseCase) checkBalance(ctx context.Context, requiredBalance *entities.Wei) error {
balance, err := useCase.rskWallet.GetBalance(ctx)
if err != nil {
return usecases.WrapUseCaseError(usecases.BridgePegoutId, err)
} else if balance.Cmp(requiredBalance) < 0 {
}
if balance.Cmp(requiredBalance) < 0 {
return usecases.WrapUseCaseError(usecases.BridgePegoutId, usecases.InsufficientAmountError)
}
return nil
}

func (useCase *BridgePegoutUseCase) runAllAtOnce(ctx context.Context, totalValue *entities.Wei, watchedQuotes []quote.WatchedPegoutQuote) error {
requiredBalance := new(entities.Wei).Add(totalValue, entities.NewWei(BridgeConversionGasLimit*BridgeConversionGasPrice))
if err := useCase.checkBalance(ctx, requiredBalance); err != nil {
return err
}

config := blockchain.NewTransactionConfig(totalValue, BridgeConversionGasLimit, entities.NewWei(BridgeConversionGasPrice))
receipt, txErr := useCase.rskWallet.SendRbtc(ctx, config, useCase.contracts.Bridge.GetAddress())
if txErr == nil {
log.Debugf("%s: transaction sent to the bridge successfully (%s)", usecases.BridgePegoutId, receipt.TransactionHash)
}

err = useCase.updateQuotes(ctx, receipt, txErr, watchedQuotes)
if err := useCase.updateQuotes(ctx, receipt, txErr, watchedQuotes); err != nil {
return usecases.WrapUseCaseError(usecases.BridgePegoutId, err)
}
return nil
}

func (useCase *BridgePegoutUseCase) runUtxoSplit(
ctx context.Context,
totalValue *entities.Wei,
pegoutConfig liquidity_provider.PegoutConfiguration,
watchedQuotes []quote.WatchedPegoutQuote,
) error {
bridgeMin := pegoutConfig.BridgeTransactionMin
numTxs, err := new(entities.Wei).Div(totalValue, bridgeMin)
if err != nil {
return usecases.WrapUseCaseError(usecases.BridgePegoutId, err)
}
remainder := new(entities.Wei).Sub(totalValue, new(entities.Wei).Mul(numTxs, bridgeMin))

gasPerTx := entities.NewWei(BridgeConversionGasLimit * BridgeConversionGasPrice)
requiredBalance := new(entities.Wei).Add(totalValue, new(entities.Wei).Mul(numTxs, gasPerTx))
if err := useCase.checkBalance(ctx, requiredBalance); err != nil {
return err
}

bridgeAddress := useCase.contracts.Bridge.GetAddress()
n := numTxs.Uint64()

// First chunk absorbs the remainder (when N=1, firstChunk == totalValue)
firstChunk := new(entities.Wei).Add(bridgeMin.Copy(), remainder)
var receipt blockchain.TransactionReceipt
var txErr error

config := blockchain.NewTransactionConfig(firstChunk, BridgeConversionGasLimit, entities.NewWei(BridgeConversionGasPrice))
receipt, txErr = useCase.rskWallet.SendRbtc(ctx, config, bridgeAddress)
if txErr == nil {
log.Debugf("%s: split tx 1/%d sent to the bridge successfully (%s)", usecases.BridgePegoutId, n, receipt.TransactionHash)
} else {
if err := useCase.updateQuotes(ctx, receipt, txErr, watchedQuotes); err != nil {
return usecases.WrapUseCaseError(usecases.BridgePegoutId, err)
}
return usecases.WrapUseCaseError(usecases.BridgePegoutId, txErr)
}

for i := uint64(1); i < n; i++ {
config = blockchain.NewTransactionConfig(bridgeMin.Copy(), BridgeConversionGasLimit, entities.NewWei(BridgeConversionGasPrice))
receipt, txErr = useCase.rskWallet.SendRbtc(ctx, config, bridgeAddress)
if txErr == nil {
log.Debugf("%s: split tx %d/%d sent to the bridge successfully (%s)", usecases.BridgePegoutId, i+1, n, receipt.TransactionHash)
} else {
break
}
}

if err := useCase.updateQuotes(ctx, receipt, txErr, watchedQuotes); err != nil {
return usecases.WrapUseCaseError(usecases.BridgePegoutId, err)
}
return nil
}

Expand Down
Loading
Loading