-
Notifications
You must be signed in to change notification settings - Fork 22
[PYF-260] Add flink statement result list command and display results on create --wait #3261
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,167 @@ | ||
| package flink | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "net/url" | ||
|
|
||
| "github.com/olekukonko/tablewriter" | ||
| "github.com/spf13/cobra" | ||
|
|
||
| flinkgatewayv1 "github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway/v1" | ||
|
|
||
| "github.com/confluentinc/cli/v4/pkg/ccloudv2" | ||
| pcmd "github.com/confluentinc/cli/v4/pkg/cmd" | ||
| "github.com/confluentinc/cli/v4/pkg/config" | ||
| "github.com/confluentinc/cli/v4/pkg/output" | ||
| ) | ||
|
|
||
// defaultMaxResultRows caps how many result rows are fetched when the user
// does not override the limit via --max-rows.
const defaultMaxResultRows = 100
|
|
||
| func (c *command) newStatementResultCommand(cfg *config.Config) *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "result", | ||
| Short: "Manage Flink SQL statement results.", | ||
| } | ||
|
|
||
| if cfg.IsCloudLogin() { | ||
| pcmd.AddCloudFlag(cmd) | ||
| pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand) | ||
| cmd.AddCommand(c.newStatementResultListCommand()) | ||
| } | ||
|
|
||
| return cmd | ||
| } | ||
|
|
||
// serializedResultOutput is the json/yaml payload shape for statement results:
// the ordered column names plus one map per row, keyed by column name.
type serializedResultOutput struct {
	Columns []string         `json:"columns" yaml:"columns"`
	Rows    []map[string]any `json:"rows" yaml:"rows"`
}
|
|
||
// statementResultData holds fetched statement results in tabular form:
// column headers and rows of pre-rendered display strings, in result order.
type statementResultData struct {
	Headers []string
	Rows    [][]string
}
|
|
||
| func printStatementResults(cmd *cobra.Command, data *statementResultData) error { | ||
| if data == nil || len(data.Rows) == 0 { | ||
| if output.GetFormat(cmd).IsSerialized() { | ||
| headers := []string{} | ||
| if data != nil { | ||
| headers = data.Headers | ||
| } | ||
| return output.SerializedOutput(cmd, &serializedResultOutput{ | ||
| Columns: headers, | ||
| Rows: []map[string]any{}, | ||
| }) | ||
| } | ||
| fmt.Fprintln(cmd.OutOrStdout(), "No results found.") | ||
| return nil | ||
| } | ||
|
|
||
| if output.GetFormat(cmd).IsSerialized() { | ||
| rows := make([]map[string]any, len(data.Rows)) | ||
| for i, row := range data.Rows { | ||
| rowMap := make(map[string]any) | ||
| for j, val := range row { | ||
| if j < len(data.Headers) { | ||
| rowMap[data.Headers[j]] = val | ||
| } | ||
| } | ||
| rows[i] = rowMap | ||
| } | ||
| return output.SerializedOutput(cmd, &serializedResultOutput{ | ||
| Columns: data.Headers, | ||
| Rows: rows, | ||
| }) | ||
| } | ||
|
|
||
| table := tablewriter.NewWriter(cmd.OutOrStdout()) | ||
| table.SetAutoFormatHeaders(false) | ||
| table.SetHeader(data.Headers) | ||
| table.SetAutoWrapText(false) | ||
| table.SetBorder(false) | ||
|
|
||
| for _, row := range data.Rows { | ||
| table.Append(row) | ||
| } | ||
|
|
||
| table.Render() | ||
| return nil | ||
| } | ||
|
|
||
| func fetchAllResults(client ccloudv2.GatewayClientInterface, envId, name, orgId string, schema flinkgatewayv1.SqlV1ResultSchema, maxRows int) (*statementResultData, error) { | ||
| columns := schema.GetColumns() | ||
| headers := make([]string, len(columns)) | ||
| for i, col := range columns { | ||
| headers[i] = col.GetName() | ||
| } | ||
|
|
||
| var allRows [][]string | ||
| pageToken := "" | ||
|
|
||
| for { | ||
| resp, err := client.GetStatementResults(envId, name, orgId, pageToken) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| resultSet := resp.GetResults() | ||
| rawData := resultSet.GetData() | ||
| for _, item := range rawData { | ||
| resultItem, ok := item.(map[string]any) | ||
| if !ok { | ||
| continue | ||
| } | ||
| rowFields, _ := resultItem["row"].([]any) | ||
| row := make([]string, len(headers)) | ||
| for j, field := range rowFields { | ||
| if j < len(headers) { | ||
| row[j] = fieldToString(field) | ||
| } | ||
| } | ||
| allRows = append(allRows, row) | ||
| } | ||
|
|
||
| if maxRows > 0 && len(allRows) >= maxRows { | ||
| allRows = allRows[:maxRows] | ||
| break | ||
| } | ||
|
|
||
| nextUrl := resp.Metadata.GetNext() | ||
| nextToken, err := extractResultPageToken(nextUrl) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| if nextToken == "" { | ||
| break | ||
| } | ||
| pageToken = nextToken | ||
| } | ||
|
|
||
| return &statementResultData{ | ||
| Headers: headers, | ||
| Rows: allRows, | ||
| }, nil | ||
| } | ||
|
|
||
// fieldToString renders a single result field as display text,
// mapping a nil field to the literal "NULL".
func fieldToString(field any) string {
	switch field {
	case nil:
		return "NULL"
	default:
		return fmt.Sprintf("%v", field)
	}
}
|
|
||
// extractResultPageToken pulls the "page_token" query parameter out of a
// next-page URL. An empty input URL or an absent parameter yields an empty
// token (meaning: no further pages); malformed URLs or query strings return
// the parse error.
func extractResultPageToken(nextUrl string) (string, error) {
	if nextUrl == "" {
		return "", nil
	}

	parsedUrl, parseErr := url.Parse(nextUrl)
	if parseErr != nil {
		return "", parseErr
	}

	values, queryErr := url.ParseQuery(parsedUrl.RawQuery)
	if queryErr != nil {
		return "", queryErr
	}

	return values.Get("page_token"), nil
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,114 @@ | ||
| package flink | ||
|
|
||
import (
	"fmt"
	"time"

	"github.com/spf13/cobra"

	pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
	"github.com/confluentinc/cli/v4/pkg/output"
	"github.com/confluentinc/cli/v4/pkg/retry"
)
|
|
||
| func (c *command) newStatementResultListCommand() *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "list <statement-name>", | ||
| Short: "List results for a Flink SQL statement.", | ||
| Args: cobra.ExactArgs(1), | ||
| ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStatementArgs), | ||
| RunE: c.statementResultList, | ||
| } | ||
|
|
||
| pcmd.AddCloudFlag(cmd) | ||
| pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand) | ||
| pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) | ||
| pcmd.AddContextFlag(cmd, c.CLICommand) | ||
| pcmd.AddOutputFlag(cmd) | ||
| cmd.Flags().Bool("wait", false, "Block until the statement is no longer pending before fetching results.") | ||
| cmd.Flags().Int("max-rows", defaultMaxResultRows, "Maximum number of result rows to fetch.") | ||
|
|
||
| return cmd | ||
| } | ||
|
|
||
| func (c *command) statementResultList(cmd *cobra.Command, args []string) error { | ||
| environmentId, err := c.Context.EnvironmentId() | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| client, err := c.GetFlinkGatewayClient(false) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| statement, err := client.GetStatement(environmentId, args[0], c.Context.GetCurrentOrganization()) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| phase := statement.Status.GetPhase() | ||
|
|
||
| if phase == "FAILED" { | ||
| return fmt.Errorf("statement %q has failed: %s", args[0], statement.Status.GetDetail()) | ||
| } | ||
|
|
||
| if phase == "PENDING" { | ||
| wait, err := cmd.Flags().GetBool("wait") | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if !wait { | ||
| return fmt.Errorf("statement %q is still pending, use --wait to wait for it to complete", args[0]) | ||
| } | ||
|
|
||
| err = retry.Retry(time.Second, 2*time.Minute, func() error { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this intentional that the timeout is 2x longer for result list compared to create? Otherwise it looks like an inconsistency. |
||
| statement, err = client.GetStatement(environmentId, args[0], c.Context.GetCurrentOrganization()) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if statement.Status.GetPhase() == "PENDING" { | ||
| return fmt.Errorf(`statement phase is "%s"`, statement.Status.GetPhase()) | ||
| } | ||
| return nil | ||
| }) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if statement.Status.GetPhase() == "FAILED" { | ||
| return fmt.Errorf("statement %q has failed: %s", args[0], statement.Status.GetDetail()) | ||
| } | ||
| } | ||
|
|
||
| traits := statement.Status.GetTraits() | ||
| schema := traits.GetSchema() | ||
| columns := schema.GetColumns() | ||
| if len(columns) == 0 { | ||
| fmt.Fprintln(cmd.OutOrStdout(), "Statement has no results to display.") | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When a DDL statement has no schema (empty columns), the code prints a plain text message regardless of --output flag. |
||
| return nil | ||
| } | ||
|
|
||
| if !traits.GetIsBounded() { | ||
| return fmt.Errorf("statement %q is unbounded and may produce results indefinitely; use the Flink shell to view results from streaming queries", args[0]) | ||
| } | ||
|
|
||
| maxRows, err := cmd.Flags().GetInt("max-rows") | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we add a check to make sure max-rows is positive value? |
||
|
|
||
| statementResults, err := fetchAllResults(client, environmentId, args[0], c.Context.GetCurrentOrganization(), schema, maxRows) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if err := printStatementResults(cmd, statementResults); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if maxRows > 0 && len(statementResults.Rows) >= maxRows { | ||
| fmt.Fprintf(cmd.ErrOrStderr(), "Warning: results truncated at %d rows. Use --max-rows to adjust the limit.\n", maxRows) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,4 @@ | ||
| +---------------+-------------------------------+ | ||
| | Creation Date | 2022-01-01 00:00:00 +0000 UTC | | ||
| | Name | my-statement | | ||
| | Statement | CREATE TABLE test; | | ||
| | Compute Pool | lfcp-123456 | | ||
| | Status | COMPLETED | | ||
| | Status Detail | SQL statement is completed | | ||
| +---------------+-------------------------------+ | ||
| database_name | ||
| ----------------- | ||
| my-cluster | ||
| other-cluster |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| Manage Flink SQL statement results. | ||
|
|
||
| Usage: | ||
| confluent flink statement result [command] | ||
|
|
||
| Available Commands: | ||
| list List results for a Flink SQL statement. | ||
|
|
||
| Flags: | ||
| --cloud string Specify the cloud provider as "aws", "azure", or "gcp". | ||
| --region string Cloud region for Flink (use "confluent flink region list" to see all). | ||
|
|
||
| Global Flags: | ||
| -h, --help Show help for this command. | ||
| --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. | ||
| -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). | ||
|
|
||
| Use "confluent flink statement result [command] --help" for more information about a command. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| No Flink endpoint is specified, defaulting to public endpoint: `http://127.0.0.1:1026` | ||
| Error: statement "failed-statement" has failed: Syntax error in SQL statement |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| List results for a Flink SQL statement. | ||
|
|
||
| Usage: | ||
| confluent flink statement result list <statement-name> [flags] | ||
|
|
||
| Flags: | ||
| --cloud string Specify the cloud provider as "aws", "azure", or "gcp". | ||
| --region string Cloud region for Flink (use "confluent flink region list" to see all). | ||
| --environment string Environment ID. | ||
| --context string CLI context name. | ||
| -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") | ||
| --wait Block until the statement is no longer pending before fetching results. | ||
| --max-rows int Maximum number of result rows to fetch. (default 100) | ||
|
|
||
| Global Flags: | ||
| -h, --help Show help for this command. | ||
| --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. | ||
| -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| No Flink endpoint is specified, defaulting to public endpoint: `http://127.0.0.1:1026` | ||
| { | ||
| "columns": ["database_name"], | ||
| "rows": [ | ||
| { | ||
| "database_name": "my-cluster" | ||
| }, | ||
| { | ||
| "database_name": "other-cluster" | ||
| } | ||
| ] | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| No Flink endpoint is specified, defaulting to public endpoint: `http://127.0.0.1:1026` | ||
| Statement has no results to display. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| No Flink endpoint is specified, defaulting to public endpoint: `http://127.0.0.1:1026` | ||
| Error: statement "pending-statement" is still pending, use --wait to wait for it to complete |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| No Flink endpoint is specified, defaulting to public endpoint: `http://127.0.0.1:1026` | ||
| Error: statement "unbounded-statement" is unbounded and may produce results indefinitely; use the Flink shell to view results from streaming queries |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like there's no --max-rows support on `create --wait`: the logic hardcodes defaultMaxResultRows (100) and doesn't print a truncation warning, so if a query returns more than 100 rows the output is silently truncated. Compare with `result list`, which both accepts --max-rows and prints a truncation warning to stderr. Could we either: