Skip to content

Commit 9298177

Browse files
committed
add userCodeClassLoader
1 parent cb7c58f commit 9298177

File tree

5 files changed

+24
-11
lines changed

5 files changed

+24
-11
lines changed

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkSink.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,19 @@ class FlinkSink<InputT> implements Sink<InputT>, SupportsPreWriteTopology<InputT
5959
public SinkWriter<InputT> createWriter(InitContext context) throws IOException {
6060
FlinkSinkWriter<InputT> flinkSinkWriter =
6161
builder.createWriter(context.getMailboxExecutor());
62-
flinkSinkWriter.initialize(InternalSinkWriterMetricGroup.wrap(context.metricGroup()));
62+
flinkSinkWriter.initialize(
63+
InternalSinkWriterMetricGroup.wrap(context.metricGroup()),
64+
context.getUserCodeClassLoader());
6365
return flinkSinkWriter;
6466
}
6567

6668
@Override
6769
public SinkWriter<InputT> createWriter(WriterInitContext context) throws IOException {
6870
FlinkSinkWriter<InputT> flinkSinkWriter =
6971
builder.createWriter(context.getMailboxExecutor());
70-
flinkSinkWriter.initialize(InternalSinkWriterMetricGroup.wrap(context.metricGroup()));
72+
flinkSinkWriter.initialize(
73+
InternalSinkWriterMetricGroup.wrap(context.metricGroup()),
74+
context.getUserCodeClassLoader());
7175
return flinkSinkWriter;
7276
}
7377

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/writer/AppendSinkWriter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
2828
import org.apache.flink.table.types.logical.RowType;
2929
import org.apache.flink.types.RowKind;
30+
import org.apache.flink.util.UserCodeClassLoader;
3031

3132
import java.io.IOException;
3233
import java.util.concurrent.CompletableFuture;
@@ -55,8 +56,9 @@ public AppendSinkWriter(
5556
}
5657

5758
@Override
58-
public void initialize(SinkWriterMetricGroup metricGroup) {
59-
super.initialize(metricGroup);
59+
public void initialize(
60+
SinkWriterMetricGroup metricGroup, UserCodeClassLoader userCodeClassLoader) {
61+
super.initialize(metricGroup, userCodeClassLoader);
6062

6163
appendWriter = table.newAppend().createWriter();
6264
LOG.info("Finished opening Fluss {}.", this.getClass().getSimpleName());

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/writer/FlinkSinkWriter.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public FlinkSinkWriter(
8282
RowType tableRowType,
8383
boolean ignoreDelete,
8484
MailboxExecutor mailboxExecutor,
85-
FlussSerializationSchema serializationSchema) {
85+
FlussSerializationSchema<InputT> serializationSchema) {
8686
this(
8787
tablePath,
8888
flussConfig,
@@ -110,7 +110,8 @@ public FlinkSinkWriter(
110110
this.serializationSchema = serializationSchema;
111111
}
112112

113-
public void initialize(SinkWriterMetricGroup metricGroup) {
113+
public void initialize(
114+
SinkWriterMetricGroup metricGroup, UserCodeClassLoader userCodeClassLoader) {
114115
LOG.info(
115116
"Opening Fluss {}, database: {} and table: {}",
116117
this.getClass().getSimpleName(),
@@ -137,7 +138,7 @@ public MetricGroup getMetricGroup() {
137138

138139
@Override
139140
public UserCodeClassLoader getUserCodeClassLoader() {
140-
return null; // TODO: add user code class loader here
141+
return userCodeClassLoader;
141142
}
142143

143144
@Override

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/writer/UpsertSinkWriter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
2929
import org.apache.flink.table.types.logical.RowType;
3030
import org.apache.flink.types.RowKind;
31+
import org.apache.flink.util.UserCodeClassLoader;
3132

3233
import javax.annotation.Nullable;
3334

@@ -58,8 +59,9 @@ public UpsertSinkWriter(
5859
}
5960

6061
@Override
61-
public void initialize(SinkWriterMetricGroup metricGroup) {
62-
super.initialize(metricGroup);
62+
public void initialize(
63+
SinkWriterMetricGroup metricGroup, UserCodeClassLoader userCodeClassLoader) {
64+
super.initialize(metricGroup, userCodeClassLoader);
6365
Upsert upsert = table.newUpsert();
6466
if (targetColumnIndexes != null) {
6567
upsert = upsert.partialUpdate(targetColumnIndexes);

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/writer/FlinkSinkWriterTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,9 @@ void testSinkMetrics(String clientId) throws Exception {
8888
FlinkSinkWriter flinkSinkWriter =
8989
createSinkWriter(flussConf, mockWriterInitContext.getMailboxExecutor());
9090

91-
flinkSinkWriter.initialize(mockWriterInitContext.metricGroup());
91+
flinkSinkWriter.initialize(
92+
mockWriterInitContext.metricGroup(),
93+
mockWriterInitContext.getUserCodeClassLoader());
9294
flinkSinkWriter.write(
9395
GenericRowData.of(1, StringData.fromString("a")), new MockSinkWriterContext());
9496
flinkSinkWriter.flush(false);
@@ -188,7 +190,9 @@ private void testExceptionWhenFlussUnavailable(
188190
// test fluss unavailable.
189191
try (FlinkSinkWriter writer =
190192
createSinkWriter(clientConfig, mockWriterInitContext.getMailboxExecutor())) {
191-
writer.initialize(mockWriterInitContext.metricGroup());
193+
writer.initialize(
194+
mockWriterInitContext.metricGroup(),
195+
mockWriterInitContext.getUserCodeClassLoader());
192196
flussClusterExtension.close();
193197
writer.write(
194198
GenericRowData.of(1, StringData.fromString("a")),

0 commit comments

Comments
 (0)