Skip to content

Commit b24d47f

Browse files
Wondertan and walldiss authored
perf(shwap/bitswap): add session pools inside Getter (#3947)
In the Robusta environment, this change improved load distribution across BN/FNs and sped up sampling. Originally, I was testing Bitswap with this change but decided against including it in the patch, as it is solely an optimization. However, without this change on the rc tag, load distribution was rather strange: one LN sampled at 180/s, while the 4 others sampled at 4/s. Adding pooling fixed the issue and resulted in all 5 LNs sampling at about 100/s.
Co-authored-by: Vlad <[email protected]>
1 parent 72c08f7 commit b24d47f

File tree

2 files changed

+166
-19
lines changed

2 files changed

+166
-19
lines changed

Diff for: share/shwap/p2p/bitswap/getter.go

+80-19
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ package bitswap
33
import (
44
"context"
55
"fmt"
6+
"sync"
67
"time"
78

89
"github.com/ipfs/boxo/blockstore"
910
"github.com/ipfs/boxo/exchange"
1011
"go.opentelemetry.io/otel"
1112
"go.opentelemetry.io/otel/attribute"
1213
"go.opentelemetry.io/otel/codes"
13-
"go.opentelemetry.io/otel/trace"
1414

1515
"github.com/celestiaorg/celestia-app/v3/pkg/wrapper"
1616
libshare "github.com/celestiaorg/go-square/v2/share"
@@ -30,8 +30,8 @@ type Getter struct {
3030
bstore blockstore.Blockstore
3131
availWndw time.Duration
3232

33-
availableSession exchange.Fetcher
34-
archivalSession exchange.Fetcher
33+
availablePool *pool
34+
archivalPool *pool
3535

3636
cancel context.CancelFunc
3737
}
@@ -42,7 +42,13 @@ func NewGetter(
4242
bstore blockstore.Blockstore,
4343
availWndw time.Duration,
4444
) *Getter {
45-
return &Getter{exchange: exchange, bstore: bstore, availWndw: availWndw}
45+
return &Getter{
46+
exchange: exchange,
47+
bstore: bstore,
48+
availWndw: availWndw,
49+
availablePool: newPool(exchange),
50+
archivalPool: newPool(exchange),
51+
}
4652
}
4753

4854
// Start kicks off internal fetching sessions.
@@ -57,12 +63,13 @@ func NewGetter(
5763
// with regular full node peers.
5864
func (g *Getter) Start() {
	// Every session created by the pools is bound to this context, which
	// Stop cancels through g.cancel.
	ctx, cancel := context.WithCancel(context.Background())
	g.cancel = cancel

	// Both pools need the context: pool.get passes it to NewSession when it
	// has no pooled session to hand out. Leaving archivalPool.ctx unset
	// would make archival requests create sessions with a nil context.
	g.availablePool.ctx = ctx
	g.archivalPool.ctx = ctx
}
6471

65-
// Stop shuts down Getter's internal fetching session.
72+
// Stop shuts down Getter's internal fetching sessions.
6673
func (g *Getter) Stop() {
6774
g.cancel()
6875
}
@@ -97,7 +104,12 @@ func (g *Getter) GetShares(
97104
blks[i] = sid
98105
}
99106

100-
ses := g.session(ctx, hdr)
107+
isArchival := g.isArchival(hdr)
108+
span.SetAttributes(attribute.Bool("is_archival", isArchival))
109+
110+
ses, release := g.getSession(isArchival)
111+
defer release()
112+
101113
err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithStore(g.bstore), WithFetcher(ses))
102114
if err != nil {
103115
span.RecordError(err)
@@ -156,7 +168,12 @@ func (g *Getter) GetEDS(
156168
blks[i] = blk
157169
}
158170

159-
ses := g.session(ctx, hdr)
171+
isArchival := g.isArchival(hdr)
172+
span.SetAttributes(attribute.Bool("is_archival", isArchival))
173+
174+
ses, release := g.getSession(isArchival)
175+
defer release()
176+
160177
err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithFetcher(ses))
161178
if err != nil {
162179
span.RecordError(err)
@@ -210,7 +227,12 @@ func (g *Getter) GetNamespaceData(
210227
blks[i] = rndblk
211228
}
212229

213-
ses := g.session(ctx, hdr)
230+
isArchival := g.isArchival(hdr)
231+
span.SetAttributes(attribute.Bool("is_archival", isArchival))
232+
233+
ses, release := g.getSession(isArchival)
234+
defer release()
235+
214236
if err = Fetch(ctx, g.exchange, hdr.DAH, blks, WithFetcher(ses)); err != nil {
215237
span.RecordError(err)
216238
span.SetStatus(codes.Error, "Fetch")
@@ -230,17 +252,19 @@ func (g *Getter) GetNamespaceData(
230252
return nsShrs, nil
231253
}
232254

233-
// session decides which fetching session to use for the given header.
234-
func (g *Getter) session(ctx context.Context, hdr *header.ExtendedHeader) exchange.Fetcher {
235-
session := g.archivalSession
255+
// isArchival reports whether the header is for archival data
256+
func (g *Getter) isArchival(hdr *header.ExtendedHeader) bool {
257+
return !availability.IsWithinWindow(hdr.Time(), g.availWndw)
258+
}
236259

237-
isWithinAvailability := availability.IsWithinWindow(hdr.Time(), g.availWndw)
238-
if isWithinAvailability {
239-
session = g.availableSession
260+
// getSession takes a session out of the respective session pool
261+
func (g *Getter) getSession(isArchival bool) (ses exchange.Fetcher, release func()) {
262+
if isArchival {
263+
ses = g.archivalPool.get()
264+
return ses, func() { g.archivalPool.put(ses) }
240265
}
241-
242-
trace.SpanFromContext(ctx).SetAttributes(attribute.Bool("within_availability", isWithinAvailability))
243-
return session
266+
ses = g.availablePool.get()
267+
return ses, func() { g.availablePool.put(ses) }
244268
}
245269

246270
// edsFromRows imports given Rows and computes EDS out of them, assuming enough Rows were provided.
@@ -274,3 +298,40 @@ func edsFromRows(roots *share.AxisRoots, rows []shwap.Row) (*rsmt2d.ExtendedData
274298

275299
return square, nil
276300
}
301+
302+
// pool is a pool of Bitswap sessions.
303+
type pool struct {
304+
lock sync.Mutex
305+
sessions []exchange.Fetcher
306+
ctx context.Context
307+
exchange exchange.SessionExchange
308+
}
309+
310+
func newPool(ex exchange.SessionExchange) *pool {
311+
return &pool{
312+
exchange: ex,
313+
sessions: make([]exchange.Fetcher, 0),
314+
}
315+
}
316+
317+
// get returns a session from the pool or creates a new one if the pool is empty.
318+
func (p *pool) get() exchange.Fetcher {
319+
p.lock.Lock()
320+
defer p.lock.Unlock()
321+
322+
if len(p.sessions) == 0 {
323+
return p.exchange.NewSession(p.ctx)
324+
}
325+
326+
ses := p.sessions[len(p.sessions)-1]
327+
p.sessions = p.sessions[:len(p.sessions)-1]
328+
return ses
329+
}
330+
331+
// put returns a session to the pool.
332+
func (p *pool) put(ses exchange.Fetcher) {
333+
p.lock.Lock()
334+
defer p.lock.Unlock()
335+
336+
p.sessions = append(p.sessions, ses)
337+
}

Diff for: share/shwap/p2p/bitswap/getter_test.go

+86
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package bitswap
22

33
import (
4+
"context"
5+
"sync"
46
"testing"
57

8+
"github.com/ipfs/boxo/exchange"
69
"github.com/stretchr/testify/require"
710

811
libshare "github.com/celestiaorg/go-square/v2/share"
@@ -28,3 +31,86 @@ func TestEDSFromRows(t *testing.T) {
2831
require.NoError(t, err)
2932
require.True(t, edsIn.Equals(edsOut))
3033
}
34+
35+
// mockSessionExchange is a mock implementation of exchange.SessionExchange
36+
type mockSessionExchange struct {
37+
exchange.SessionExchange
38+
sessionCount int
39+
mu sync.Mutex
40+
}
41+
42+
func (m *mockSessionExchange) NewSession(ctx context.Context) exchange.Fetcher {
43+
m.mu.Lock()
44+
defer m.mu.Unlock()
45+
m.sessionCount++
46+
return &mockFetcher{id: m.sessionCount}
47+
}
48+
49+
// mockFetcher is a mock implementation of exchange.Fetcher
50+
type mockFetcher struct {
51+
exchange.Fetcher
52+
id int
53+
}
54+
55+
func TestPoolGetFromEmptyPool(t *testing.T) {
56+
ex := &mockSessionExchange{}
57+
p := newPool(ex)
58+
ctx := context.Background()
59+
p.ctx = ctx
60+
61+
ses := p.get().(*mockFetcher)
62+
require.NotNil(t, ses)
63+
require.Equal(t, 1, ses.id)
64+
}
65+
66+
func TestPoolPutAndGet(t *testing.T) {
67+
ex := &mockSessionExchange{}
68+
p := newPool(ex)
69+
ctx := context.Background()
70+
p.ctx = ctx
71+
72+
// Get a session
73+
ses := p.get().(*mockFetcher)
74+
75+
// Put it back
76+
p.put(ses)
77+
78+
// Get again
79+
ses2 := p.get().(*mockFetcher)
80+
81+
require.Equal(t, ses.id, ses2.id)
82+
}
83+
84+
func TestPoolConcurrency(t *testing.T) {
85+
ex := &mockSessionExchange{}
86+
p := newPool(ex)
87+
ctx := context.Background()
88+
p.ctx = ctx
89+
90+
const numGoroutines = 50
91+
var wg sync.WaitGroup
92+
93+
sessionIDSet := make(map[int]struct{})
94+
lock := sync.Mutex{}
95+
96+
// Start multiple goroutines to get sessions
97+
for i := 0; i < numGoroutines; i++ {
98+
wg.Add(1)
99+
go func() {
100+
defer wg.Done()
101+
ses := p.get()
102+
mockSes := ses.(*mockFetcher)
103+
p.put(ses)
104+
lock.Lock()
105+
sessionIDSet[mockSes.id] = struct{}{}
106+
lock.Unlock()
107+
}()
108+
}
109+
wg.Wait()
110+
111+
// Since the pool reuses sessions, the number of unique session IDs should be less than or equal to numGoroutines
112+
if len(sessionIDSet) > numGoroutines {
113+
t.Fatalf("expected number of unique sessions to be less than or equal to %d, got %d",
114+
numGoroutines, len(sessionIDSet))
115+
}
116+
}

0 commit comments

Comments
 (0)