Skip to content

Commit 963aca0

Browse files
committed
[PYF-260] Add flink statement result list command and display results on create --wait
1 parent 615d0c0 commit 963aca0

File tree

13 files changed

+381
-8
lines changed

13 files changed

+381
-8
lines changed

internal/flink/command_statement.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ func (c *command) newStatementCommand(cfg *config.Config) *cobra.Command {
5858
cmd.AddCommand(c.newStatementWebUiForwardCommand())
5959
}
6060
cmd.AddCommand(c.newStatementExceptionCommand(cfg))
61+
cmd.AddCommand(c.newStatementResultCommand(cfg))
6162

6263
return cmd
6364
}

internal/flink/command_statement_create.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,17 @@ func (c *command) statementCreate(cmd *cobra.Command, args []string) error {
155155
if err != nil {
156156
return err
157157
}
158+
159+
// If the statement produces results, fetch and display them
160+
traits := statement.Status.GetTraits()
161+
schema := traits.GetSchema()
162+
if columns := schema.GetColumns(); len(columns) > 0 {
163+
statementResults, err := fetchAllResults(client, environmentId, name, c.Context.LastOrgId, schema, 0)
164+
if err != nil {
165+
return err
166+
}
167+
return printStatementResults(cmd, statementResults)
168+
}
158169
}
159170

160171
table := output.NewTable(cmd)
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
package flink
2+
3+
import (
4+
"fmt"
5+
"net/url"
6+
7+
"github.com/olekukonko/tablewriter"
8+
"github.com/spf13/cobra"
9+
10+
flinkgatewayv1 "github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway/v1"
11+
12+
"github.com/confluentinc/cli/v4/pkg/ccloudv2"
13+
pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
14+
"github.com/confluentinc/cli/v4/pkg/config"
15+
"github.com/confluentinc/cli/v4/pkg/output"
16+
)
17+
18+
func (c *command) newStatementResultCommand(cfg *config.Config) *cobra.Command {
19+
cmd := &cobra.Command{
20+
Use: "result",
21+
Short: "Manage Flink SQL statement results.",
22+
}
23+
24+
if cfg.IsCloudLogin() {
25+
pcmd.AddCloudFlag(cmd)
26+
pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand)
27+
cmd.AddCommand(c.newStatementResultListCommand())
28+
}
29+
30+
return cmd
31+
}
32+
33+
// serializedResultOutput is the payload shape used for --output json/yaml:
// the ordered column names plus one column-name-to-value map per result row.
type serializedResultOutput struct {
	Columns []string         `json:"columns" yaml:"columns"`
	Rows    []map[string]any `json:"rows" yaml:"rows"`
}

