Skip to content

Commit 3b23ab2

Browse files
committed
Merge remote-tracking branch 'upstream/master'
2 parents eeaabee + bc94732 commit 3b23ab2

15 files changed

Lines changed: 920 additions & 121 deletions

File tree

Makefile

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,23 @@ test: .bin-image
2727
test-integration: .bin-image
2828
docker run -t --rm -e PACKAGER=$(PACKAGER) -e BB_BUILD_ENV=$(BB_BUILD_ENV) -e GITCOMMIT=$(GITCOMMIT) $(BB_RPC_ENV) -v "$(CURDIR):/src" --network="host" $(BIN_IMAGE) make test-integration ARGS="$(ARGS)"
2929

30+
# `make test-e2e [coin] [url]` — e.g. `make test-e2e bitcoin https://btc.trezor.io`.
31+
# Extra goals after test-e2e are consumed as positional args (coin, base URL)
32+
# and turned into OPENAPI_COINS / BB_DEV_API_URL_HTTP_<coin> for the test runner.
33+
ifeq (test-e2e,$(firstword $(MAKECMDGOALS)))
34+
E2E_ARGS := $(wordlist 2,$(words $(MAKECMDGOALS)),$(MAKECMDGOALS))
35+
ifneq ($(E2E_ARGS),)
36+
# no-op catch-all so make does not try to build the positional args as targets
37+
%:
38+
@:
39+
endif
40+
endif
41+
3042
test-e2e:
3143
@if [ ! -x tests/openapi/node_modules/.bin/redocly ]; then npm ci --prefix tests/openapi --prefer-offline --no-audit --no-fund; fi
44+
@coin="$(word 1,$(E2E_ARGS))"; url="$(word 2,$(E2E_ARGS))"; \
45+
if [ -n "$$coin" ]; then export OPENAPI_COINS="$$coin"; fi; \
46+
if [ -n "$$url" ]; then export "BB_DEV_API_URL_HTTP_$$(printf %s "$$coin" | tr - _)=$$url"; fi; \
3247
contrib/tests/run-openapi-tests.sh
3348

3449
test-connectivity: .bin-image

api/worker.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2045,7 +2045,7 @@ func (w *Worker) waitForBackendSync() {
20452045
}
20462046
}
20472047

