-
Notifications
You must be signed in to change notification settings - Fork 857
feat: sql jobs outputting to s3 + streaming for high-number of rows #5704
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
3d509a1 to
f29110e
Compare
Deploying windmill with
|
| Latest commit: |
58d5bd8
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://6329338a.windmill.pages.dev |
| Branch Preview URL: | https://di-stream-sql-to-s3.windmill.pages.dev |
095178e to
62df7af
Compare
84ecc8d to
3a58cb8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Caution
Changes requested ❌
Reviewed everything up to 3a58cb8 in 3 minutes and 11 seconds. Click for details.
- Reviewed
1278lines of code in12files - Skipped
0files when reviewing. - Skipped posting
16draft comments. View those below. - Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.
1. backend/windmill-worker/src/worker.rs:768
- Draft comment:
The run_worker loop is very long and has deeply nested and complex logic. Consider refactoring into smaller helper functions to improve readability and maintainability. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
2. backend/windmill-worker/src/worker.rs:669
- Draft comment:
Avoid hardcoding system paths such as '/proc/sys/vm/drop_caches'. Prefer making such paths configurable or using a dedicated abstraction so that the code works under varied environments and proper privileges can be managed. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
3. backend/windmill-worker/src/worker.rs:1010
- Draft comment:
There are several unwraps (or expect calls) in metrics registration and similar parts. In production code these can lead to panics when the expected environment assumptions (e.g. metric registration) are not met. Consider using proper error propagation instead. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
4. backend/windmill-worker/src/worker.rs:2920
- Draft comment:
The language‐specific job handling in handle_code_execution_job is very long. Consider moving each language branch into separate modules or helper functions to reduce the size of the match block and improve modularity. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
5. backend/windmill-worker/src/worker.rs:1270
- Draft comment:
The handling of same_worker jobs via try_recv and subsequent branching is quite intricate. Abstracting this logic into a dedicated helper function would make it easier to follow and test. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
6. backend/windmill-worker/src/mssql_executor.rs:480
- Draft comment:
When using S3 mode (branch starting at line 204), ensure that errors from the async stream are properly handled and that any temporary resources (like files) are cleaned up even on error. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
7. backend/windmill-worker/src/pg_executor.rs:137
- Draft comment:
When extracting column order from query results (lines 135–144), use explicit error handling instead of unwrap_or_default to catch cases of missing schema information. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
8. backend/windmill-worker/src/worker.rs:80
- Draft comment:
The run_worker function is very long and handles many responsibilities (job pulling, metrics, logging, scheduling, etc.). Consider refactoring it into smaller, modular components to improve readability and maintainability. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
9. backend/windmill-worker/src/mysql_executor.rs:340
- Draft comment:
The conversion functions (e.g. string_date_to_mysql_date) use unwrap_or_default() which may mask parsing errors. Consider returning a proper error when an input date string is invalid instead of defaulting silently. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
10. backend/windmill-worker/src/worker.rs:387
- Draft comment:
In AuthedClient::get_resource_value and similar methods, the response header values are constructed using unwrap_or defaults. Consider more robust error handling to avoid silently passing invalid header or token values. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
11. backend/windmill-common/src/s3_helpers.rs:497
- Draft comment:
Typo: In the comment above the RecordBatchWriterEnum (line 497), consider changing 'Originally used a Arc<Mutex<dyn RecordBatchWriter + Send>>' to 'Originally used an Arc<Mutex<dyn RecordBatchWriter + Send>>' for proper grammar. - Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 0% vs. threshold = 50% While the grammar correction is technically correct, comments about pure grammar/spelling in comments are generally not important enough to warrant a PR comment. The meaning is perfectly clear either way. This falls under the "Do NOT make comments that are obvious or unimportant" rule. The grammar error could theoretically make the code less professional or harder to maintain, and fixing it is very low effort. The impact on maintainability is negligible - developers will understand the meaning perfectly well either way. We should focus PR comments on actual code issues. Delete this comment as it's too minor and doesn't affect code functionality or maintainability in any meaningful way.
12. backend/windmill-worker/Cargo.toml:37
- Draft comment:
Typo in comment on line 37: Consider replacing 'audit-worth actions' with 'audit-worthy actions' for clarity. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
13. backend/windmill-worker/src/bigquery_executor.rs:43
- Draft comment:
The struct name 'BigQueryResponseJobReference' uses a different capitalization (with an uppercase 'Q') compared to the other related structs (e.g. 'BigqueryResponse', 'BigqueryResponseRow', etc.). For consistency and clarity, consider aligning the capitalization (either use 'Bigquery' or 'BigQuery' consistently across the file). - Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 20% vs. threshold = 50% While this is technically correct about the inconsistency, it's a very minor code style issue that doesn't affect functionality. The comment is about a newly added struct in the diff, so it is about changes. The inconsistency isn't resolved yet. However, this kind of minor style issue may not be worth a PR comment - it's not clearly wrong enough to require a change. The inconsistency could cause confusion for future developers trying to understand the code structure. However, both "BigQuery" and "Bigquery" are commonly used variations of the product name. While consistency is good, this is an extremely minor issue that doesn't impact functionality or significantly hurt readability. The effort of changing it may not be worth the benefit. Delete this comment as it's too minor of a style issue to warrant a PR comment. The inconsistency, while real, doesn't significantly impact code quality or maintainability.
14. backend/windmill-worker/src/pg_executor.rs:90
- Draft comment:
Typo: The error message 'Missing otzyp for pg arg' seems to be a misspelling. Consider correcting it to 'Missing otyp for pg arg' or a more accurate message. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
15. backend/windmill-worker/src/pg_executor.rs:158
- Draft comment:
Typo: In the error message comparing query size, the code uses 'MAX_RESULT_SIZE & 4' which likely is a typo. It should be 'MAX_RESULT_SIZE * 4' to match the condition check. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
16. backend/windmill-worker/src/worker.rs:683
- Draft comment:
Typographical error: The word 'spurrious' should be corrected to 'spurious' in the error message within the drop_cache function. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
Workflow ID: wflow_frBtlmPAupSm1nFJ
You can customize by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.
| // let rows_stream = | ||
| // rows_stream.map(|r| serde_json::value::to_value(&r?).map_err(to_anyhow)); | ||
| // let stream = convert_json_line_stream(rows_stream.boxed(), s3.format).await?; | ||
| // TODO fix this |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is commented-out S3 streaming code with a TODO (lines 247–251). Please either complete the implementation or remove the commented code to avoid confusion.
| // TODO fix this |
| }; | ||
|
|
||
| let stream = convert_json_line_stream(rows_stream.boxed(), s3.format).await?; | ||
| s3.upload(stream.boxed()).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the S3 branch (starting around line 77), consider adding more detailed logging and stronger error messages when executing query streaming, so that S3 upload failures can be diagnosed more easily.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i yield error in stream and everything is awaited and checked with ? operator
| static ref RE_NONEMPTY_SQL_BLOCK: Regex = Regex::new(r#"(?m)^\s*[^\s](?:[^-]|$)"#).unwrap(); | ||
|
|
||
| static ref RE_DB: Regex = Regex::new(r#"(?m)^-- database (\S+) *(?:\r|\n|$)"#).unwrap(); | ||
| static ref RE_S3_MODE: Regex = Regex::new(r#"(?m)^-- s3( (.+))?*(?:\r|\n|$)"#).unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The regex for RE_S3_MODE is defined as:
Regex::new(r"(?m)^-- s3( (.+))?*(?:\r|\n|$)")
The * after the optional group appears redundant and may be a typographical error. Consider revising this pattern (e.g., possibly using (?m)^-- s3( (.+))?(?:\r|\n|$) instead) to ensure it behaves as expected.
| static ref RE_S3_MODE: Regex = Regex::new(r#"(?m)^-- s3( (.+))?*(?:\r|\n|$)"#).unwrap(); | |
| static ref RE_S3_MODE: Regex = Regex::new(r#"(?m)^-- s3( (.+))?(?:\r|\n|$)"#).unwrap(); |
3bd2d74 to
8b87ae4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Important
Looks good to me! 👍
Reviewed 8b87ae4 in 1 minute and 45 seconds. Click for details.
- Reviewed
66lines of code in1files - Skipped
0files when reviewing. - Skipped posting
8draft comments. View those below. - Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.
1. backend/windmill-worker/src/snowflake_executor.rs:202
- Draft comment:
The cloning of account_identifier and token for use with reqwest’s stream is acceptable, though be mindful of the overhead. The inline comment explains the necessity; consider whether documenting the potential performance implications for very high-throughput streaming might help future maintainers. - Reason this comment was not posted:
Confidence changes required:0%<= threshold50%None
2. backend/windmill-worker/src/snowflake_executor.rs:244
- Draft comment:
The map_ok closure now uses a move lambda to capture its environment. Make sure that all captured variables (e.g. response.resultSetMetaData.rowType) remain valid throughout the stream’s lifetime. - Reason this comment was not posted:
Confidence changes required:0%<= threshold50%None
3. backend/windmill-worker/src/snowflake_executor.rs:254
- Draft comment:
The previously commented-out code for S3 uploading has now been uncommented and integrated. Verify that convert_json_line_stream handles all expected JSON formatting edge cases and that error propagation from s3.upload is properly handled. - Reason this comment was not posted:
Confidence changes required:0%<= threshold50%None
4. backend/windmill-worker/src/snowflake_executor.rs:12
- Draft comment:
Good to see the new import for the S3 stream conversion helper. Ensure that its behavior is well-tested and aligns with other s3_helpers utilities. - Reason this comment was not posted:
Confidence changes required:0%<= threshold50%None
5. backend/windmill-worker/src/snowflake_executor.rs:199
- Draft comment:
Cloning the account_identifier and token to satisfy the 'static lifetime needed by reqwest::Body::wrap_stream is a workable workaround. In the future, consider using Arc to avoid multiple clones if these strings become large. - Reason this comment was not posted:
Confidence changes required:33%<= threshold50%None
6. backend/windmill-worker/src/snowflake_executor.rs:217
- Draft comment:
The updated URL builder now uses the cloned_account_identifier and cloned_token consistently for partition requests. This correctly addresses the lifetime issue for async streaming. - Reason this comment was not posted:
Confidence changes required:0%<= threshold50%None
7. backend/windmill-worker/src/snowflake_executor.rs:233
- Draft comment:
Adding the 'move' keyword to the map_ok closure ensures that the captured environment (including response fields) is moved into the closure. Verify that this does not hold on to heavier state than needed. - Reason this comment was not posted:
Confidence changes required:0%<= threshold50%None
8. backend/windmill-worker/src/snowflake_executor.rs:252
- Draft comment:
The previously commented-out S3 upload block is now active. This code maps each row to a JSON value, converts the stream using convert_json_line_stream, and uploads the stream to S3. Ensure that any errors during conversion or upload are logged and handled appropriately. - Reason this comment was not posted:
Confidence changes required:0%<= threshold50%None
Workflow ID: wflow_RblBPwpK3tEDcw5o
You can customize by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Important
Looks good to me! 👍
Reviewed 83906ea in 1 minute and 40 seconds. Click for details.
- Reviewed
142lines of code in1files - Skipped
0files when reviewing. - Skipped posting
10draft comments. View those below. - Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.
1. backend/windmill-common/src/s3_helpers.rs:1
- Draft comment:
Imports have been re-ordered and duplicated under #[cfg(feature = "parquet")]. Consider consolidating these to avoid redundant declarations. - Reason this comment was not posted:
Confidence changes required:30%<= threshold50%None
2. backend/windmill-common/src/s3_helpers.rs:24
- Draft comment:
The new ChannelWriter implementation using blocking_send is acceptable but be cautious when integrating synchronous blocking operations in async contexts. - Reason this comment was not posted:
Confidence changes required:30%<= threshold50%None
3. backend/windmill-common/src/s3_helpers.rs:562
- Draft comment:
The two versions of convert_json_line_stream (#[cfg(not(feature = "parquet"))] and #[cfg(feature = "parquet")]) share the same name. Ensure usage is clear and documented to prevent confusion. - Reason this comment was not posted:
Confidence changes required:40%<= threshold50%None
4. backend/windmill-common/src/s3_helpers.rs:537
- Draft comment:
Removal of the duplicate ChannelWriter definition is good; please ensure no lingering unused code paths remain. - Reason this comment was not posted:
Confidence changes required:0%<= threshold50%None
5. backend/windmill-common/src/s3_helpers.rs:1
- Draft comment:
Good cleanup: the imports for 'to_anyhow' and 'rd_string' are now conditionally included under the 'parquet' feature. This ensures they’re only imported when needed. - Reason this comment was not posted:
Confidence changes required:0%<= threshold50%None
6. backend/windmill-common/src/s3_helpers.rs:19
- Draft comment:
Combining 'Arc' and 'Mutex' into a single import is clearer and follows best practices. - Reason this comment was not posted:
Confidence changes required:0%<= threshold50%None
7. backend/windmill-common/src/s3_helpers.rs:508
- Draft comment:
Defining RecordBatchWriterEnum under #[cfg(feature = "parquet")] limits its scope appropriately to the parquet feature. - Reason this comment was not posted:
Confidence changes required:0%<= threshold50%None
8. backend/windmill-common/src/s3_helpers.rs:534
- Draft comment:
The new ChannelWriter struct and its Write implementation are well organized. Note that using blocking_send is acceptable in a blocking context—but be mindful if the channel saturates. - Reason this comment was not posted:
Confidence changes required:33%<= threshold50%None
9. backend/windmill-common/src/s3_helpers.rs:560
- Draft comment:
Adding a #[cfg(not(feature = "parquet"))] version of convert_json_line_stream that yields an error clearly informs callers when the parquet feature isn’t enabled. Ensure that any caller gracefully handles this error stream. - Reason this comment was not posted:
Confidence changes required:0%<= threshold50%None
10. backend/windmill-common/src/s3_helpers.rs:638
- Draft comment:
Removal of the duplicate ChannelWriter definition at the end reduces maintenance overhead and avoids potential inconsistencies. - Reason this comment was not posted:
Confidence changes required:0%<= threshold50%None
Workflow ID: wflow_F2OvSs6bJNqVPovm
You can customize by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.
f8d6a82 to
af7468d
Compare
| Ok(mut result) => { | ||
| while let Some(row) = result.next().await? { | ||
| yield Ok(convert_row_to_value(row)); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error handling in this stream needs improvement. The current implementation uses result.next().await? which will return early from the async block if an error occurs, but won't properly propagate the error through the stream.
Consider restructuring to handle errors explicitly:
while let Some(row_result) = result.next().await {
match row_result {
Ok(row) => yield Ok(convert_row_to_value(row)),
Err(e) => yield Err(anyhow!("Error fetching row: {:?}", e))
}
}This ensures errors are properly propagated through the stream rather than terminating it prematurely.
| Ok(mut result) => { | |
| while let Some(row) = result.next().await? { | |
| yield Ok(convert_row_to_value(row)); | |
| } | |
| Ok(mut result) => { | |
| while let Some(row_result) = result.next().await { | |
| match row_result { | |
| Ok(row) => yield Ok(convert_row_to_value(row)), | |
| Err(e) => yield Err(anyhow!("Error fetching row: {:?}", e)) | |
| } | |
| } |
Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Important
Looks good to me! 👍
Reviewed 05283a8 in 1 minute and 39 seconds. Click for details.
- Reviewed
26lines of code in2files - Skipped
0files when reviewing. - Skipped posting
5draft comments. View those below. - Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.
1. backend/windmill-worker/src/bigquery_executor.rs:4
- Draft comment:
Removed TryFutureExt from the futures import. Verify that no methods from TryFutureExt are used elsewhere to avoid breaking changes. - Reason this comment was not posted:
Comment did not seem useful. Confidence is useful =0%<= threshold50%The comment is asking the PR author to verify that no methods fromTryFutureExtare used elsewhere, which is against the rules as it asks for confirmation and verification. It does not provide a specific suggestion or point out a specific issue.
2. backend/windmill-worker/src/snowflake_executor.rs:5
- Draft comment:
Removed TryFutureExt from the futures import. Ensure that removal doesn't affect any needed extensions; the use of TryStreamExt remains for stream processing. - Reason this comment was not posted:
Comment did not seem useful. Confidence is useful =30%<= threshold50%The comment is asking the author to ensure that the removal ofTryFutureExtdoesn't affect any needed extensions. This is a request for confirmation, which violates the rule against asking the PR author to confirm their intention or ensure behavior. However, it does provide a specific context about the removal ofTryFutureExtand the continued use ofTryStreamExt, which could be useful. Despite this, the primary focus is on ensuring the removal doesn't affect anything, which is not allowed.
3. backend/windmill-worker/src/bigquery_executor.rs:3
- Draft comment:
Removed the unused importTryFutureExt; this cleans up dependency clutter. Please double-check that no methods from this trait are needed elsewhere. - Reason this comment was not posted:
Comment did not seem useful. Confidence is useful =0%<= threshold50%The comment asks the PR author to double-check that no methods from the removed import are needed elsewhere. This violates the rule against asking the author to double-check things. The comment does not provide a specific suggestion or point out a specific issue, so it should be removed.
4. backend/windmill-worker/src/snowflake_executor.rs:4
- Draft comment:
Removed the unused importTryFutureExtwhile keepingTryStreamExt, which helps with stream error handling. This appears to be a tidy cleanup—ensure there is no dependency on TryFutureExt methods. - Reason this comment was not posted:
Comment looked like it was already resolved.
5. backend/windmill-worker/src/bigquery_executor.rs:32
- Draft comment:
There is an inconsistency in the casing used in the names of the BigQuery-related structs. For example, the file definesBigqueryResponse(with a lowercase 'q') but then definesBigQueryResponseJobReference(with an uppercase 'Q'). For clarity and consistency, consider standardizing the spelling (either using 'Bigquery' or 'BigQuery') across the file. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
Workflow ID: wflow_r90pF8ihZ07hKHfN
You can customize by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Important
Looks good to me! 👍
Reviewed cde77fe in 1 minute and 17 seconds. Click for details.
- Reviewed
39lines of code in1files - Skipped
0files when reviewing. - Skipped posting
3draft comments. View those below. - Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.
1. backend/windmill-worker/src/mysql_executor.rs:80
- Draft comment:
Returning immediately after yielding an error is clear; ensure downstream consumers handle early termination properly. - Reason this comment was not posted:
Confidence changes required:0%<= threshold50%None
2. backend/windmill-worker/src/mysql_executor.rs:88
- Draft comment:
The loop over rows with match is explicit but consider if awhile letpattern could simplify error handling. - Reason this comment was not posted:
Confidence changes required:33%<= threshold50%None
3. backend/windmill-worker/src/mysql_executor.rs:80
- Draft comment:
Holding the connection lock for the entire streaming loop may lead to contention; verify that this is acceptable under load. - Reason this comment was not posted:
Confidence changes required:33%<= threshold50%None
Workflow ID: wflow_hwgPcuxsXOMUB4QX
You can customize by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.
Add an annotation to stream the results of an SQL query to S3:
All arguments are optional
Supported formats: csv, json and parquet.
I did not use the annotations macro from @pyranota because it only works as a flag and does not parse arguments (maybe a future improvement to make?)
Important
Adds functionality to stream SQL query results to S3 with support for multiple formats and integrates this feature across various SQL executors.
prefix,storage, andformatinwindmill-parser-sql/src/lib.rs.csv,json, andparquetformats.s3_helpers.rs.bigquery_executor.rs,mssql_executor.rs,mysql_executor.rs,pg_executor.rs, andsnowflake_executor.rsto handle S3 streaming.datafusion,tokio-stream, andtokio-utilinCargo.tomlfiles.S3ModeWorkerDatastruct incommon.rsfor managing S3 upload configurations.upload_s3_filemethod inworker.rsfor handling S3 uploads.This description was created by
for cde77fe. You can customize this summary. It will automatically update as commits are pushed.