// statementResultData is the format-agnostic intermediate representation of
// fetched statement results: ordered column headers and stringified row cells,
// produced by fetchAllResults and consumed by printStatementResults.
type statementResultData struct {
	Headers []string
	Rows    [][]string
}
42+
43+
func printStatementResults(cmd *cobra.Command, data *statementResultData) error {
44+
if data == nil || len(data.Rows) == 0 {
45+
if output.GetFormat(cmd).IsSerialized() {
46+
headers := []string{}
47+
if data != nil {
48+
headers = data.Headers
49+
}
50+
return output.SerializedOutput(cmd, &serializedResultOutput{
51+
Columns: headers,
52+
Rows: []map[string]any{},
53+
})
54+
}
55+
fmt.Fprintln(cmd.OutOrStdout(), "No results found.")
56+
return nil
57+
}
58+
59+
if output.GetFormat(cmd).IsSerialized() {
60+
rows := make([]map[string]any, len(data.Rows))
61+
for i, row := range data.Rows {
62+
rowMap := make(map[string]any)
63+
for j, val := range row {
64+
if j < len(data.Headers) {
65+
rowMap[data.Headers[j]] = val
66+
}
67+
}
68+
rows[i] = rowMap
69+
}
70+
return output.SerializedOutput(cmd, &serializedResultOutput{
71+
Columns: data.Headers,
72+
Rows: rows,
73+
})
74+
}
75+
76+
table := tablewriter.NewWriter(cmd.OutOrStdout())
77+
table.SetAutoFormatHeaders(false)
78+
table.SetHeader(data.Headers)
79+
table.SetAutoWrapText(false)
80+
table.SetBorder(false)
81+
82+
for _, row := range data.Rows {
83+
table.Append(row)
84+
}
85+
86+
table.Render()
87+
return nil
88+
}
89+
90+
func fetchAllResults(client ccloudv2.GatewayClientInterface, envId, name, orgId string, schema flinkgatewayv1.SqlV1ResultSchema, maxRows int) (*statementResultData, error) {
91+
columns := schema.GetColumns()
92+
headers := make([]string, len(columns))
93+
for i, col := range columns {
94+
headers[i] = col.GetName()
95+
}
96+
97+
var allRows [][]string
98+
pageToken := ""
99+
100+
for {
101+
resp, err := client.GetStatementResults(envId, name, orgId, pageToken)
102+
if err != nil {
103+
return nil, err
104+
}
105+
106+
rawData := resp.Results.GetData()
107+
for _, item := range rawData {
108+
resultItem, ok := item.(map[string]any)
109+
if !ok {
110+
continue
111+
}
112+
rowFields, _ := resultItem["row"].([]any)
113+
row := make([]string, len(headers))
114+
for j, field := range rowFields {
115+
if j < len(headers) {
116+
row[j] = fmt.Sprintf("%v", field)
117+
}
118+
}
119+
allRows = append(allRows, row)
120+
}
121+
122+
if maxRows > 0 && len(allRows) >= maxRows {
123+
allRows = allRows[:maxRows]
124+
break
125+
}
126+
127+
nextUrl := resp.Metadata.GetNext()
128+
nextToken, err := extractResultPageToken(nextUrl)
129+
if err != nil {
130+
return nil, err
131+
}
132+
if nextToken == "" {
133+
break
134+
}
135+
pageToken = nextToken
136+
}
137+
138+
return &statementResultData{
139+
Headers: headers,
140+
Rows: allRows,
141+
}, nil
142+
}
143+
144+
// extractResultPageToken pulls the "page_token" query parameter out of the
// pagination "next" URL returned by the gateway. An empty input means there
// are no further pages and yields an empty token with no error; a URL or
// query string that fails to parse is returned as an error.
func extractResultPageToken(nextUrl string) (string, error) {
	if len(nextUrl) == 0 {
		return "", nil
	}

	parsedUrl, parseErr := url.Parse(nextUrl)
	if parseErr != nil {
		return "", parseErr
	}

	queryParams, queryErr := url.ParseQuery(parsedUrl.RawQuery)
	if queryErr != nil {
		return "", queryErr
	}

	return queryParams.Get("page_token"), nil
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package flink
2+
3+
import (
4+
"fmt"
5+
"time"
6+
7+
"github.com/spf13/cobra"
8+
9+
pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
10+
"github.com/confluentinc/cli/v4/pkg/retry"
11+
)
12+
13+
// newStatementResultListCommand builds "flink statement result list", which
// fetches and prints the result rows of a Flink SQL statement. The --wait
// flag blocks while the statement is still pending; --max-rows caps how many
// rows are fetched (default 100).
func (c *command) newStatementResultListCommand() *cobra.Command {
	cmd := &cobra.Command{
		Use:               "list <statement-name>",
		Short:             "List results for a Flink SQL statement.",
		Args:              cobra.ExactArgs(1),
		ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStatementArgs),
		RunE:              c.statementResultList,
	}

	// Flag registration order is reflected in the help output (see the
	// result-list help golden file), so keep this order stable.
	pcmd.AddCloudFlag(cmd)
	pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand)
	pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
	pcmd.AddContextFlag(cmd, c.CLICommand)
	pcmd.AddOutputFlag(cmd)
	cmd.Flags().Bool("wait", false, "Block until the statement is no longer pending before fetching results.")
	cmd.Flags().Int("max-rows", 100, "Maximum number of result rows to fetch.")

	return cmd
}
32+
33+
func (c *command) statementResultList(cmd *cobra.Command, args []string) error {
34+
environmentId, err := c.Context.EnvironmentId()
35+
if err != nil {
36+
return err
37+
}
38+
39+
client, err := c.GetFlinkGatewayClient(false)
40+
if err != nil {
41+
return err
42+
}
43+
44+
statement, err := client.GetStatement(environmentId, args[0], c.Context.GetCurrentOrganization())
45+
if err != nil {
46+
return err
47+
}
48+
49+
phase := statement.Status.GetPhase()
50+
51+
if phase == "FAILED" {
52+
return fmt.Errorf("statement %q has failed: %s", args[0], statement.Status.GetDetail())
53+
}
54+
55+
if phase == "PENDING" {
56+
wait, err := cmd.Flags().GetBool("wait")
57+
if err != nil {
58+
return err
59+
}
60+
if !wait {
61+
return fmt.Errorf("statement %q is still pending, use --wait to wait for it to complete", args[0])
62+
}
63+
64+
err = retry.Retry(time.Second, 2*time.Minute, func() error {
65+
statement, err = client.GetStatement(environmentId, args[0], c.Context.GetCurrentOrganization())
66+
if err != nil {
67+
return err
68+
}
69+
if statement.Status.GetPhase() == "PENDING" {
70+
return fmt.Errorf(`statement phase is "%s"`, statement.Status.GetPhase())
71+
}
72+
return nil
73+
})
74+
if err != nil {
75+
return err
76+
}
77+
78+
if statement.Status.GetPhase() == "FAILED" {
79+
return fmt.Errorf("statement %q has failed: %s", args[0], statement.Status.GetDetail())
80+
}
81+
}
82+
83+
traits := statement.Status.GetTraits()
84+
schema := traits.GetSchema()
85+
columns := schema.GetColumns()
86+
if len(columns) == 0 {
87+
fmt.Fprintln(cmd.OutOrStdout(), "Statement has no results to display.")
88+
return nil
89+
}
90+
91+
maxRows, err := cmd.Flags().GetInt("max-rows")
92+
if err != nil {
93+
return err
94+
}
95+
96+
statementResults, err := fetchAllResults(client, environmentId, args[0], c.Context.GetCurrentOrganization(), schema, maxRows)
97+
if err != nil {
98+
return err
99+
}
100+
101+
if err := printStatementResults(cmd, statementResults); err != nil {
102+
return err
103+
}
104+
105+
if maxRows > 0 && len(statementResults.Rows) >= maxRows {
106+
fmt.Fprintf(cmd.ErrOrStderr(), "Warning: results truncated at %d rows. Use --max-rows to adjust the limit.\n", maxRows)
107+
}
108+
109+
return nil
110+
}
Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
+---------------+-------------------------------+
2-
| Creation Date | 2022-01-01 00:00:00 +0000 UTC |
3-
| Name | my-statement |
4-
| Statement | CREATE TABLE test; |
5-
| Compute Pool | lfcp-123456 |
6-
| Status | COMPLETED |
7-
| Status Detail | SQL statement is completed |
8-
+---------------+-------------------------------+
1+
database_name
2+
-----------------
3+
my-cluster
4+
other-cluster

test/fixtures/output/flink/statement/help.golden

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ Available Commands:
99
describe Describe a Flink SQL statement.
1010
exception Manage Flink SQL statement exceptions.
1111
list List Flink SQL statements.
12+
result Manage Flink SQL statement results.
1213
resume Resume a Flink SQL statement.
1314
stop Stop a Flink SQL statement.
1415
update Update a Flink SQL statement.
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
Manage Flink SQL statement results.
2+
3+
Usage:
4+
confluent flink statement result [command]
5+
6+
Available Commands:
7+
list List results for a Flink SQL statement.
8+
9+
Flags:
10+
--cloud string Specify the cloud provider as "aws", "azure", or "gcp".
11+
--region string Cloud region for Flink (use "confluent flink region list" to see all).
12+
13+
Global Flags:
14+
-h, --help Show help for this command.
15+
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
16+
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).
17+
18+
Use "confluent flink statement result [command] --help" for more information about a command.
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
List results for a Flink SQL statement.
2+
3+
Usage:
4+
confluent flink statement result list <statement-name> [flags]
5+
6+
Flags:
7+
--cloud string Specify the cloud provider as "aws", "azure", or "gcp".
8+
--region string Cloud region for Flink (use "confluent flink region list" to see all).
9+
--environment string Environment ID.
10+
--context string CLI context name.
11+
-o, --output string Specify the output format as "human", "json", or "yaml". (default "human")
12+
--wait Block until the statement is no longer pending before fetching results.
13+
--max-rows int Maximum number of result rows to fetch. (default 100)
14+
15+
Global Flags:
16+
-h, --help Show help for this command.
17+
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
18+
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
No Flink endpoint is specified, defaulting to public endpoint: `http://127.0.0.1:1026`
2+
{
3+
"columns": ["database_name"],
4+
"rows": [
5+
{
6+
"database_name": "my-cluster"
7+
},
8+
{
9+
"database_name": "other-cluster"
10+
}
11+
]
12+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
No Flink endpoint is specified, defaulting to public endpoint: `http://127.0.0.1:1026`
2+
columns:
3+
- database_name
4+
rows:
5+
- database_name: my-cluster
6+
- database_name: other-cluster

0 commit comments

Comments
 (0)