
Commit 2093d6f

Merge branch 'main' into li0k/clean_up_expired_files

2 parents 5b8b7e3 + 7977e20

File tree

6 files changed: +549 -446 lines

java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkConfig.java

Lines changed: 2 additions & 0 deletions
@@ -17,11 +17,13 @@
 package com.risingwave.connector;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.risingwave.connector.api.sink.CommonSinkConfig;
 import java.sql.Connection;
 import java.sql.SQLException;
 
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class JDBCSinkConfig extends CommonSinkConfig {
     private String jdbcUrl;

src/connector/src/sink/mod.rs

Lines changed: 6 additions & 0 deletions
@@ -1089,6 +1089,12 @@ pub enum SinkError {
         #[backtrace]
         anyhow::Error,
     ),
+    #[error("Redshift error: {0}")]
+    Redshift(
+        #[source]
+        #[backtrace]
+        anyhow::Error,
+    ),
 }
 
 impl From<sea_orm::DbErr> for SinkError {
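The new variant mirrors the existing ones: it wraps an anyhow::Error so any Redshift-side failure can be surfaced through the common sink error type. A minimal standalone sketch of the same pattern, assuming thiserror and anyhow as dependencies; names other than SinkError::Redshift are illustrative, and the #[backtrace] attribute from the diff is omitted to keep the sketch minimal:

use anyhow::anyhow;
use thiserror::Error;

// Simplified stand-in for the sink error enum; only the new variant is shown.
#[derive(Debug, Error)]
enum SinkError {
    #[error("Redshift error: {0}")]
    Redshift(#[source] anyhow::Error),
}

// Hypothetical helper: wrap any Redshift-side failure into the sink error.
fn redshift_err(e: impl Into<anyhow::Error>) -> SinkError {
    SinkError::Redshift(e.into())
}

fn main() {
    let err = redshift_err(anyhow!("COPY into staging table failed"));
    println!("{err}"); // prints: Redshift error: COPY into staging table failed
}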

src/connector/src/sink/snowflake_redshift/mod.rs

Lines changed: 126 additions & 33 deletions
@@ -19,8 +19,9 @@ use std::time::{SystemTime, UNIX_EPOCH};
 use bytes::BytesMut;
 use opendal::Operator;
 use risingwave_common::array::{ArrayImpl, DataChunk, Op, PrimitiveArray, StreamChunk, Utf8Array};
-use risingwave_common::catalog::Schema;
+use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
 use risingwave_common::row::Row;
+use risingwave_common::types::DataType;
 use serde_json::{Map, Value};
 use thiserror_ext::AsReport;
 use uuid::Uuid;
@@ -31,7 +32,9 @@ use crate::sink::encoder::{
 };
 use crate::sink::file_sink::opendal_sink::FileSink;
 use crate::sink::file_sink::s3::{S3Common, S3Sink};
-use crate::sink::{Result, SinkError, SinkWriterParam};
+use crate::sink::remote::CoordinatedRemoteSinkWriter;
+use crate::sink::writer::SinkWriter;
+use crate::sink::{Result, SinkError, SinkParam, SinkWriterMetrics, SinkWriterParam};
 
 pub mod redshift;
 pub mod snowflake;
@@ -150,44 +153,15 @@ pub struct SnowflakeRedshiftSinkS3Writer {
     s3_operator: Operator,
     augmented_row: AugmentedRow,
     opendal_writer_path: Option<(opendal::Writer, String)>,
-    target_table_name: Option<String>,
-}
-
-pub async fn build_opendal_writer_path(
-    s3_config: &S3Common,
-    operator: &Operator,
-    target_table_name: &Option<String>,
-) -> Result<(opendal::Writer, String)> {
-    let mut base_path = s3_config.path.clone().unwrap_or("".to_owned());
-    if !base_path.ends_with('/') {
-        base_path.push('/');
-    }
-    if let Some(table_name) = &target_table_name {
-        base_path.push_str(&format!("{}/", table_name));
-    }
-    let create_time = SystemTime::now()
-        .duration_since(UNIX_EPOCH)
-        .expect("Time went backwards");
-    let object_name = format!(
-        "{}{}_{}.{}",
-        base_path,
-        Uuid::new_v4(),
-        create_time.as_secs(),
-        "json",
-    );
-    let all_path = format!("s3://{}/{}", s3_config.bucket_name, object_name);
-    Ok((
-        operator.writer_with(&object_name).concurrent(8).await?,
-        all_path,
-    ))
+    target_table_name: String,
 }
 
 impl SnowflakeRedshiftSinkS3Writer {
     pub fn new(
         s3_config: S3Common,
         schema: Schema,
         is_append_only: bool,
-        target_table_name: Option<String>,
+        target_table_name: String,
     ) -> Result<Self> {
         let s3_operator = FileSink::<S3Sink>::new_s3_sink(&s3_config)?;
         Ok(Self {
@@ -209,6 +183,7 @@ impl SnowflakeRedshiftSinkS3Writer {
         let opendal_writer_path = build_opendal_writer_path(
             &self.s3_config,
             &self.s3_operator,
+            None,
             &self.target_table_name,
         )
         .await?;
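The call above passes the new dir argument as None. The reworked build_opendal_writer_path, re-added in the hunk below, builds object keys of the form <path>/<table_name>/[<dir>/]<uuid>_<millis>.json and switches from second to millisecond precision. A standalone sketch of just that key-naming scheme, with the opendal writer creation left out; the uuid crate (v4 feature) is assumed and the function name is illustrative:

use std::time::{SystemTime, UNIX_EPOCH};

use uuid::Uuid;

// Illustrative helper: mirrors the key layout produced by the reworked
// build_opendal_writer_path, minus the opendal writer itself.
fn object_key(base_path: Option<&str>, table_name: &str, dir: Option<&str>) -> String {
    let mut key = base_path.unwrap_or("").to_owned();
    if !key.ends_with('/') {
        key.push('/');
    }
    key.push_str(&format!("{}/", table_name));
    if let Some(dir) = dir {
        key.push_str(&format!("{}/", dir));
    }
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards")
        .as_millis();
    format!("{}{}_{}.json", key, Uuid::new_v4(), millis)
}

fn main() {
    // e.g. "staging/my_table/9f1c3e2a-..._1718000000000.json"
    println!("{}", object_key(Some("staging"), "my_table", None));
}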
@@ -240,3 +215,121 @@ impl SnowflakeRedshiftSinkS3Writer {
         }
     }
 }
+
+pub async fn build_opendal_writer_path(
+    s3_config: &S3Common,
+    operator: &Operator,
+    dir: Option<&str>,
+    target_table_name: &str,
+) -> Result<(opendal::Writer, String)> {
+    let mut base_path = s3_config.path.clone().unwrap_or("".to_owned());
+    if !base_path.ends_with('/') {
+        base_path.push('/');
+    }
+    base_path.push_str(&format!("{}/", target_table_name));
+    if let Some(dir) = dir {
+        base_path.push_str(&format!("{}/", dir));
+    }
+    let create_time = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .expect("Time went backwards");
+    let object_name = format!(
+        "{}{}_{}.{}",
+        base_path,
+        Uuid::new_v4(),
+        create_time.as_millis(),
+        "json",
+    );
+    let all_path = format!("s3://{}/{}", s3_config.bucket_name, object_name);
+    Ok((
+        operator.writer_with(&object_name).concurrent(8).await?,
+        all_path,
+    ))
+}
+
+/// Generic JDBC writer for both Redshift and Snowflake sinks
+pub struct SnowflakeRedshiftSinkJdbcWriter {
+    augmented_row: AugmentedChunk,
+    jdbc_sink_writer: CoordinatedRemoteSinkWriter,
+}
+
+impl SnowflakeRedshiftSinkJdbcWriter {
+    pub async fn new(
+        is_append_only: bool,
+        writer_param: SinkWriterParam,
+        mut param: SinkParam,
+        full_table_name: String,
+    ) -> Result<Self> {
+        let metrics = SinkWriterMetrics::new(&writer_param);
+        let column_descs = &mut param.columns;
+
+        // Build full table name based on connector type
+        if !is_append_only {
+            // Add CDC-specific columns for upsert mode
+            let max_column_id = column_descs
+                .iter()
+                .map(|column| column.column_id.get_id())
+                .max()
+                .unwrap_or(0);
+
+            (*column_descs).push(ColumnDesc::named(
+                __ROW_ID,
+                ColumnId::new(max_column_id + 1),
+                DataType::Varchar,
+            ));
+            (*column_descs).push(ColumnDesc::named(
+                __OP,
+                ColumnId::new(max_column_id + 2),
+                DataType::Int32,
+            ));
+        };
+
+        if let Some(schema_name) = param.properties.remove("schema") {
+            param
+                .properties
+                .insert("schema.name".to_owned(), schema_name);
+        }
+        if let Some(database_name) = param.properties.remove("database") {
+            param
+                .properties
+                .insert("database.name".to_owned(), database_name);
+        }
+        param
+            .properties
+            .insert("table.name".to_owned(), full_table_name.clone());
+        param
+            .properties
+            .insert("type".to_owned(), "append-only".to_owned());
+
+        let jdbc_sink_writer =
+            CoordinatedRemoteSinkWriter::new(param.clone(), metrics.clone()).await?;
+
+        Ok(Self {
+            augmented_row: AugmentedChunk::new(0, is_append_only),
+            jdbc_sink_writer,
+        })
+    }
+
+    pub async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
+        self.augmented_row.reset_epoch(epoch);
+        self.jdbc_sink_writer.begin_epoch(epoch).await?;
+        Ok(())
+    }
+
+    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
+        let chunk = self.augmented_row.augmented_chunk(chunk)?;
+        self.jdbc_sink_writer.write_batch(chunk).await?;
+        Ok(())
+    }
+
+    pub async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
+        self.jdbc_sink_writer.barrier(is_checkpoint).await?;
+        Ok(())
+    }
+
+    pub async fn abort(&mut self) -> Result<()> {
+        // TODO: abort should clean up all the data written in this epoch
+        self.jdbc_sink_writer.abort().await?;
+        Ok(())
+    }
+}
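The constructor in the hunk above rewrites the user-facing connector properties into the keys the embedded JDBC remote sink expects: "schema" becomes "schema.name", "database" becomes "database.name", and "table.name" plus a forced "type" of "append-only" are added. A minimal standalone sketch of just that remapping, using a plain HashMap in place of the real SinkParam; the function and variable names here are illustrative, not part of the commit:

use std::collections::HashMap;

// Illustrative only: mirrors how the constructor renames properties for the JDBC sink.
fn remap_jdbc_properties(
    mut props: HashMap<String, String>,
    full_table_name: &str,
) -> HashMap<String, String> {
    if let Some(schema_name) = props.remove("schema") {
        props.insert("schema.name".to_owned(), schema_name);
    }
    if let Some(database_name) = props.remove("database") {
        props.insert("database.name".to_owned(), database_name);
    }
    props.insert("table.name".to_owned(), full_table_name.to_owned());
    props.insert("type".to_owned(), "append-only".to_owned());
    props
}

fn main() {
    let props = HashMap::from([
        ("schema".to_owned(), "public".to_owned()),
        ("database".to_owned(), "dev".to_owned()),
    ]);
    let remapped = remap_jdbc_properties(props, "dev.public.my_sink_table");
    assert_eq!(remapped.get("schema.name").map(String::as_str), Some("public"));
    assert_eq!(remapped.get("type").map(String::as_str), Some("append-only"));
}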
