Skip to content

Commit 5f4f00c

Browse files
authored
SY-4432: Refactor distribution proxy And transport (#2531)
1 parent 8cbe798 commit 5f4f00c

79 files changed

Lines changed: 3712 additions & 1125 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

aspen/options.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ func mergeDefaultOptions(o *options) {
186186
// provide their own transport. Otherwise the eagerly-created pool would
187187
// leak: nothing would reference it and nothing would close it.
188188
if o.transport.Transport == nil {
189-
pool := fgrpc.NewPool("", grpc.WithTransportCredentials(insecure.NewCredentials()))
189+
pool := fgrpc.OpenPool("", grpc.WithTransportCredentials(insecure.NewCredentials()))
190190
o.transport.ownedPool = pool
191191
o.transport.Transport = grpct.New(pool)
192192
}

core/cmd/start/start.go

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,18 @@ import (
1414
"io"
1515
"os"
1616
"path/filepath"
17-
"slices"
1817
"strconv"
1918
"time"
2019

2120
"github.com/samber/lo"
2221
"github.com/synnaxlabs/alamos"
2322
aspentransport "github.com/synnaxlabs/aspen/transport/grpc"
24-
"github.com/synnaxlabs/freighter/grpc"
2523
"github.com/synnaxlabs/freighter/http"
2624
cmdcert "github.com/synnaxlabs/synnax/cmd/cert"
2725
"github.com/synnaxlabs/synnax/pkg/api"
2826
"github.com/synnaxlabs/synnax/pkg/console"
2927
"github.com/synnaxlabs/synnax/pkg/distribution"
30-
channeltransport "github.com/synnaxlabs/synnax/pkg/distribution/transport/grpc/channel"
31-
framertransport "github.com/synnaxlabs/synnax/pkg/distribution/transport/grpc/framer"
28+
disttransport "github.com/synnaxlabs/synnax/pkg/distribution/transport/grpc"
3229
"github.com/synnaxlabs/synnax/pkg/driver"
3330
"github.com/synnaxlabs/synnax/pkg/security"
3431
"github.com/synnaxlabs/synnax/pkg/security/cert"
@@ -42,10 +39,10 @@ import (
4239
"github.com/synnaxlabs/x/config"
4340
"github.com/synnaxlabs/x/errors"
4441
xio "github.com/synnaxlabs/x/io"
45-
xfs "github.com/synnaxlabs/x/io/fs"
42+
"github.com/synnaxlabs/x/io/fs"
4643
"github.com/synnaxlabs/x/override"
4744
xservice "github.com/synnaxlabs/x/service"
48-
xsignal "github.com/synnaxlabs/x/signal"
45+
"github.com/synnaxlabs/x/signal"
4946
"github.com/synnaxlabs/x/validate"
5047
"go.uber.org/zap"
5148
)
@@ -199,23 +196,16 @@ func BootupCore(ctx context.Context, onServerStarted chan struct{}, cfgs ...Core
199196
return ctx.Err()
200197
}
201198
var (
202-
aspenTransport = aspentransport.New(grpcClientPool)
203-
frameTransport = framertransport.New(grpcClientPool)
204-
channelTransport = channeltransport.New(grpcClientPool)
205-
distributionTransports = []grpc.BindableTransport{
206-
aspenTransport,
207-
frameTransport,
208-
channelTransport,
209-
}
199+
aspenTransport = aspentransport.New(grpcClientPool)
200+
distTransport = disttransport.New(grpcClientPool)
210201
)
211202

212203
if distributionLayer, err = distribution.OpenLayer(ctx, distribution.LayerConfig{
213204
Instrumentation: cfg.Child("distribution"),
214205
AdvertiseAddress: cfg.listenAddress,
215206
PeerAddresses: cfg.peers,
216207
AspenTransport: aspenTransport,
217-
FrameTransport: frameTransport,
218-
ChannelTransport: channelTransport,
208+
Transport: distTransport,
219209
Verifier: cfg.verifier,
220210
Storage: storageLayer,
221211
ValidateChannelNames: cfg.validateChannelNames,
@@ -270,9 +260,10 @@ func BootupCore(ctx context.Context, onServerStarted chan struct{}, cfgs ...Core
270260
&server.SecureHTTPBranch{
271261
Transports: []http.BindableTransport{r, embeddedConsole},
272262
},
273-
&server.GRPCBranch{Transports: slices.Concat(
263+
&server.GRPCBranch{Transports: append(
274264
transportLayer.GRPC,
275-
distributionTransports,
265+
aspenTransport,
266+
distTransport,
276267
)},
277268
server.NewHTTPRedirectBranch(),
278269
},
@@ -350,7 +341,7 @@ func openWorkDir() (string, io.Closer, error) {
350341
"workdir",
351342
strconv.Itoa(os.Getpid()),
352343
)
353-
if err = os.MkdirAll(dir, xfs.UserRWX); err != nil {
344+
if err = os.MkdirAll(dir, fs.UserRWX); err != nil {
354345
return "", nil, err
355346
}
356347
return dir, xio.CloserFunc(func() error { return os.RemoveAll(dir) }), nil
@@ -363,12 +354,12 @@ func runStartupSearchIndexing(
363354
// Run indexing inside an isolated signal context, so that if we receive an early
364355
// cancellation signal, we can ensure that we exit indexing before we close any
365356
// resources that it depends on (notably storage KV).
366-
searchIndexCtx, cancelIndexing := xsignal.WithCancel(ctx)
357+
searchIndexCtx, cancelIndexing := signal.WithCancel(ctx)
367358
searchIndexCtx.Go(
368359
dist.Search.Initialize,
369-
xsignal.WithKey("startup_search_indexing"),
360+
signal.WithKey("startup_search_indexing"),
370361
)
371-
return xsignal.NewHardShutdown(searchIndexCtx, cancelIndexing)
362+
return signal.NewHardShutdown(searchIndexCtx, cancelIndexing)
372363
}
373364

374365
func parseIntegrations(enabled, disabled []string) []string {

core/cmd/start/transport.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func configureClientGRPC(
2222
sec security.Provider,
2323
insecure bool,
2424
) *fgrpc.Pool {
25-
return fgrpc.NewPool(
25+
return fgrpc.OpenPool(
2626
"",
2727
grpc.WithTransportCredentials(getClientGRPCTransportCredentials(sec, insecure)),
2828
)

core/pkg/distribution/channel/pb/services.pb.go

Lines changed: 41 additions & 23 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/pkg/distribution/channel/pb/services.proto

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,33 +16,61 @@ import "google/protobuf/empty.proto";
1616

1717
option go_package = "github.com/synnaxlabs/synnax/pkg/distribution/channel/pb";
1818

19-
service ChannelCreateService {
19+
// CreateService forwards channel create requests from a gateway node to the leaseholder
20+
// that will own the new channels.
21+
service CreateService {
22+
// Exec creates the requested channels and returns them populated with their assigned
23+
// keys.
2024
rpc Exec(CreateMessage) returns (CreateMessage) {}
2125
}
2226

23-
service ChannelDeleteService {
27+
// DeleteService forwards channel delete requests from a gateway node to the leaseholder
28+
// that owns the channels.
29+
service DeleteService {
30+
// Exec deletes the requested channels.
2431
rpc Exec(DeleteRequest) returns (google.protobuf.Empty) {}
2532
}
2633

27-
service ChannelRenameService {
34+
// RenameService forwards channel rename requests from a gateway node to the leaseholder
35+
// that owns the channels.
36+
service RenameService {
37+
// Exec renames the requested channels.
2838
rpc Exec(RenameRequest) returns (google.protobuf.Empty) {}
2939
}
3040

41+
// CreateOptions controls how a create request behaves when a channel with the same name
42+
// already exists.
3143
message CreateOptions {
44+
// retrieve_if_name_exists returns the existing channel instead of creating a new one
45+
// when a channel with the same name already exists.
3246
bool retrieve_if_name_exists = 1;
47+
// overwrite_if_name_exists overwrites the existing channel when a channel with the
48+
// same name already exists.
3349
bool overwrite_if_name_exists = 2;
3450
}
3551

52+
// CreateMessage is the request and response payload for a channel create operation. On
53+
// the request the channels carry no keys; on the response they are populated with their
54+
// assigned keys.
3655
message CreateMessage {
56+
// channels are the channels to create on the request, and the created channels with
57+
// their assigned keys on the response.
3758
repeated pb.Channel channels = 1;
59+
// opts controls create behavior on name collisions.
3860
CreateOptions opts = 2;
3961
}
4062

63+
// DeleteRequest is the payload for a channel delete operation.
4164
message DeleteRequest {
65+
// keys identifies the channels to delete.
4266
repeated uint32 keys = 1;
4367
}
4468

69+
// RenameRequest is the payload for a channel rename operation. keys and names are
70+
// positional: the channel at keys[i] is renamed to names[i].
4571
message RenameRequest {
72+
// keys identifies the channels to rename.
4673
repeated uint32 keys = 1;
74+
// names holds the new name for each channel in keys, by position.
4775
repeated string names = 2;
4876
}

0 commit comments

Comments
 (0)