Skip to content

Commit eea6cbc

Browse files
authored
Add shard distributor canary service (cadence-workflow#7252)
**What changed?** Added a new shard distributor canary service that provides testing and validation capabilities for the shard distributor functionality. The service includes: - A standalone canary application that can be built and run independently - Two types of shard processors: - Fixed shard processors for long-running shard processing validation - Ephemeral shard processors that simulate dynamic shard creation and completion - Makefile integration for building and running the canary - Modular design using fx dependency injection for easy integration with different environments **Why?** The shard distributor service needs a way to validate its functionality. This canary service provides: 1. Continuous validation that the shard distributor is working correctly 2. Load testing capabilities through ephemeral shard creation 3. Monitoring of shard processing lifecycle (creation, processing, completion) 4. A foundation for integration testing with different shard distributor configurations The modular design allows the same codebase to be used in both OSS and internal environments by swapping out key dependencies. **How did you test it?** - Unit tests - Tested the build process using the Makefile targets - Verified the fx module dependency injection works correctly - Tested both fixed and ephemeral shard processing scenarios locally - Validated that the canary can connect to and interact with a running shard distributor service **Potential risks** **Release notes** **Documentation Changes** --------- Signed-off-by: Jakob Haahr Taankvist <[email protected]>
1 parent d0913a9 commit eea6cbc

File tree

17 files changed

+837
-1
lines changed

17 files changed

+837
-1
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ dockerize
3838
/cadence-cassandra-tool
3939
/cadence-sql-tool
4040
/cadence-releaser
41+
/sharddistributor-canary
4142

4243
# SQLite databases
4344
cadence.db*

Makefile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,11 @@ cadence-canary: $(BINS_DEPEND_ON)
527527
$Q echo "compiling cadence-canary with OS: $(GOOS), ARCH: $(GOARCH)"
528528
$Q ./scripts/build-with-ldflags.sh -o $@ cmd/canary/main.go
529529

530+
BINS += sharddistributor-canary
531+
sharddistributor-canary: $(BINS_DEPEND_ON)
532+
$Q echo "compiling sharddistributor-canary with OS: $(GOOS), ARCH: $(GOARCH)"
533+
$Q ./scripts/build-with-ldflags.sh -o $@ cmd/sharddistributor-canary/main.go
534+
530535
BINS += cadence-bench
531536
cadence-bench: $(BINS_DEPEND_ON)
532537
$Q echo "compiling cadence-bench with OS: $(GOOS), ARCH: $(GOARCH)"
@@ -834,6 +839,9 @@ start-xdc-cluster2: cadence-server
834839
start-canary: cadence-canary
835840
./cadence-canary start
836841

842+
start-sharddistributor-canary: sharddistributor-canary
843+
./sharddistributor-canary start
844+
837845
start-bench: cadence-bench
838846
./cadence-bench start
839847

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package main
2+
3+
import (
4+
"os"
5+
"time"
6+
7+
"github.com/uber-go/tally"
8+
"github.com/urfave/cli/v2"
9+
"go.uber.org/fx"
10+
"go.uber.org/yarpc"
11+
"go.uber.org/yarpc/transport/grpc"
12+
"go.uber.org/zap"
13+
14+
"github.com/uber/cadence/common/clock"
15+
"github.com/uber/cadence/common/log"
16+
"github.com/uber/cadence/service/sharddistributor/canary"
17+
"github.com/uber/cadence/service/sharddistributor/executorclient"
18+
"github.com/uber/cadence/tools/common/commoncli"
19+
)
20+
21+
const (
22+
// Default configuration
23+
defaultShardDistributorEndpoint = "127.0.0.1:7943"
24+
defaultFixedNamespace = "shard-distributor-canary"
25+
defaultEphemeralNamespace = "shard-distributor-canary-ephemeral"
26+
27+
shardDistributorServiceName = "cadence-shard-distributor"
28+
)
29+
30+
func runApp(c *cli.Context) {
31+
endpoint := c.String("endpoint")
32+
fixedNamespace := c.String("fixed-namespace")
33+
ephemeralNamespace := c.String("ephemeral-namespace")
34+
35+
fx.New(opts(fixedNamespace, ephemeralNamespace, endpoint)).Run()
36+
}
37+
38+
func opts(fixedNamespace, ephemeralNamespace, endpoint string) fx.Option {
39+
config := executorclient.Config{
40+
Namespaces: []executorclient.NamespaceConfig{
41+
{Namespace: fixedNamespace, HeartBeatInterval: 1 * time.Second},
42+
{Namespace: ephemeralNamespace, HeartBeatInterval: 1 * time.Second},
43+
},
44+
}
45+
46+
transport := grpc.NewTransport()
47+
yarpcConfig := yarpc.Config{
48+
Name: "shard-distributor-canary",
49+
Outbounds: yarpc.Outbounds{
50+
shardDistributorServiceName: {
51+
Unary: transport.NewSingleOutbound(endpoint),
52+
},
53+
},
54+
}
55+
56+
return fx.Options(
57+
fx.Supply(
58+
fx.Annotate(tally.NoopScope, fx.As(new(tally.Scope))),
59+
fx.Annotate(clock.NewRealTimeSource(), fx.As(new(clock.TimeSource))),
60+
yarpcConfig,
61+
config,
62+
),
63+
fx.Provide(
64+
yarpc.NewDispatcher,
65+
func(d *yarpc.Dispatcher) yarpc.ClientConfig { return d }, // Reprovide the dispatcher as a client config
66+
),
67+
fx.Provide(zap.NewDevelopment),
68+
fx.Provide(log.NewLogger),
69+
70+
// Start the YARPC dispatcher
71+
fx.Invoke(func(lc fx.Lifecycle, dispatcher *yarpc.Dispatcher) {
72+
lc.Append(fx.StartStopHook(dispatcher.Start, dispatcher.Stop))
73+
}),
74+
75+
// Include the canary module
76+
canary.Module(fixedNamespace, ephemeralNamespace, shardDistributorServiceName),
77+
)
78+
}
79+
80+
func buildCLI() *cli.App {
81+
app := cli.NewApp()
82+
app.Name = "sharddistributor-canary"
83+
app.Usage = "Cadence shard distributor canary"
84+
app.Version = "0.0.1"
85+
86+
app.Commands = []*cli.Command{
87+
{
88+
Name: "start",
89+
Usage: "start shard distributor canary",
90+
Flags: []cli.Flag{
91+
&cli.StringFlag{
92+
Name: "endpoint",
93+
Aliases: []string{"e"},
94+
Value: defaultShardDistributorEndpoint,
95+
Usage: "shard distributor endpoint address",
96+
},
97+
&cli.StringFlag{
98+
Name: "fixed-namespace",
99+
Value: defaultFixedNamespace,
100+
Usage: "namespace for fixed shard processing",
101+
},
102+
&cli.StringFlag{
103+
Name: "ephemeral-namespace",
104+
Value: defaultEphemeralNamespace,
105+
Usage: "namespace for ephemeral shard creation testing",
106+
},
107+
},
108+
Action: func(c *cli.Context) error {
109+
runApp(c)
110+
return nil
111+
},
112+
},
113+
}
114+
115+
return app
116+
}
117+
118+
func main() {
119+
app := buildCLI()
120+
commoncli.ExitHandler(app.Run(os.Args))
121+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package main
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"go.uber.org/fx"
8+
)
9+
10+
func TestDependenciesAreSatisfied(t *testing.T) {
11+
assert.NoError(t, fx.ValidateApp(opts(defaultFixedNamespace, defaultEphemeralNamespace, defaultShardDistributorEndpoint)))
12+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package factory
2+
3+
import (
4+
"go.uber.org/fx"
5+
"go.uber.org/zap"
6+
7+
"github.com/uber/cadence/common/clock"
8+
"github.com/uber/cadence/service/sharddistributor/executorclient"
9+
)
10+
11+
// ShardProcessorFactory is a generic factory for creating ShardProcessor instances.
12+
type ShardProcessorFactory[T executorclient.ShardProcessor] struct {
13+
logger *zap.Logger
14+
timeSource clock.TimeSource
15+
constructor func(shardID string, timeSource clock.TimeSource, logger *zap.Logger) T
16+
}
17+
18+
// NewShardProcessor creates a new ShardProcessor.
19+
func (s *ShardProcessorFactory[T]) NewShardProcessor(shardID string) (T, error) {
20+
return s.constructor(shardID, s.timeSource, s.logger), nil
21+
}
22+
23+
// Params are the parameters for creating a ShardProcessorFactory.
24+
type Params struct {
25+
fx.In
26+
27+
Logger *zap.Logger
28+
TimeSource clock.TimeSource
29+
}
30+
31+
// NewShardProcessorFactory creates a new ShardProcessorFactory with a constructor function.
32+
func NewShardProcessorFactory[T executorclient.ShardProcessor](
33+
params Params,
34+
constructor func(shardID string, timeSource clock.TimeSource, logger *zap.Logger) T,
35+
) executorclient.ShardProcessorFactory[T] {
36+
return &ShardProcessorFactory[T]{
37+
logger: params.Logger,
38+
timeSource: params.TimeSource,
39+
constructor: constructor,
40+
}
41+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package factory
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"github.com/stretchr/testify/require"
8+
"go.uber.org/zap/zaptest"
9+
10+
"github.com/uber/cadence/common/clock"
11+
"github.com/uber/cadence/service/sharddistributor/canary/processor"
12+
"github.com/uber/cadence/service/sharddistributor/executorclient"
13+
)
14+
15+
func TestNewShardProcessorFactory(t *testing.T) {
16+
logger := zaptest.NewLogger(t)
17+
timeSource := clock.NewRealTimeSource()
18+
19+
params := Params{
20+
Logger: logger,
21+
TimeSource: timeSource,
22+
}
23+
24+
factory := NewShardProcessorFactory(params, processor.NewShardProcessor)
25+
26+
// Test that the factory implements the correct interface
27+
var _ executorclient.ShardProcessorFactory[*processor.ShardProcessor] = factory
28+
29+
// Test creating a processor
30+
processor, err := factory.NewShardProcessor("test-shard-1")
31+
require.NoError(t, err)
32+
assert.NotNil(t, processor)
33+
}
34+
35+
func TestShardProcessorFactory_NewShardProcessor(t *testing.T) {
36+
logger := zaptest.NewLogger(t)
37+
timeSource := clock.NewRealTimeSource()
38+
39+
factory := &ShardProcessorFactory[*processor.ShardProcessor]{
40+
logger: logger,
41+
timeSource: timeSource,
42+
constructor: processor.NewShardProcessor,
43+
}
44+
45+
// Test creating multiple processors
46+
processor1, err := factory.NewShardProcessor("shard-1")
47+
require.NoError(t, err)
48+
49+
processor2, err := factory.NewShardProcessor("shard-2")
50+
require.NoError(t, err)
51+
52+
// Ensure they are different instances
53+
assert.NotEqual(t, processor1, processor2)
54+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package canary
2+
3+
import (
4+
"go.uber.org/fx"
5+
6+
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
7+
"github.com/uber/cadence/service/sharddistributor/canary/factory"
8+
"github.com/uber/cadence/service/sharddistributor/canary/processor"
9+
"github.com/uber/cadence/service/sharddistributor/canary/processorephemeral"
10+
"github.com/uber/cadence/service/sharddistributor/canary/sharddistributorclient"
11+
"github.com/uber/cadence/service/sharddistributor/executorclient"
12+
)
13+
14+
func Module(fixedNamespace, ephemeralNamespace, sharddistributorServiceName string) fx.Option {
15+
return fx.Module("shard-distributor-canary", opts(fixedNamespace, ephemeralNamespace, sharddistributorServiceName))
16+
}
17+
18+
func opts(fixedNamespace, ephemeralNamespace, sharddistributorServiceName string) fx.Option {
19+
return fx.Options(
20+
fx.Provide(sharddistributorv1.NewFxShardDistributorExecutorAPIYARPCClient(sharddistributorServiceName)),
21+
fx.Provide(sharddistributorv1.NewFxShardDistributorAPIYARPCClient(sharddistributorServiceName)),
22+
23+
fx.Provide(sharddistributorclient.NewShardDistributorClient),
24+
25+
// Modules for the shard distributor canary
26+
fx.Provide(
27+
func(params factory.Params) executorclient.ShardProcessorFactory[*processor.ShardProcessor] {
28+
return factory.NewShardProcessorFactory(params, processor.NewShardProcessor)
29+
},
30+
func(params factory.Params) executorclient.ShardProcessorFactory[*processorephemeral.ShardProcessor] {
31+
return factory.NewShardProcessorFactory(params, processorephemeral.NewShardProcessor)
32+
},
33+
),
34+
35+
executorclient.ModuleWithNamespace[*processor.ShardProcessor](fixedNamespace),
36+
executorclient.ModuleWithNamespace[*processorephemeral.ShardProcessor](ephemeralNamespace),
37+
38+
processorephemeral.ShardCreatorModule(ephemeralNamespace),
39+
)
40+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package canary
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/golang/mock/gomock"
8+
"github.com/uber-go/tally"
9+
"go.uber.org/fx"
10+
"go.uber.org/fx/fxtest"
11+
"go.uber.org/yarpc"
12+
"go.uber.org/yarpc/api/transport/transporttest"
13+
"go.uber.org/yarpc/transport/grpc"
14+
"go.uber.org/yarpc/yarpctest"
15+
"go.uber.org/zap/zaptest"
16+
17+
"github.com/uber/cadence/common/clock"
18+
"github.com/uber/cadence/common/log"
19+
"github.com/uber/cadence/service/sharddistributor/executorclient"
20+
)
21+
22+
func TestModule(t *testing.T) {
23+
// Create mocks
24+
ctrl := gomock.NewController(t)
25+
mockClientConfig := transporttest.NewMockClientConfig(ctrl)
26+
outbound := grpc.NewTransport().NewOutbound(yarpctest.NewFakePeerList())
27+
28+
mockClientConfig.EXPECT().Caller().Return("test-executor").Times(2)
29+
mockClientConfig.EXPECT().Service().Return("shard-distributor").Times(2)
30+
mockClientConfig.EXPECT().GetUnaryOutbound().Return(outbound).Times(2)
31+
32+
mockClientConfigProvider := transporttest.NewMockClientConfigProvider(ctrl)
33+
mockClientConfigProvider.EXPECT().ClientConfig("cadence-shard-distributor").Return(mockClientConfig).Times(2)
34+
35+
config := executorclient.Config{
36+
Namespaces: []executorclient.NamespaceConfig{
37+
{Namespace: "test-fixed", HeartBeatInterval: 5 * time.Second},
38+
{Namespace: "test-ephemeral", HeartBeatInterval: 5 * time.Second},
39+
},
40+
}
41+
42+
// Create a test app with the library, check that it starts and stops
43+
fxtest.New(t,
44+
fx.Supply(
45+
fx.Annotate(tally.NoopScope, fx.As(new(tally.Scope))),
46+
fx.Annotate(clock.NewMockedTimeSource(), fx.As(new(clock.TimeSource))),
47+
fx.Annotate(log.NewNoop(), fx.As(new(log.Logger))),
48+
fx.Annotate(mockClientConfigProvider, fx.As(new(yarpc.ClientConfig))),
49+
zaptest.NewLogger(t),
50+
config,
51+
),
52+
53+
Module("test-fixed", "test-ephemeral", "cadence-shard-distributor"),
54+
).RequireStart().RequireStop()
55+
}

0 commit comments

Comments
 (0)