Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 23 additions & 19 deletions OpenApi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -683,34 +683,38 @@ components:
rsk:
type: string
type: object
SummariesResponse:
properties:
peginSummary:
$ref: '#/components/schemas/SummaryData'
type: object
pegoutSummary:
$ref: '#/components/schemas/SummaryData'
type: object
type: object
SummaryData:
properties:
confirmedQuotesCount:
acceptedQuotesCount:
type: integer
lpEarnings:
type: string
$ref: '#/components/schemas/Wei'
paidQuotesAmount:
$ref: '#/components/schemas/Wei'
paidQuotesCount:
type: integer
refundedQuotesCount:
type: integer
totalAcceptedQuotedAmount:
type: string
totalAcceptedQuotesCount:
type: integer
$ref: '#/components/schemas/Wei'
totalFeesCollected:
type: string
$ref: '#/components/schemas/Wei'
totalPenaltyAmount:
type: string
totalQuotedAmount:
type: string
$ref: '#/components/schemas/Wei'
totalQuotesCount:
type: integer
type: object
SummaryResult:
properties:
peginSummary:
$ref: '#/components/schemas/SummaryData'
type: object
pegoutSummary:
$ref: '#/components/schemas/SummaryData'
type: object
type: object
Wei: {}
entities.Wei: {}
pkg.AcceptQuoteRequest:
properties:
quoteHash:
Expand Down Expand Up @@ -1086,7 +1090,7 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/SummariesResponse'
$ref: '#/components/schemas/SummaryResult'
description: Financial data for the given period
summary: Summaries
/userQuotes:
Expand Down
200 changes: 0 additions & 200 deletions internal/adapters/dataproviders/database/mongo/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ package mongo

import (
"context"
"fmt"
"time"

log "github.com/sirupsen/logrus"

"go.mongodb.org/mongo-driver/bson"
)

const (
Expand Down Expand Up @@ -78,200 +75,3 @@ func (c *Connection) CheckConnection(ctx context.Context) bool {
}
return err == nil
}

type QuoteResult[Q any, R QuoteHashProvider] struct {
Quotes []Q
RetainedQuotes []R
QuoteHashToIndex map[string]int
Error error
}

type QuoteQuery struct {
Ctx context.Context
Conn *Connection
StartDate time.Time
EndDate time.Time
QuoteCollection string
RetainedCollection string
}

func ListQuotesByDateRange[Q any, R QuoteHashProvider](
query QuoteQuery,
mapper func(bson.D) Q,
) QuoteResult[Q, R] {
dbCtx, cancel := context.WithTimeout(query.Ctx, query.Conn.timeout)
defer cancel()
quotes, quoteHashes, err := fetchQuotesByDateRange(dbCtx, query.Conn, query.StartDate, query.EndDate, query.QuoteCollection, mapper)
if err != nil {
return QuoteResult[Q, R]{Error: err}
}
quoteHashToIndex := make(map[string]int, len(quoteHashes))
for i, hash := range quoteHashes {
if hash != "" {
quoteHashToIndex[hash] = i
}
}
retainedQuotes, additionalHashes, err := fetchRetainedQuotes[R](dbCtx, query.Conn, query.StartDate, query.EndDate, query.RetainedCollection, quoteHashes)
if err != nil {
return QuoteResult[Q, R]{Error: err}
}
if len(additionalHashes) > 0 {
additionalQuotes, additionalHashIndices, err := fetchAdditionalQuotes(dbCtx, query.Conn, query.QuoteCollection, additionalHashes, mapper)
if err != nil {
log.Errorf("Error processing additional quotes: %v", err)
} else {
baseIndex := len(quotes)
for i, hash := range additionalHashIndices {
if hash != "" {
quoteHashToIndex[hash] = baseIndex + i
}
}
quotes = append(quotes, additionalQuotes...)
}
}
logDbInteraction(Read, fmt.Sprintf("Found %d quotes and %d retained quotes in date range",
len(quotes), len(retainedQuotes)))
return QuoteResult[Q, R]{
Quotes: quotes,
RetainedQuotes: retainedQuotes,
QuoteHashToIndex: quoteHashToIndex,
Error: nil,
}
}

