Skip to content

Commit ddd9a0a

Browse files
committed
Remove proxy.NewBatchFactory; convert BatchFactory directly from node.Key
1 parent 30c9792 commit ddd9a0a

6 files changed

Lines changed: 13 additions & 17 deletions

File tree

core/pkg/distribution/channel/service.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,9 @@ func OpenService(ctx context.Context, cfgs ...ServiceConfig) (s *Service, err er
128128
db: cfg.ClusterDB,
129129
otg: cfg.Ontology,
130130
indexes: newIndexes(),
131-
createRouter: proxy.NewBatchFactory[Channel](cfg.HostResolver.HostKey()),
132-
keyRouter: proxy.NewBatchFactory[Key](cfg.HostResolver.HostKey()),
133-
renameRouter: proxy.NewBatchFactory[renameBatchEntry](cfg.HostResolver.HostKey()),
131+
createRouter: proxy.BatchFactory[Channel](cfg.HostResolver.HostKey()),
132+
keyRouter: proxy.BatchFactory[Key](cfg.HostResolver.HostKey()),
133+
renameRouter: proxy.BatchFactory[renameBatchEntry](cfg.HostResolver.HostKey()),
134134
}
135135
cleanup, ok := service.NewOpener(ctx, &s.closer)
136136
defer func() { err = cleanup(err) }()

core/pkg/distribution/framer/deleter/lease_proxy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func newLeaseProxy(
3232
) (*leaseProxy, error) {
3333
p := &leaseProxy{
3434
ServiceConfig: cfg,
35-
keyRouter: proxy.NewBatchFactory[channel.Key](cfg.HostResolver.HostKey()),
35+
keyRouter: proxy.BatchFactory[channel.Key](cfg.HostResolver.HostKey()),
3636
}
3737
return p, nil
3838
}

core/pkg/distribution/framer/iterator/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func (s *Service) NewStream(ctx context.Context, cfg Config) (StreamIterator, er
157157
cfg.Keys = cfg.Keys.Unique()
158158
var (
159159
hostID = s.cfg.HostResolver.HostKey()
160-
batch = proxy.NewBatchFactory[channel.Key](hostID).Batch(cfg.Keys)
160+
batch = proxy.BatchFactory[channel.Key](hostID).Batch(cfg.Keys)
161161
pipe = plumber.New()
162162
needPeerRouting = len(batch.Peers) > 0
163163
needGatewayRouting = len(batch.Gateway) > 0

core/pkg/distribution/framer/writer/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ func (s *Service) NewStream(ctx context.Context, cfgs ...Config) (StreamWriter,
351351

352352
var (
353353
hostKey = s.cfg.HostResolver.HostKey()
354-
batch = proxy.NewBatchFactory[keyAuthority](hostKey).Batch(cfg.keyAuthorities())
354+
batch = proxy.BatchFactory[keyAuthority](hostKey).Batch(cfg.keyAuthorities())
355355
pipe = plumber.New()
356356
hasPeer = len(batch.Peers) > 0
357357
hasGateway = len(batch.Gateway) > 0

core/pkg/distribution/proxy/proxy.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,10 @@ type Entry interface {
2323
}
2424

2525
// BatchFactory partitions Entry values by their leaseholder relative to a fixed host
26-
// node. Construct one with NewBatchFactory, then call Batch.
27-
type BatchFactory[E Entry] struct{ host node.Key }
28-
29-
// NewBatchFactory returns a BatchFactory that treats host as the local node: Batch
30-
// routes entries leased to host into the gateway bucket and all other leased entries to
31-
// peers.
32-
func NewBatchFactory[E Entry](host node.Key) BatchFactory[E] {
33-
return BatchFactory[E]{host: host}
34-
}
26+
// node, treating that host as the local node: Batch routes entries leased to it into the
27+
// gateway bucket and all other leased entries to peers. Construct one by converting a
28+
// host node.Key, e.g. BatchFactory[E](host), then call Batch.
29+
type BatchFactory[E Entry] node.Key
3530

3631
// Batch is the result of partitioning a set of Entry values by leaseholder. Every input
3732
// entry appears in exactly one of the three buckets.
@@ -50,12 +45,13 @@ type Batch[E Entry] struct {
5045
// grouped into Peers by their leaseholder key. The relative order of entries is
5146
// preserved within each bucket.
5247
func (f BatchFactory[E]) Batch(entries []E) Batch[E] {
48+
host := node.Key(f)
5349
b := Batch[E]{Peers: make(map[node.Key][]E)}
5450
for _, entry := range entries {
5551
lease := entry.Lease()
5652
if lease.IsFree() {
5753
b.Free = append(b.Free, entry)
58-
} else if lease == f.host {
54+
} else if lease == host {
5955
b.Gateway = append(b.Gateway, entry)
6056
} else {
6157
b.Peers[lease] = append(b.Peers[lease], entry)

core/pkg/distribution/proxy/proxy_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (e entry) Lease() node.Key { return e.lease }
3434
var _ = Describe("Batch", func() {
3535
var factory proxy.BatchFactory[entry]
3636
BeforeEach(func() {
37-
factory = proxy.NewBatchFactory[entry](host)
37+
factory = proxy.BatchFactory[entry](host)
3838
})
3939

4040
It("Should route entries leased to the host into the gateway bucket", func() {

0 commit comments

Comments
 (0)