Skip to content

Commit 444c7ec

Browse files
APIE-744 - Flink Materialized Tables (#3254)
Co-authored-by: Cynthia Qin <cqin@confluent.io>
1 parent 26aad2c commit 444c7ec

34 files changed

Lines changed: 1710 additions & 7 deletions

cmd/lint/main.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ var commandRules = []linter.CommandRule{
4343
linter.ExcludeCommand("connect custom-plugin version create"),
4444
linter.ExcludeCommand("connect custom-plugin version update"),
4545
linter.ExcludeCommand("pipeline update"),
46-
linter.ExcludeCommand("flink statement update")),
46+
linter.ExcludeCommand("flink statement update"),
47+
linter.ExcludeCommand("flink materialized-table update")),
4748

4849
// Soft Requirements
4950
linter.Filter(linter.RequireLengthBetween("Short", 10, 60),
@@ -126,6 +127,7 @@ var flagRules = []linter.FlagRule{
126127
"source-bootstrap-server",
127128
"update-schema-registry",
128129
"worker-configurations",
130+
"distribution-bucket-count",
129131
),
130132
),
131133
linter.FlagFilter(
@@ -139,6 +141,7 @@ var flagRules = []linter.FlagRule{
139141
"schema-registry-api-key",
140142
"schema-registry-api-secret",
141143
"skip-message-on-error",
144+
"distributed-by-column-names",
142145
),
143146
),
144147
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ require (
3030
github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9
3131
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.11.0
3232
github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0
33-
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.18.0
33+
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.19.0
3434
github.com/confluentinc/ccloud-sdk-go-v2/iam v0.15.0
3535
github.com/confluentinc/ccloud-sdk-go-v2/iam-ip-filtering v0.5.0
3636
github.com/confluentinc/ccloud-sdk-go-v2/identity-provider v0.3.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,8 +218,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/flink v0.11.0 h1:gRRtad0RRit38+54vKg6Dt
218218
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.11.0/go.mod h1:JHg9yHyCBLL0Zm24skG4pGaSR49IfxPJaQg/HXzMJpw=
219219
github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0 h1:DVWL3Y4b5azgCADubtyp3EhGZuyJkleINaTy2V3iius=
220220
github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0/go.mod h1:P4fdIkI1ynjSvhDEGX283KhtzG51eGHQc5Cqtp7bu1Q=
221-
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.18.0 h1:KzlhRDrUsXbs4ZPZy6T9OWmFIVkZWHxNsDorHHSnwFs=
222-
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.18.0/go.mod h1:CuvhIQpYj/LbQeYzp7Sw2LJkTKzLh8xlFdQoKq9ZQlY=
221+
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.19.0 h1:fS7l0rJQ1PT5J8Qg4EBZvD+hIq5zaF+Io9LtwNpQpSg=
222+
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.19.0/go.mod h1:ZkJWrKzzyILrvExh2F6ZeR6tE/meO1HDLs3fEfRYhUc=
223223
github.com/confluentinc/ccloud-sdk-go-v2/iam v0.15.0 h1:37Gjdo+0Ev3g2NPEXyiVm7yTT85AlWbjXYRLvq6Aj9E=
224224
github.com/confluentinc/ccloud-sdk-go-v2/iam v0.15.0/go.mod h1:jnWqax4kM22sutPGMtGmHqe2usgfqYig4UtmHsLENz0=
225225
github.com/confluentinc/ccloud-sdk-go-v2/iam-ip-filtering v0.5.0 h1:xD7CXcyAqezFnSVFB4U27oWUY4FlbyciVP0ftDIiI18=

internal/flink/command.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command {
5555
cmd.AddCommand(c.newConnectionCommand())
5656
cmd.AddCommand(c.newConnectivityTypeCommand())
5757
cmd.AddCommand(c.newEndpointCommand())
58+
cmd.AddCommand(c.newMaterializedTableCommand())
5859
cmd.AddCommand(c.newRegionCommand())
5960

6061
return cmd
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package flink
2+
3+
import (
4+
"github.com/spf13/cobra"
5+
6+
pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
7+
)
8+
9+
type materializedTableOut struct {
10+
Name string `human:"Name" serialized:"name"`
11+
ClusterID string `human:"Database" serialized:"database"`
12+
Environment string `human:"Environment" serialized:"environment"`
13+
ComputePool string `human:"Compute Pool" serialized:"compute_pool"`
14+
ServiceAccount string `human:"Principal" serialized:"principal"`
15+
Stopped bool `human:"Stopped" serialized:"stopped"`
16+
Query string `human:"Query,omitempty" serialized:"query,omitempty"`
17+
Columns []string `human:"Columns,omitempty" serialized:"columns,omitempty"`
18+
WaterMarkColumnName string `human:"Watermark Column,omitempty" serialized:"watermark_column,omitempty"`
19+
WaterMarkExpression string `human:"Watermark Expression,omitempty" serialized:"watermark_expression,omitempty"`
20+
Constraints []string `human:"Constraints,omitempty" serialized:"constraints,omitempty"`
21+
DistributionKeys []string `human:"Distribution Keys,omitempty" serialized:"distribution_keys,omitempty"`
22+
DistributionBucketCount int `human:"Distribution Bucket Count,omitempty" serialized:"distribution_bucket_count,omitempty"`
23+
}
24+
25+
func (c *command) newMaterializedTableCommand() *cobra.Command {
26+
cmd := &cobra.Command{
27+
Use: "materialized-table",
28+
Short: "Manage Flink materialized tables.",
29+
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin},
30+
}
31+
32+
cmd.AddCommand(c.newMaterializedTableCreateCommand())
33+
cmd.AddCommand(c.newMaterializedTableDeleteCommand())
34+
cmd.AddCommand(c.newMaterializedTableDescribeCommand())
35+
cmd.AddCommand(c.newMaterializedTableListCommand())
36+
cmd.AddCommand(c.newMaterializedTableUpdateCommand())
37+
38+
return cmd
39+
}
40+
41+
func (c *command) validMaterializedTableArgs(cmd *cobra.Command, args []string) []string {
42+
if len(args) > 0 {
43+
return nil
44+
}
45+
46+
return c.validMaterializedTablesArgsMultiple(cmd, args)
47+
}
48+
49+
func (c *command) validMaterializedTablesArgsMultiple(cmd *cobra.Command, args []string) []string {
50+
if err := c.PersistentPreRunE(cmd, args); err != nil {
51+
return nil
52+
}
53+
54+
environmentId, err := c.Context.EnvironmentId()
55+
if err != nil {
56+
return nil
57+
}
58+
59+
client, err := c.GetFlinkGatewayClient(false)
60+
if err != nil {
61+
return nil
62+
}
63+
64+
tables, err := client.ListMaterializedTables(environmentId, c.Context.GetCurrentOrganization())
65+
if err != nil {
66+
return nil
67+
}
68+
69+
suggestions := make([]string, len(tables))
70+
for i, table := range tables {
71+
suggestions[i] = table.GetName()
72+
}
73+
return suggestions
74+
}
75+
76+
func (c *command) addOptionalMaterializedTableFlags(cmd *cobra.Command) {
77+
cmd.Flags().String("columns-physical", "", "Path to the columns data for type physical.")
78+
cmd.Flags().String("columns-metadata", "", "Path to the columns data for type metadata.")
79+
cmd.Flags().String("columns-computed", "", "Path to the columns data for type computed.")
80+
cmd.Flags().String("watermark-column", "", "The name of the watermark columns.")
81+
cmd.Flags().String("watermark-expression", "", "The watermark expression.")
82+
cmd.Flags().String("constraints", "", "Path to the constraints.")
83+
cmd.Flags().String("distribution-keys", "", "The names of the columns the table is distributed by.")
84+
cmd.Flags().Int("distribution-bucket-count", 0, "The number of buckets.")
85+
}

0 commit comments

Comments
 (0)