2048-
func (w *Worker) getAddrDescUtxo(addrDesc bchain.AddressDescriptor, ba *db.AddrBalance, onlyConfirmed bool, onlyMempool bool) (Utxos, error) {
2048+
func (w *Worker) getAddrDescUtxo(addrDesc bchain.AddressDescriptor, ba *db.AddrBalance, onlyConfirmed bool, onlyMempool bool, knownBestHeight *uint32) (Utxos, error) {
20492049
w.waitForBackendSync()
20502050
var err error
20512051
utxos := make(Utxos, 0, 8)
@@ -2115,11 +2115,16 @@ func (w *Worker) getAddrDescUtxo(addrDesc bchain.AddressDescriptor, ba *db.AddrB
21152115
}
21162116
// ba can be nil if the address is only in mempool!
21172117
if ba != nil && len(ba.Utxos) > 0 {
2118-
b, _, err := w.db.GetBestBlock()
2119-
if err != nil {
2120-
return nil, err
2118+
var bestheight int
2119+
if knownBestHeight != nil {
2120+
bestheight = int(*knownBestHeight)
2121+
} else {
2122+
b, _, err := w.db.GetBestBlock()
2123+
if err != nil {
2124+
return nil, err
2125+
}
2126+
bestheight = int(b)
21212127
}
2122-
bestheight := int(b)
21232128
var checksum big.Int
21242129
checksum.Set(&ba.BalanceSat)
21252130
// go backwards to get the newest first
@@ -2175,7 +2180,7 @@ func (w *Worker) GetAddressUtxo(address string, onlyConfirmed bool) (Utxos, erro
21752180
if err != nil {
21762181
return nil, NewAPIError(fmt.Sprintf("Invalid address '%v', %v", address, err), true)
21772182
}
2178-
r, err := w.getAddrDescUtxo(addrDesc, nil, onlyConfirmed, false)
2183+
r, err := w.getAddrDescUtxo(addrDesc, nil, onlyConfirmed, false, nil)
21792184
if err != nil {
21802185
return nil, err
21812186
}

api/xpub.go

Lines changed: 66 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ type xpubData struct {
6969
txCountEstimate uint32
7070
sentSat big.Int
7171
balanceSat big.Int
72+
mergedTxids xpubTxids
7273
addresses [][]xpubAddress
7374
}
7475

@@ -197,10 +198,32 @@ func (w *Worker) xpubGetAddressTxids(addrDesc bchain.AddressDescriptor, mempool
197198
return txs, complete, nil
198199
}
199200

200-
func (w *Worker) xpubCheckAndLoadTxids(ad *xpubAddress, filter *AddressFilter, maxHeight uint32, maxResults int) error {
201+
func isUnfilteredXpubTxidFilter(filter *AddressFilter) bool {
202+
return filter == nil || filter.FromHeight == 0 && filter.ToHeight == 0 && filter.Vout == AddressFilterVoutOff
203+
}
204+
205+
func mergeXpubTxids(data *xpubData) xpubTxids {
206+
txcMap := make(map[string]struct{}, data.txCountEstimate)
207+
txc := make(xpubTxids, 0, data.txCountEstimate)
208+
for _, da := range data.addresses {
209+
for i := range da {
210+
for _, txid := range da[i].txids {
211+
if _, foundTx := txcMap[txid.txid]; foundTx {
212+
continue
213+
}
214+
txcMap[txid.txid] = struct{}{}
215+
txc = append(txc, txid)
216+
}
217+
}
218+
}
219+
sort.Stable(txc)
220+
return txc
221+
}
222+
223+
func (w *Worker) xpubCheckAndLoadTxids(ad *xpubAddress, maxHeight uint32, maxResults int) (bool, error) {
201224
// skip if not used
202225
if ad.balance == nil {
203-
return nil
226+
return false, nil
204227
}
205228
// if completely loaded, check if there are not some new txs and load if necessary
206229
if ad.complete {
@@ -214,14 +237,14 @@ func (w *Worker) xpubCheckAndLoadTxids(ad *xpubAddress, filter *AddressFilter, m
214237
glog.Warning("xpubCheckAndLoadTxids inconsistency ", ad.addrDesc, ", ad.txs=", ad.txs, ", ad.balance.Txs=", ad.balance.Txs)
215238
}
216239
}
217-
return err
240+
return err == nil, err
218241
}
219-
return nil
242+
return false, nil
220243
}
221244
// load all txids to get paging correctly
222245
newTxids, complete, err := w.xpubGetAddressTxids(ad.addrDesc, false, 0, maxHeight, maxInt)
223246
if err != nil {
224-
return err
247+
return false, err
225248
}
226249
ad.txids = newTxids
227250
ad.complete = complete
@@ -232,7 +255,7 @@ func (w *Worker) xpubCheckAndLoadTxids(ad *xpubAddress, filter *AddressFilter, m
232255
glog.Warning("xpubCheckAndLoadTxids inconsistency ", ad.addrDesc, ", ad.txs=", ad.txs, ", ad.balance.Txs=", ad.balance.Txs)
233256
}
234257
}
235-
return nil
258+
return true, nil
236259
}
237260

238261
func (w *Worker) xpubDerivedAddressBalance(data *xpubData, ad *xpubAddress) (bool, error) {
@@ -420,6 +443,7 @@ func (w *Worker) getXpubData(xd *bchain.XpubDescriptor, page int, txsOnPage int,
420443
data.balanceSat = *new(big.Int)
421444
data.sentSat = *new(big.Int)
422445
data.txCountEstimate = 0
446+
data.mergedTxids = nil
423447
var minDerivedIndex int
424448
totalDerived := 0
425449
for i, change := range xd.ChangeIndexes {
@@ -431,13 +455,22 @@ func (w *Worker) getXpubData(xd *bchain.XpubDescriptor, page int, txsOnPage int,
431455
}
432456
}
433457
if option >= AccountDetailsTxidHistory {
458+
txidsChanged := false
434459
for _, da := range data.addresses {
435460
for i := range da {
436-
if err = w.xpubCheckAndLoadTxids(&da[i], filter, bestheight, (page+1)*txsOnPage); err != nil {
461+
changed := false
462+
if changed, err = w.xpubCheckAndLoadTxids(&da[i], bestheight, (page+1)*txsOnPage); err != nil {
437463
return nil, 0, inCache, err
438464
}
465+
txidsChanged = txidsChanged || changed
439466
}
440467
}
468+
if txidsChanged {
469+
data.mergedTxids = nil
470+
}
471+
if isUnfilteredXpubTxidFilter(filter) && data.mergedTxids == nil {
472+
data.mergedTxids = mergeXpubTxids(&data)
473+
}
441474
}
442475
}
443476
data.accessed = time.Now().Unix()
@@ -564,30 +597,35 @@ func (w *Worker) GetXpubAddress(xpub string, page int, txsOnPage int, option Acc
564597
}
565598
}
566599
if option >= AccountDetailsTxidHistory {
567-
txcMap := make(map[string]bool)
568-
txc = make(xpubTxids, 0, 32)
569-
for _, da := range data.addresses {
570-
for i := range da {
571-
ad := &da[i]
572-
for _, txid := range ad.txids {
573-
added, foundTx := txcMap[txid.txid]
574-
// count txs regardless of filter but only once
575-
if !foundTx {
576-
txCount++
577-
}
578-
// add tx only once
579-
if !added {
580-
add := txidFilter == nil || txidFilter(&txid, ad)
581-
txcMap[txid.txid] = add
582-
if add {
583-
txc = append(txc, txid)
600+
if txidFilter == nil {
601+
// Shared with xpubData cache; do not mutate in this request path.
602+
txc = data.mergedTxids
603+
if txc == nil {
604+
txc = mergeXpubTxids(data)
605+
}
606+
txCount = len(txc)
607+
} else {
608+
txcMap := make(map[string]bool)
609+
txc = make(xpubTxids, 0, 32)
610+
for _, da := range data.addresses {
611+
for i := range da {
612+
ad := &da[i]
613+
for _, txid := range ad.txids {
614+
added := txcMap[txid.txid]
615+
// add tx only once
616+
if !added {
617+
add := txidFilter(&txid, ad)
618+
txcMap[txid.txid] = add
619+
if add {
620+
txc = append(txc, txid)
621+
}
584622
}
585623
}
586624
}
587625
}
626+
sort.Stable(txc)
627+
txCount = len(txcMap)
588628
}
589-
sort.Stable(txc)
590-
txCount = len(txcMap)
591629
totalResults := txCount
592630
if filtered {
593631
totalResults = -1
@@ -692,7 +730,7 @@ func (w *Worker) GetXpubUtxo(xpub string, onlyConfirmed bool, gap int) (Utxos, e
692730
if err != nil {
693731
return nil, err
694732
}
695-
data, _, inCache, err := w.getXpubData(xd, 0, 1, AccountDetailsBasic, &AddressFilter{
733+
data, bestheight, inCache, err := w.getXpubData(xd, 0, 1, AccountDetailsBasic, &AddressFilter{
696734
Vout: AddressFilterVoutOff,
697735
OnlyConfirmed: onlyConfirmed,
698736
}, gap)
@@ -710,7 +748,7 @@ func (w *Worker) GetXpubUtxo(xpub string, onlyConfirmed bool, gap int) (Utxos, e
710748
}
711749
onlyMempool = true
712750
}
713-
utxos, err := w.getAddrDescUtxo(ad.addrDesc, ad.balance, onlyConfirmed, onlyMempool)
751+
utxos, err := w.getAddrDescUtxo(ad.addrDesc, ad.balance, onlyConfirmed, onlyMempool, &bestheight)
714752
if err != nil {
715753
return nil, err
716754
}

api/xpub_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,48 @@ func TestTrimXpubCacheItemsLocked(t *testing.T) {
5252
t.Fatal("second oldest cache entry was not evicted")
5353
}
5454
}
55+
56+
func TestMergeXpubTxidsDeduplicatesAndSorts(t *testing.T) {
57+
data := &xpubData{
58+
txCountEstimate: 4,
59+
addresses: [][]xpubAddress{
60+
{
61+
{txids: xpubTxids{
62+
{txid: "duplicate", height: 5, inputOutput: txOutput},
63+
{txid: "newest", height: 7, inputOutput: txOutput},
64+
}},
65+
},
66+
{
67+
{txids: xpubTxids{
68+
{txid: "duplicate", height: 5, inputOutput: txInput},
69+
{txid: "same-height-input", height: 5, inputOutput: txInput},
70+
}},
71+
},
72+
},
73+
}
74+
75+
txids := mergeXpubTxids(data)
76+
got := make([]string, len(txids))
77+
for i := range txids {
78+
got[i] = txids[i].txid
79+
}
80+
want := []string{"newest", "same-height-input", "duplicate"}
81+
if fmt.Sprint(got) != fmt.Sprint(want) {
82+
t.Fatalf("mergeXpubTxids order = %v, want %v", got, want)
83+
}
84+
if txids[2].inputOutput != txOutput {
85+
t.Fatal("mergeXpubTxids did not preserve the first duplicate occurrence")
86+
}
87+
}
88+
89+
func TestIsUnfilteredXpubTxidFilter(t *testing.T) {
90+
if !isUnfilteredXpubTxidFilter(&AddressFilter{Vout: AddressFilterVoutOff}) {
91+
t.Fatal("default xpub txid filter should be unfiltered")
92+
}
93+
if isUnfilteredXpubTxidFilter(&AddressFilter{Vout: AddressFilterVoutInputs}) {
94+
t.Fatal("input filter should not be treated as unfiltered")
95+
}
96+
if isUnfilteredXpubTxidFilter(&AddressFilter{Vout: AddressFilterVoutOff, FromHeight: 1}) {
97+
t.Fatal("height filter should not be treated as unfiltered")
98+
}
99+
}

server/websocket.go

Lines changed: 39 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.
159159
s.upgrader = &websocket.Upgrader{
160160
ReadBufferSize: 1024 * 32,
161161
WriteBufferSize: 1024 * 32,
162+
WriteBufferPool: &sync.Pool{},
162163
CheckOrigin: s.checkOrigin,
163164
EnableCompression: true,
164165
}
@@ -1797,53 +1798,36 @@ func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *ap
17971798

17981799
func (s *WebsocketServer) getNewTxSubscriptions(vins []bchain.MempoolVin, vouts []bchain.Vout, tokenTransfers bchain.TokenTransfers, internalTransfers []bchain.EthereumInternalTransfer, newBlockTxsOnly bool) map[string]struct{} {
17991800
// check if there is any subscription in inputs, outputs and transfers
1800-
s.addressSubscriptionsLock.Lock()
1801-
defer s.addressSubscriptionsLock.Unlock()
1802-
subscribed := make(map[string]struct{})
1803-
hasSubscription := func(sad string) bool {
1804-
as, ok := s.addressSubscriptions[sad]
1805-
if !ok || len(as) == 0 {
1806-
return false
1807-
}
1808-
if !newBlockTxsOnly {
1809-
return true
1810-
}
1811-
for _, details := range as {
1812-
if details.publishNewBlockTxs {
1813-
return true
1814-
}
1801+
candidates := make(map[string]struct{})
1802+
addAddrDesc := func(addrDesc bchain.AddressDescriptor) {
1803+
if len(addrDesc) > 0 {
1804+
candidates[string(addrDesc)] = struct{}{}
18151805
}
1816-
return false
18171806
}
18181807
processAddress := func(address string) {
18191808
if addrDesc, err := s.chainParser.GetAddrDescFromAddress(address); err == nil && len(addrDesc) > 0 {
1820-
sad := string(addrDesc)
1821-
if hasSubscription(sad) {
1822-
subscribed[sad] = struct{}{}
1823-
}
1809+
addAddrDesc(addrDesc)
18241810
}
18251811
}
18261812
processVout := func(vout bchain.Vout) {
18271813
if addrDesc, err := s.chainParser.GetAddrDescFromVout(&vout); err == nil && len(addrDesc) > 0 {
1828-
sad := string(addrDesc)
1829-
if hasSubscription(sad) {
1830-
subscribed[sad] = struct{}{}
1831-
}
1814+
addAddrDesc(addrDesc)
18321815
}
18331816
}
18341817
for i := range vins {
18351818
if sad := string(vins[i].AddrDesc); len(sad) > 0 {
1836-
if hasSubscription(sad) {
1837-
subscribed[sad] = struct{}{}
1838-
}
1839-
} else if s.chainParser.GetChainType() == bchain.ChainBitcoinType {
1840-
vout := int(vins[i].Vout)
1841-
if vout >= 0 && vout < len(vouts) {
1842-
processVout(vouts[vins[i].Vout])
1843-
}
1844-
} else if s.chainParser.GetChainType() == bchain.ChainEthereumType {
1845-
if len(vins[i].Addresses) > 0 {
1846-
processAddress(vins[i].Addresses[0])
1819+
candidates[sad] = struct{}{}
1820+
} else {
1821+
switch s.chainParser.GetChainType() {
1822+
case bchain.ChainBitcoinType:
1823+
vout := int(vins[i].Vout)
1824+
if vout >= 0 && vout < len(vouts) {
1825+
processVout(vouts[vout])
1826+
}
1827+
case bchain.ChainEthereumType:
1828+
if len(vins[i].Addresses) > 0 {
1829+
processAddress(vins[i].Addresses[0])
1830+
}
18471831
}
18481832
}
18491833
}
@@ -1858,6 +1842,26 @@ func (s *WebsocketServer) getNewTxSubscriptions(vins []bchain.MempoolVin, vouts
18581842
processAddress(internalTransfers[i].From)
18591843
processAddress(internalTransfers[i].To)
18601844
}
1845+
1846+
subscribed := make(map[string]struct{})
1847+
s.addressSubscriptionsLock.Lock()
1848+
defer s.addressSubscriptionsLock.Unlock()
1849+
for sad := range candidates {
1850+
as, ok := s.addressSubscriptions[sad]
1851+
if !ok || len(as) == 0 {
1852+
continue
1853+
}
1854+
if !newBlockTxsOnly {
1855+
subscribed[sad] = struct{}{}
1856+
continue
1857+
}
1858+
for _, details := range as {
1859+
if details.publishNewBlockTxs {
1860+
subscribed[sad] = struct{}{}
1861+
break
1862+
}
1863+
}
1864+
}
18611865
return subscribed
18621866
}
18631867

0 commit comments

Comments
 (0)