Skip to content

Commit 6ebcd76

Browse files
authored
Add Export Support in Tcld (#217)
* export support in tcld * address test failure * fix * address comments * Address linting error
1 parent 194741b commit 6ebcd76

7 files changed

Lines changed: 9491 additions & 1720 deletions

File tree

app/common.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
package app
22

3+
import (
4+
"fmt"
5+
"regexp"
6+
)
7+
8+
var (
9+
assumedRolePattern = regexp.MustCompile(`^arn:aws:iam::([0-9]{12}):role/(\S+)$`)
10+
)
11+
312
func IsFeatureEnabled(feature string) bool {
413
jsonData, err := getFeatureFlags()
514

@@ -15,3 +24,22 @@ func IsFeatureEnabled(feature string) bool {
1524

1625
return false
1726
}
27+
28+
func parseAssumedRole(assumedRole string) (string, string, error) {
29+
var accountID, roleName string
30+
re := assumedRolePattern
31+
submatch := re.FindStringSubmatch(assumedRole)
32+
33+
if len(submatch) != 3 {
34+
return "", "", fmt.Errorf("invalid assumed role: %s", assumedRole)
35+
}
36+
37+
accountID = submatch[1]
38+
roleName = submatch[2]
39+
40+
return accountID, roleName, nil
41+
}
42+
43+
func getAssumedRoleArn(awsAccountId string, awsRoleName string) string {
44+
return fmt.Sprintf("arn:aws:iam::%s:role/%s", awsAccountId, awsRoleName)
45+
}

app/namespace.go

Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ import (
77
"fmt"
88
"io/ioutil"
99
"net/mail"
10+
"strconv"
1011
"strings"
1112

1213
"github.com/temporalio/tcld/protogen/api/auth/v1"
14+
"github.com/temporalio/tcld/protogen/api/sink/v1"
1315
"go.uber.org/multierr"
1416

1517
"github.com/kylelemons/godebug/diff"
@@ -56,6 +58,47 @@ var (
5658
"us-east-1",
5759
"us-west-2",
5860
}
61+
sinkNameFlag = &cli.StringFlag{
62+
Name: "sink-name",
63+
Usage: "Provide a name for the export sink",
64+
Required: true,
65+
}
66+
sinkAssumedRoleFlagOptional = &cli.StringFlag{
67+
Name: "role-arn",
68+
Usage: "Provide role arn for the IAM Role",
69+
}
70+
sinkAssumedRoleFlagRequired = &cli.StringFlag{
71+
Name: "role-arn",
72+
Usage: "Provide role arn for the IAM Role",
73+
Required: true,
74+
}
75+
s3BucketFlagOptional = &cli.StringFlag{
76+
Name: "s3-bucket-name",
77+
Usage: "Provide the name of an AWS S3 bucket that Temporal will send closed workflow histories to",
78+
}
79+
s3BucketFlagRequired = &cli.StringFlag{
80+
Name: "s3-bucket-name",
81+
Usage: "Provide the name of an AWS S3 bucket that Temporal will send closed workflow histories to",
82+
Required: true,
83+
}
84+
sinkEnabledFlag = &cli.StringFlag{
85+
Name: "enabled",
86+
Usage: "Whether export is enabled",
87+
}
88+
kmsArnFlag = &cli.StringFlag{
89+
Name: "kms-arn",
90+
Usage: "Provide the ARN of the KMS key to use for encryption. Note: If the KMS ARN needs to be updated, user should modify the created IAM Role accordingly.",
91+
}
92+
93+
pageSizeFlag = &cli.IntFlag{
94+
Name: "page-size",
95+
Usage: "The page size for list operations",
96+
Value: 100,
97+
}
98+
pageTokenFlag = &cli.StringFlag{
99+
Name: "page-token",
100+
Usage: "The page token for list operations",
101+
}
59102
)
60103

61104
type NamespaceClient struct {
@@ -82,6 +125,78 @@ func GetNamespaceClient(ctx *cli.Context) (*NamespaceClient, error) {
82125
return NewNamespaceClient(ct, conn), nil
83126
}
84127

128+
func (c *NamespaceClient) getExportSink(ctx *cli.Context, namespaceName, sinkName string) (*sink.ExportSink, error) {
129+
getRequest := &namespaceservice.GetExportSinkRequest{
130+
Namespace: namespaceName,
131+
SinkName: sinkName,
132+
}
133+
134+
getResp, err := c.client.GetExportSink(c.ctx, getRequest)
135+
if err != nil {
136+
return nil, fmt.Errorf("failed to get export sink: %w", err)
137+
}
138+
return getResp.Sink, nil
139+
}
140+
141+
func (c *NamespaceClient) selectExportSinkResourceVersion(ctx *cli.Context, sink *sink.ExportSink) string {
142+
if ctx.String(ResourceVersionFlagName) != "" {
143+
return ctx.String(ResourceVersionFlagName)
144+
}
145+
return sink.ResourceVersion
146+
}
147+
148+
func (c *NamespaceClient) isS3BucketChange(ctx *cli.Context, sink *sink.ExportSink) bool {
149+
if !ctx.IsSet(s3BucketFlagRequired.Name) {
150+
return false
151+
}
152+
153+
return sink.GetSpec().GetS3Sink().GetBucketName() != ctx.String(s3BucketFlagRequired.Name)
154+
}
155+
156+
func (c *NamespaceClient) isAssumedRoleChange(ctx *cli.Context, sink *sink.ExportSink) bool {
157+
if !ctx.IsSet(sinkAssumedRoleFlagRequired.Name) {
158+
return false
159+
}
160+
161+
roleArn := getAssumedRoleArn(sink.GetSpec().GetS3Sink().GetAwsAccountId(), sink.GetSpec().GetS3Sink().GetRoleName())
162+
return roleArn != ctx.String(sinkAssumedRoleFlagRequired.Name)
163+
164+
}
165+
166+
func (c *NamespaceClient) isKmsArnChange(ctx *cli.Context, sink *sink.ExportSink) bool {
167+
if !ctx.IsSet(kmsArnFlag.Name) {
168+
return false
169+
}
170+
171+
return sink.GetSpec().GetS3Sink().GetKmsArn() != ctx.String(kmsArnFlag.Name)
172+
}
173+
174+
func (c *NamespaceClient) isSinkEnabledChange(ctx *cli.Context, sink *sink.ExportSink) (bool, error) {
175+
if !ctx.IsSet(sinkEnabledFlag.Name) {
176+
return false, nil
177+
}
178+
179+
enabledValue, err := strconv.ParseBool(ctx.String(sinkEnabledFlag.Name))
180+
if err != nil {
181+
return false, fmt.Errorf("invalid value for enabled flag: %w. Only allowed true or false", err)
182+
}
183+
184+
if sink.GetSpec().GetEnabled() == enabledValue {
185+
return false, nil
186+
}
187+
return true, nil
188+
}
189+
190+
func (c *NamespaceClient) getExportSinkResourceVersion(ctx *cli.Context, namespaceName, sinkName string) (string, error) {
191+
sink, err := c.getExportSink(ctx, namespaceName, sinkName)
192+
if err != nil {
193+
return "", err
194+
}
195+
196+
resourceVersion := c.selectExportSinkResourceVersion(ctx, sink)
197+
198+
return resourceVersion, nil
199+
}
85200
func (c *NamespaceClient) deleteNamespace(ctx *cli.Context, n *namespace.Namespace) error {
86201
resourceVersion := n.ResourceVersion
87202
if v := ctx.String(ResourceVersionFlagName); v != "" {
@@ -1035,6 +1150,199 @@ func NewNamespaceCommand(getNamespaceClientFn GetNamespaceClientFn) (CommandOut,
10351150
Name: "export",
10361151
Usage: "Manage export sinks",
10371152
Aliases: []string{"es"},
1153+
Subcommands: []*cli.Command{
1154+
{
1155+
Name: "create",
1156+
Aliases: []string{"c"},
1157+
Usage: "Create export sink",
1158+
Flags: []cli.Flag{
1159+
NamespaceFlag,
1160+
sinkNameFlag,
1161+
sinkAssumedRoleFlagRequired,
1162+
s3BucketFlagRequired,
1163+
RequestIDFlag,
1164+
},
1165+
Action: func(ctx *cli.Context) error {
1166+
awsAccountID, roleName, err := parseAssumedRole(ctx.String(sinkAssumedRoleFlagRequired.Name))
1167+
if err != nil {
1168+
return err
1169+
}
1170+
1171+
namespace := ctx.String(NamespaceFlagName)
1172+
ns, err := c.getNamespace(namespace)
1173+
if err != nil {
1174+
return fmt.Errorf("unable to get namespace: %v", err)
1175+
}
1176+
1177+
request := &namespaceservice.CreateExportSinkRequest{
1178+
Namespace: namespace,
1179+
Spec: &sink.ExportSinkSpec{
1180+
Name: ctx.String(sinkNameFlag.Name),
1181+
Enabled: true,
1182+
DestinationType: sink.EXPORT_DESTINATION_TYPE_S3,
1183+
S3Sink: &sink.S3Spec{
1184+
RoleName: roleName,
1185+
BucketName: ctx.String(s3BucketFlagRequired.Name),
1186+
Region: ns.Spec.Region,
1187+
KmsArn: ctx.String(kmsArnFlag.Name),
1188+
AwsAccountId: awsAccountID,
1189+
},
1190+
},
1191+
RequestId: ctx.String(RequestIDFlagName),
1192+
}
1193+
1194+
res, err := c.client.CreateExportSink(c.ctx, request)
1195+
if err != nil {
1196+
return err
1197+
}
1198+
1199+
return PrintProto(res.RequestStatus)
1200+
},
1201+
},
1202+
{
1203+
Name: "get",
1204+
Aliases: []string{"g"},
1205+
Usage: "Get export sink",
1206+
Flags: []cli.Flag{
1207+
NamespaceFlag,
1208+
sinkNameFlag,
1209+
},
1210+
Action: func(ctx *cli.Context) error {
1211+
sink, err := c.getExportSink(ctx, ctx.String(NamespaceFlagName), ctx.String(sinkNameFlag.Name))
1212+
1213+
if err != nil {
1214+
return err
1215+
}
1216+
1217+
return PrintProto(sink)
1218+
},
1219+
},
1220+
{
1221+
Name: "delete",
1222+
Aliases: []string{"d"},
1223+
Usage: "Delete export sink",
1224+
Flags: []cli.Flag{
1225+
NamespaceFlag,
1226+
sinkNameFlag,
1227+
ResourceVersionFlag,
1228+
RequestIDFlag,
1229+
},
1230+
Action: func(ctx *cli.Context) error {
1231+
namespaceName := ctx.String(NamespaceFlagName)
1232+
sinkName := ctx.String(sinkNameFlag.Name)
1233+
resourceVersion, err := c.getExportSinkResourceVersion(ctx, namespaceName, sinkName)
1234+
if err != nil {
1235+
return err
1236+
}
1237+
1238+
deleteRequest := &namespaceservice.DeleteExportSinkRequest{
1239+
Namespace: namespaceName,
1240+
SinkName: sinkName,
1241+
ResourceVersion: resourceVersion,
1242+
RequestId: ctx.String(RequestIDFlagName),
1243+
}
1244+
1245+
deleteResp, err := c.client.DeleteExportSink(c.ctx, deleteRequest)
1246+
if err != nil {
1247+
return err
1248+
}
1249+
1250+
return PrintProto(deleteResp.RequestStatus)
1251+
},
1252+
},
1253+
{
1254+
Name: "list",
1255+
Aliases: []string{"l"},
1256+
Usage: "List export sinks",
1257+
Flags: []cli.Flag{
1258+
NamespaceFlag,
1259+
pageSizeFlag,
1260+
pageTokenFlag,
1261+
},
1262+
Action: func(ctx *cli.Context) error {
1263+
request := &namespaceservice.ListExportSinksRequest{
1264+
Namespace: ctx.String(NamespaceFlagName),
1265+
PageSize: int32(pageSizeFlag.Value),
1266+
PageToken: ctx.String(pageTokenFlag.Name),
1267+
}
1268+
1269+
resp, err := c.client.ListExportSinks(c.ctx, request)
1270+
if err != nil {
1271+
return err
1272+
}
1273+
1274+
return PrintProto(resp)
1275+
},
1276+
},
1277+
{
1278+
Name: "update",
1279+
Aliases: []string{"u"},
1280+
Usage: "Update export sink",
1281+
Flags: []cli.Flag{
1282+
NamespaceFlag,
1283+
sinkEnabledFlag,
1284+
sinkAssumedRoleFlagOptional,
1285+
s3BucketFlagOptional,
1286+
ResourceVersionFlag,
1287+
kmsArnFlag,
1288+
RequestIDFlag,
1289+
},
1290+
Action: func(ctx *cli.Context) error {
1291+
namespaceName := ctx.String(NamespaceFlagName)
1292+
sinkName := ctx.String(sinkNameFlag.Name)
1293+
sink, err := c.getExportSink(ctx, namespaceName, sinkName)
1294+
if err != nil {
1295+
return err
1296+
}
1297+
resourceVersion := c.selectExportSinkResourceVersion(ctx, sink)
1298+
1299+
isEnabledChange, err := c.isSinkEnabledChange(ctx, sink)
1300+
if err != nil {
1301+
return err
1302+
}
1303+
1304+
if !isEnabledChange && !c.isAssumedRoleChange(ctx, sink) && !c.isKmsArnChange(ctx, sink) && !c.isS3BucketChange(ctx, sink) {
1305+
fmt.Println("nothing to update")
1306+
return nil
1307+
}
1308+
1309+
if isEnabledChange {
1310+
sink.Spec.Enabled = !sink.Spec.Enabled
1311+
}
1312+
1313+
if c.isAssumedRoleChange(ctx, sink) {
1314+
awsAccountID, roleName, err := parseAssumedRole(ctx.String(sinkAssumedRoleFlagOptional.Name))
1315+
if err != nil {
1316+
return err
1317+
}
1318+
sink.Spec.S3Sink.RoleName = roleName
1319+
sink.Spec.S3Sink.AwsAccountId = awsAccountID
1320+
}
1321+
1322+
if c.isKmsArnChange(ctx, sink) {
1323+
sink.Spec.S3Sink.KmsArn = ctx.String(kmsArnFlag.Name)
1324+
}
1325+
1326+
if c.isS3BucketChange(ctx, sink) {
1327+
sink.Spec.S3Sink.BucketName = ctx.String(s3BucketFlagOptional.Name)
1328+
}
1329+
1330+
request := &namespaceservice.UpdateExportSinkRequest{
1331+
Namespace: ctx.String(NamespaceFlagName),
1332+
Spec: sink.Spec,
1333+
ResourceVersion: resourceVersion,
1334+
RequestId: ctx.String(RequestIDFlagName),
1335+
}
1336+
1337+
resp, err := c.client.UpdateExportSink(c.ctx, request)
1338+
if err != nil {
1339+
return err
1340+
}
1341+
1342+
return PrintProto(resp.RequestStatus)
1343+
},
1344+
},
1345+
},
10381346
})
10391347
}
10401348

0 commit comments

Comments
 (0)