func fetchQuotesByDateRange[Q any](
ctx context.Context,
conn *Connection,
startDate, endDate time.Time,
collectionName string,
mapper func(bson.D) Q,
) ([]Q, []string, error) {
quoteFilter := bson.D{
{Key: "agreement_timestamp", Value: bson.D{
{Key: "$gte", Value: startDate.Unix()},
{Key: "$lte", Value: endDate.Unix()},
}},
}
var storedQuotes []bson.D
quoteCursor, err := conn.Collection(collectionName).Find(ctx, quoteFilter)
if err != nil {
return nil, nil, err
}
if err = quoteCursor.All(ctx, &storedQuotes); err != nil {
return nil, nil, err
}
quoteHashes := make([]string, 0, len(storedQuotes))
quotes := make([]Q, 0, len(storedQuotes))
for _, stored := range storedQuotes {
quoteObj := mapper(stored)
quotes = append(quotes, quoteObj)
hashValue, ok := getStringValueFromBSON(stored, "hash")
if ok {
quoteHashes = append(quoteHashes, hashValue)
}
}
return quotes, quoteHashes, nil
}

func getStringValueFromBSON(doc bson.D, key string) (string, bool) {
data, err := bson.Marshal(doc)
if err != nil {
return "", false
}
rawValue := bson.Raw(data).Lookup(key)
return rawValue.StringValueOK()
}

type QuoteHashProvider interface {
GetQuoteHash() string
}

func fetchRetainedQuotes[R QuoteHashProvider](
ctx context.Context,
conn *Connection,
startDate, endDate time.Time,
collectionName string,
existingQuoteHashes []string,
) ([]R, []string, error) {
retainedFilter := createRetainedFilter(startDate, endDate, existingQuoteHashes)
var retainedQuotes []R
retainedCursor, err := conn.Collection(collectionName).Find(ctx, retainedFilter)
if err != nil {
return nil, nil, err
}
if err = retainedCursor.All(ctx, &retainedQuotes); err != nil {
return nil, nil, err
}
additionalHashes := findAdditionalQuoteHashes(retainedQuotes, existingQuoteHashes)
return retainedQuotes, additionalHashes, nil
}

func createRetainedFilter(startDate, endDate time.Time, quoteHashes []string) bson.D {
return bson.D{
{Key: "$or", Value: bson.A{
bson.D{{Key: "quote_hash", Value: bson.D{
{Key: "$in", Value: quoteHashes},
}}},
bson.D{
{Key: "created_at", Value: bson.D{
{Key: "$gte", Value: startDate.Unix()},
{Key: "$lte", Value: endDate.Unix()},
}},
},
}},
}
}

func findAdditionalQuoteHashes[R QuoteHashProvider](retainedQuotes []R, existingQuoteHashes []string) []string {
existingMap := make(map[string]bool, len(existingQuoteHashes))
for _, hash := range existingQuoteHashes {
existingMap[hash] = true
}
additionalMap := make(map[string]bool)
for i := range retainedQuotes {
hash := retainedQuotes[i].GetQuoteHash()
if !existingMap[hash] {
additionalMap[hash] = true
}
}
additionalHashes := make([]string, 0, len(additionalMap))
for hash := range additionalMap {
additionalHashes = append(additionalHashes, hash)
}
return additionalHashes
}

func fetchAdditionalQuotes[Q any](
ctx context.Context,
conn *Connection,
collectionName string,
hashes []string,
mapper func(bson.D) Q,
) ([]Q, []string, error) {
quoteFilter := bson.D{
{Key: "hash", Value: bson.D{
{Key: "$in", Value: hashes},
}},
}
var storedQuotes []bson.D
quoteCursor, err := conn.Collection(collectionName).Find(ctx, quoteFilter)
if err != nil {
return nil, nil, err
}
if err = quoteCursor.All(ctx, &storedQuotes); err != nil {
return nil, nil, err
}
quotes := make([]Q, 0, len(storedQuotes))
resultHashes := make([]string, 0, len(storedQuotes))
for _, stored := range storedQuotes {
quoteObj := mapper(stored)
quotes = append(quotes, quoteObj)
hashValue, ok := getStringValueFromBSON(stored, "hash")
if ok {
resultHashes = append(resultHashes, hashValue)
} else {
resultHashes = append(resultHashes, "")
}
}
return quotes, resultHashes, nil
}
Loading
Loading