Skip to content

Commit b6e9269

Browse files
committed
fix postgres stream format
1 parent f7e36bf commit b6e9269

File tree

1 file changed

+8
-3
lines changed

1 file changed

+8
-3
lines changed

backend/windmill-worker/src/pg_executor.rs

+8-3
Original file line numberDiff line numberDiff line change
@@ -117,24 +117,29 @@ fn do_postgresql_inner<'a>(
117117
.await
118118
.map_err(to_anyhow)?;
119119
} else if let Some(ref s3) = s3 {
120-
let rows = client
120+
let rows_stream = client
121121
.query_raw(&query, query_params)
122122
.await?
123123
.map_err(to_anyhow)
124-
.map(|row_result| {
124+
.enumerate()
125+
.map(|(i, row_result)| {
125126
row_result.and_then(|row| {
126127
postgres_row_to_json_value(row)
127128
.map_err(to_anyhow)
128129
.and_then(|ref v| serde_json::to_string(v).map_err(to_anyhow))
130+
.map(|s| if i == 0 { s } else { format!(",\n{}", s) })
129131
})
130132
});
133+
let start_bracket = futures::stream::once(async { Ok("{ rows: [\n".to_string()) });
134+
let end_bracket = futures::stream::once(async { Ok("\n]}".to_string()) });
135+
let rows_stream = start_bracket.chain(rows_stream).chain(end_bracket);
131136

132137
s3.client
133138
.upload_s3_file(
134139
s3.workspace_id.as_str(),
135140
s3.object_key.clone(),
136141
s3.storage.clone(),
137-
rows,
142+
rows_stream,
138143
)
139144
.await?;
140145

0 commit comments

Comments
 (0)