Skip to content

Commit

Permalink
fix: improve filter (#421)
Browse files Browse the repository at this point in the history
* improve filter

* fix allocate
  • Loading branch information
LexLuthr authored Feb 25, 2025
1 parent 1f962a1 commit da0f41d
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 117 deletions.
85 changes: 38 additions & 47 deletions cmd/sptool/toolbox_deal_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,15 @@ var allocateCmd = &cli.Command{
Usage: "storage provider address[es]",
Aliases: []string{"m", "provider", "p"},
},
&cli.StringSliceFlag{
Name: "piece-info",
Usage: "data piece-info[s] to create the allocation. The format must be --piece-info pieceCid1=pieceSize1 --piece-info pieceCid2=pieceSize2",
Aliases: []string{"pi"},
&cli.StringFlag{
Name: "piece-cid",
Usage: "data piece-cid to create the allocation",
Aliases: []string{"piece"},
},
&cli.Int64Flag{
Name: "piece-size",
Usage: "piece size to create the allocation",
Aliases: []string{"size"},
},
&cli.StringFlag{
Name: "wallet",
Expand Down Expand Up @@ -457,7 +462,7 @@ var allocateCmd = &cli.Command{
},
&cli.StringFlag{
Name: "piece-file",
Usage: "file containing piece-info[s] to create the allocation. Each line in the file should be in the format 'pieceCid,pieceSize,miner,tmin,tmax,expiration'",
Usage: "file containing piece information to create the allocation. Each line in the file should be in the format 'pieceCid,pieceSize,miner,tmin,tmax,expiration'",
Aliases: []string{"pf"},
},
&cli.IntFlag{
Expand Down Expand Up @@ -490,18 +495,18 @@ var allocateCmd = &cli.Command{

pieceFile := cctx.String("piece-file")
miners := cctx.StringSlice("miner")
pinfos := cctx.StringSlice("piece-info")
pcids := cctx.String("piece-cid")

if pieceFile == "" && len(pinfos) < 1 {
return fmt.Errorf("must provide at least one --piece-info or use --piece-file")
if pieceFile == "" && pcids == "" {
return fmt.Errorf("must provide at least one --piece-cid or use --piece-file")
}

if pieceFile == "" && len(miners) < 1 {
return fmt.Errorf("must provide at least one miner address or use --piece-file")
}

if pieceFile != "" && len(pinfos) > 0 {
return fmt.Errorf("cannot use both --piece-info and --piece-file flags at once")
if pieceFile != "" && pcids != "" {
return fmt.Errorf("cannot use both --piece-cid and --piece-file flags at once")
}

var pieceInfos []PieceInfos
Expand Down Expand Up @@ -590,44 +595,30 @@ var allocateCmd = &cli.Command{
if err != nil {
return fmt.Errorf("failed to convert miner address %w", err)
}
for _, p := range cctx.StringSlice("piece-info") {
pieceDetail := strings.Split(p, "=")
if len(pieceDetail) != 2 {
return fmt.Errorf("incorrect pieceInfo format: %s", pieceDetail)
}

size, err := strconv.ParseInt(pieceDetail[1], 10, 64)
if err != nil {
return fmt.Errorf("failed to parse the piece size for %s for pieceCid %s: %w", pieceDetail[0], pieceDetail[1], err)
}
pcid, err := cid.Parse(pieceDetail[0])
if err != nil {
return fmt.Errorf("failed to parse the pieceCid for %s: %w", pieceDetail[0], err)
}

tmin := abi.ChainEpoch(cctx.Int64("term-min"))

tmax := abi.ChainEpoch(cctx.Int64("term-max"))

exp := abi.ChainEpoch(cctx.Int64("expiration"))
if exp == verifreg13.MaximumVerifiedAllocationExpiration {
exp -= 5
}

if tmax < tmin {
return fmt.Errorf("maximum duration %d cannot be smaller than minimum duration %d", tmax, tmin)
}

pieceInfos = append(pieceInfos, PieceInfos{
Cid: pcid,
Size: size,
Miner: abi.ActorID(mid),
MinerAddr: maddr,
Tmin: tmin,
Tmax: tmax,
Exp: exp,
})
pcid, err := cid.Parse(cctx.String("piece-cid"))
if err != nil {
return fmt.Errorf("failed to parse pieceCid %w", err)
}
size := cctx.Int64("piece-size")

tmin := abi.ChainEpoch(cctx.Int64("term-min"))

tmax := abi.ChainEpoch(cctx.Int64("term-max"))

exp := abi.ChainEpoch(cctx.Int64("expiration"))
if exp == verifreg13.MaximumVerifiedAllocationExpiration {
exp -= 5
}

pieceInfos = append(pieceInfos, PieceInfos{
Cid: pcid,
Size: size,
Miner: abi.ActorID(mid),
MinerAddr: maddr,
Tmin: tmin,
Tmax: tmax,
Exp: exp,
})
}
}

Expand Down
2 changes: 1 addition & 1 deletion docker/piece-server/sample/ddo-deal.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ miner_actor=$(lotus state list-miners | grep -v t01000)

mv /var/lib/curio-client/data/$PAYLOAD_CID.car /var/lib/curio-client/data/$COMMP_CID

sptool --actor t01000 toolbox mk12-client allocate -y -p $miner_actor --pi $COMMP_CID=$PIECE --confidence 0
sptool --actor t01000 toolbox mk12-client allocate -y -p $miner_actor --piece-cid $COMMP_CID --piece-size $PIECE --confidence 0

CLIENT=$(sptool --actor t01000 toolbox mk12-client wallet default)

Expand Down
5 changes: 3 additions & 2 deletions documentation/en/curio-cli/sptool.md
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,8 @@ DESCRIPTION:
OPTIONS:
--miner value, -m value, --provider value, -p value [ --miner value, -m value, --provider value, -p value ] storage provider address[es]
--piece-info value, --pi value [ --piece-info value, --pi value ] data piece-info[s] to create the allocation. The format must be --piece-info pieceCid1=pieceSize1 --piece-info pieceCid2=pieceSize2
--piece-cid value, --piece value data piece-cid to create the allocation
--piece-size value, --size value piece size to create the allocation (default: 0)
--wallet value the wallet address that will used create the allocation
--quiet do not print the allocation list (default: false)
--term-min value, --tmin value The minimum duration which the provider must commit to storing the piece to avoid early-termination penalties (epochs).
Expand All @@ -650,7 +651,7 @@ OPTIONS:
Default is 5 years. (default: 5256000)
--expiration value The latest epoch by which a provider must commit data before the allocation expires (epochs).
Default is 60 days. (default: 172800)
--piece-file value, --pf value file containing piece-info[s] to create the allocation. Each line in the file should be in the format 'pieceCid,pieceSize,miner,tmin,tmax,expiration'
--piece-file value, --pf value file containing piece information to create the allocation. Each line in the file should be in the format 'pieceCid,pieceSize,miner,tmin,tmax,expiration'
--batch-size value number of extend requests per batch. If set incorrectly, this will lead to out of gas error (default: 500)
--confidence value number of block confirmations to wait for (default: 5)
--assume-yes, -y, --yes automatic yes to prompts; assume 'yes' as answer to all prompts and run non-interactively (default: false)
Expand Down
20 changes: 20 additions & 0 deletions documentation/en/curio-market/storage-market.md
Original file line number Diff line number Diff line change
Expand Up @@ -239,3 +239,23 @@ curio market seal --actor <actor address> <sector>
* **--actor**: Specifies the actor address.

Consequences: Sealing early can speed up the process, but it may result in inefficiencies if all deals are not batched correctly.

## Offline Verified DDO deals
Curio only supports offline verified DDO deals as of now. The allocation must be created by the client for the piece and handed over to the SP alongside the data.


### How to create allocation
Clients can create allocation using the `sptool toolbox` or other methods.

```shell
sptool --actor t01000 toolbox mk12-client allocate -p <MINER ID> --piece-cid <COMMP> --piece-size <PIECE SIZE>
```

### Start a DDO deal
Storage providers can onboard the DDO deal using the below command.

```shell
curio market ddo --actor <MINER ID> <client-address> <allocation-id>
```

Since this is an offline deal, user must either make the data available via PieceLocator or add a data URL for this offline deal.
10 changes: 5 additions & 5 deletions documentation/en/installation.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ To build Curio, you need a working installation of [Go](https://golang.org/dl/):
Example of an OLD version's CLI download:
```shell
wget -c https://golang.org/dl/go1.22.3.linux-amd64.tar.gz -O - | sudo tar -xz -C /usr/local
wget -c https://golang.org/dl/go1.23.6.linux-amd64.tar.gz -O - | sudo tar -xz -C /usr/local
```
{% hint style="info" %}
Expand Down Expand Up @@ -181,9 +181,9 @@ This will put `curio` in `/usr/local/bin`. `curio` will use the `$HOME/.curio` f
Run `curio --version`
```md
curio version 1.23.0+mainnet+git_ae625a5_2024-08-21T15:21:45+04:00
curio version 1.24.5-rc1+mainnet+git_214226e7_2025-02-19T17:02:54+04:00
# or
curio version 1.23.0+calibnet+git_ae625a5_2024-08-21T15:21:45+04:00
curio version 1.24.5-rc1+calibnet+git_214226e7_2025-02-19T17:02:54+04:00
```
1. You should now have Curio installed. You can now [finish setting up the Curio node](https://lotus.filecoin.io/storage-providers/curio/setup/).
Expand Down Expand Up @@ -291,9 +291,9 @@ The installation instructions are different depending on which CPU is in your Ma
6. Run `curio --version`
```md
curio version 1.23.0+mainnet+git_ae625a5_2024-08-21T15:21:45+04:00
curio version 1.24.5-rc1+mainnet+git_214226e7_2025-02-19T17:02:54+04:00
# or
curio version 1.23.0+calibnet+git_ae625a5_2024-08-21T15:21:45+04:00
curio version 1.24.5-rc1+calibnet+git_214226e7_2025-02-19T17:02:54+04:00
```
7. You should now have Curio installed. You can now [set up a new Curio cluster or migrating from Lotus-Miner](https://lotus.filecoin.io/storage-providers/curio/setup/).
Expand Down
78 changes: 54 additions & 24 deletions market/mk12/mk12.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package mk12
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -49,6 +50,7 @@ type MK12API interface {
StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
StateVerifiedClientStatus(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*abi.StoragePower, error)
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
WalletSign(context.Context, address.Address, []byte) (*crypto.Signature, error)
}

Expand Down Expand Up @@ -780,14 +782,18 @@ FROM joined
func (m *MK12) applyFilters(ctx context.Context, deal *ProviderDealState) *validationError {

var clientRules []struct {
Name string `db:"name"`
Wallets []string `db:"wallets"`
PeerIDs []string `db:"peer_id"`
PeerIDs []string `db:"peer_ids"`
PricingFilters []string `db:"pricing_filters"`
MaxDealsPerHour int64 `db:"max_deals_per_hour"`
MaxDealSizePerHour int64 `db:"max_deal_size_per_hour"`
}

var skipDefaultAsk bool

err := m.db.Select(ctx, &clientRules, `SELECT
name,
wallets,
peer_ids,
pricing_filters,
Expand All @@ -799,22 +805,30 @@ func (m *MK12) applyFilters(ctx context.Context, deal *ProviderDealState) *valid
return &validationError{error: xerrors.Errorf("failed to query the client rules from DB: %w", err)}
}

log.Debugw("Applicable Deal Client Filters", "Deal", deal.DealUuid.String(), "Client rules", "client_rules", clientRules)

// Check if we have any client rules and match them to client details
for i := range clientRules {
if lo.Contains(clientRules[i].Wallets, deal.ClientDealProposal.Proposal.Client.String()) || lo.Contains(clientRules[i].PeerIDs, deal.ClientPeerID.String()) {
client, err := m.api.StateLookupID(ctx, deal.ClientDealProposal.Proposal.Client, types.EmptyTSK)
if err != nil {
return &validationError{error: xerrors.Errorf("wallet not found: %w", err)}
}
if lo.Contains(clientRules[i].Wallets, deal.ClientDealProposal.Proposal.Client.String()) || lo.Contains(clientRules[i].PeerIDs, deal.ClientPeerID.String()) || lo.Contains(clientRules[i].Wallets, client.String()) {
// Check if Cumulative Storage size has not exceeded the specified limit
if clientRules[i].MaxDealSizePerHour > 0 {
var size int64
err = m.db.QueryRow(ctx, `SELECT COALESCE(SUM(piece_size), 0) AS total_piece_size
FROM market_mk12_deals
WHERE created_at >= NOW() - INTERVAL '1 hour'
AND (
client_peer_id = 'provided_client_peer_id'
OR proposal->>'Client' = 'desired_client_value'
)`).Scan(&size)
client_peer_id = $1
OR proposal->>'Client' = $2
OR proposal->>'Client' = $3
)`, deal.ClientPeerID.String(), deal.ClientDealProposal.Proposal.Client.String(), client.String()).Scan(&size)
if err != nil {
return &validationError{error: xerrors.Errorf("failed to query the cummulative size from DB: %w", err)}
}
log.Debugw("MaxDealSizePerHour Check", "Deal", deal.DealUuid.String(), "Client Rule", clientRules[i].Name, "MaxDealSizePerHour", "size", size, "max", clientRules[i].MaxDealSizePerHour)
if size > clientRules[i].MaxDealSizePerHour {
return &validationError{reason: "deal rejected as cumulative size of deals in past 1 hour has reached the maximum allowed for the client, please retry in some time"}
}
Expand All @@ -826,17 +840,21 @@ func (m *MK12) applyFilters(ctx context.Context, deal *ProviderDealState) *valid
WHERE created_at >= NOW() - INTERVAL '1 hour'
AND (
client_peer_id = $1
OR proposal->>'Client' = $2)`, deal.ClientPeerID.String(),
deal.ClientDealProposal.Proposal.Client.String()).Scan(&dealCount)
OR proposal->>'Client' = $2
OR proposal->>'Client' = $3)`, deal.ClientPeerID.String(), deal.ClientDealProposal.Proposal.Client.String(),
client.String()).Scan(&dealCount)
if err != nil {
return &validationError{error: xerrors.Errorf("failed to query the deal count from DB: %w", err)}
}
log.Debugw("MaxDealsPerHour Check", "Deal", deal.DealUuid.String(), "Client Rule", clientRules[i].Name, "MaxDealsPerHour", "count", dealCount, "max", clientRules[i].MaxDealsPerHour)
if dealCount >= clientRules[i].MaxDealsPerHour {
return &validationError{reason: "deal rejected as maximum allowed deals per hour limit has been reached for the client, please retry in some time"}
}
}
// Apply pricing filters
if len(clientRules[i].PricingFilters) > 0 {
log.Debugw("Applicable Pricing Filters", "Deal", deal.DealUuid.String(), "Client Rule", clientRules[i].Name, "Pricing Filters", clientRules[i].PricingFilters)
skipDefaultAsk = true
var priceFilters []struct {
MinDur int64 `db:"min_duration_days"`
MaxDur int64 `db:"max_duration_days"`
Expand All @@ -859,58 +877,70 @@ func (m *MK12) applyFilters(ctx context.Context, deal *ProviderDealState) *valid
}
ret := new(validationError)
for j := range priceFilters {
log.Debugw("Applying Pricing Filter", "Deal", deal.DealUuid.String(), "Client Rule", clientRules[i].Name,
"Filter Verified", priceFilters[j].Verified, "Filter MinSize", priceFilters[j].MinSize, "Filter MaxSize", priceFilters[j].MaxSize,
"Filter MinDur", priceFilters[j].MinDur, "Filter MaxDur", priceFilters[j].MaxDur, "Filter Price", priceFilters[j].Price,
"Deal Verified", deal.ClientDealProposal.Proposal.VerifiedDeal, "Deal PieceSize", deal.ClientDealProposal.Proposal.PieceSize,
"Deal Duration", deal.ClientDealProposal.Proposal.Duration, "Deal Price", deal.ClientDealProposal.Proposal.StoragePricePerEpoch)
// Skip filters which are not meant for verified/unverified deals
if !(deal.ClientDealProposal.Proposal.VerifiedDeal && priceFilters[j].Verified) {
if deal.ClientDealProposal.Proposal.VerifiedDeal != priceFilters[j].Verified {
continue
}
if deal.ClientDealProposal.Proposal.PieceSize > abi.PaddedPieceSize(priceFilters[j].MaxSize) {
ret.reason = "deal rejected as piece size is greater than the maximum allowed by the pricing filter"
continue
return ret
}
if deal.ClientDealProposal.Proposal.PieceSize < abi.PaddedPieceSize(priceFilters[j].MinSize) {
ret.reason = "deal rejected as piece size is smaller than the minimum allowed by the pricing filter"
continue
return ret
}
if deal.ClientDealProposal.Proposal.Duration() > abi.ChainEpoch(builtin.EpochsInDay*priceFilters[j].MaxDur) {
ret.reason = "deal rejected as duration is greater than the maximum allowed by the pricing filter"
continue
return ret
}
if deal.ClientDealProposal.Proposal.Duration() < abi.ChainEpoch(builtin.EpochsInDay*priceFilters[j].MinDur) {
ret.reason = "deal rejected as duration is smaller than the minimum allowed by the pricing filter"
continue
return ret
}
if deal.ClientDealProposal.Proposal.StoragePricePerEpoch.LessThan(big.NewInt(priceFilters[j].Price)) {
ret.reason = "deal rejected as storage price per epoch is less than the amount allowed by the pricing filter"
continue
return ret
}
// Accept the deal if any price filter reaches here
return nil
}
// If none of the deal filters matched then we should reject the deal instead if going to default
if ret.reason != "" {
return ret
}
}
}
}

// If no client/pricing rules are found or match the client then apply default Ask validation
if err := m.validateAsk(ctx, deal); err != nil {
return &validationError{error: err}
if !skipDefaultAsk {
if err := m.validateAsk(ctx, deal); err != nil {
return &validationError{error: err}
}
}

return nil
}

// applyAllowList checks if the client making the deal proposal is allowed by the provider
// based on the market_mk12_allow_list table in the database
func (m *MK12) applyAllowList(ctx context.Context, deal *ProviderDealState) (bool, error) {
var allowed bool
err := m.db.QueryRow(ctx, `SELECT status FROM market_allow_list WHERE wallet = $1`, deal.ClientDealProposal.Proposal.Client.String()).Scan(&allowed)
client, err := m.api.StateLookupID(ctx, deal.ClientDealProposal.Proposal.Client, types.EmptyTSK)
if err != nil {
return false, xerrors.Errorf("wallet not found: %w", err)
}

var allowed sql.NullBool
err = m.db.QueryRow(ctx, `SELECT status FROM market_allow_list WHERE wallet = $1 OR wallet = $2`, deal.ClientDealProposal.Proposal.Client.String(), client.String()).Scan(&allowed)
if err != nil {
if !errors.Is(err, pgx.ErrNoRows) {
return false, xerrors.Errorf("failed to query the allow list status from DB: %w", err)
}
return !m.cfg.Market.StorageMarketConfig.MK12.DenyUnknownClients, nil
}
return allowed, nil

if allowed.Valid {
return allowed.Bool, nil
}

return false, nil
}
Loading

0 comments on commit da0f41d

Please sign in to comment.