Skip to content
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ dependencies {
implementation(group: 'com.google.cloud', name: 'google-cloud-bigtable', version: '2.24.1') {
exclude group: "io.grpc"
}
implementation (group: 'com.aliyun.odps', name: 'odps-sdk-core', version: '0.52.3-public') {
implementation (group: 'com.aliyun.odps', name: 'odps-sdk-core', version: '0.53.0-streaming.2-SNAPSHOT') {
exclude group: "com.google"
exclude group: "io.grpc"
exclude group: "org.slf4j"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.gotocompany.depot.config.converter.LocalDateTimeConverter;
import com.gotocompany.depot.config.converter.MaxComputeOdpsGlobalSettingsConverter;
import com.gotocompany.depot.config.converter.ZoneIdConverter;
import com.gotocompany.depot.maxcompute.enumeration.StreamingInsertPartitioningType;
import com.gotocompany.depot.maxcompute.enumeration.MaxComputeTimestampDataType;
import org.aeonbits.owner.Config;

Expand Down Expand Up @@ -60,6 +61,10 @@ public interface MaxComputeSinkConfig extends Config {
@DefaultValue("false")
Boolean isTablePartitioningEnabled();

/**
 * Partitioning strategy to use for streaming inserts.
 * Read from the SINK_MAXCOMPUTE_STREAMING_INSERT_PARTITIONING_TYPE key; when the key is
 * absent the declared default "DEFAULT" is used.
 *
 * @return the configured StreamingInsertPartitioningType
 */
@Key("SINK_MAXCOMPUTE_STREAMING_INSERT_PARTITIONING_TYPE")
@DefaultValue("DEFAULT")
StreamingInsertPartitioningType getStreamingInsertPartitioningType();

/**
 * Value configured under the SINK_MAXCOMPUTE_TABLE_PARTITION_KEY key.
 * No default value is declared on this accessor.
 * NOTE(review): presumably the name of the partition column of the target table - confirm against usages.
 *
 * @return the configured table partition key
 */
@Key("SINK_MAXCOMPUTE_TABLE_PARTITION_KEY")
String getTablePartitionKey();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.gotocompany.depot.maxcompute.client.insert;

import com.aliyun.odps.exceptions.SchemaMismatchException;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.io.DynamicPartitionRecordPack;
import com.gotocompany.depot.config.MaxComputeSinkConfig;
import com.gotocompany.depot.exception.NonRetryableException;
import com.gotocompany.depot.maxcompute.client.insert.session.StreamingSessionManager;
import com.gotocompany.depot.maxcompute.model.RecordWrapper;
import com.gotocompany.depot.metrics.Instrumentation;
import com.gotocompany.depot.metrics.MaxComputeMetrics;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * InsertManager that writes records through a streaming session obtained under a single,
 * fixed session key instead of one session key per partition spec.
 * Each record is appended to a {@link DynamicPartitionRecordPack} together with its own
 * partition spec string.
 */
@Slf4j
public class DynamicPartitionedInsertManager extends InsertManager {

    /** Key under which the streaming session is looked up in the StreamingSessionManager. */
    private static final String DYNAMIC_PARTITIONED_INSERT_MANAGER = "dynamic-partitioned";

    /**
     * Create a new DynamicPartitionedInsertManager; all arguments are forwarded to InsertManager.
     *
     * @param maxComputeSinkConfig    sink configuration
     * @param instrumentation         instrumentation passed to the base class
     * @param maxComputeMetrics       metrics passed to the base class
     * @param streamingSessionManager session manager that provides the streaming session
     */
    protected DynamicPartitionedInsertManager(MaxComputeSinkConfig maxComputeSinkConfig,
                                              Instrumentation instrumentation,
                                              MaxComputeMetrics maxComputeMetrics,
                                              StreamingSessionManager streamingSessionManager) {
        super(maxComputeSinkConfig, instrumentation, maxComputeMetrics, streamingSessionManager);
    }

    /**
     * Insert records into MaxCompute.
     * Records are grouped by the string form of their partition spec; every group is written
     * into its own record pack and flushed separately. An empty list results in no session
     * lookup and no flush.
     *
     * @param recordWrappers list of records to insert
     * @throws TunnelException propagated from session lookup, record pack creation or flush
     * @throws IOException     propagated from record pack creation, append or flush
     */
    @Override
    public void insert(List<RecordWrapper> recordWrappers) throws TunnelException, IOException {
        Map<String, List<RecordWrapper>> partitionSpecRecordWrapperMap = recordWrappers.stream()
                .collect(Collectors.groupingBy(record -> record.getPartitionSpec().toString()));
        // Only the grouped records are needed; the grouping key itself is not used below.
        for (List<RecordWrapper> partitionRecords : partitionSpecRecordWrapperMap.values()) {
            TableTunnel.StreamUploadSession streamUploadSession =
                    getStreamingSessionManager().getSession(DYNAMIC_PARTITIONED_INSERT_MANAGER);
            TableTunnel.StreamRecordPack recordPack = newRecordPack(streamUploadSession);
            for (RecordWrapper recordWrapper : partitionRecords) {
                // Pass the key the session is actually cached under, not the partition spec,
                // so the sessionKey argument matches the session in use.
                appendRecord(recordPack, recordWrapper, DYNAMIC_PARTITIONED_INSERT_MANAGER);
            }
            flushRecordPack(recordPack);
        }
    }

    /**
     * Append a record, together with its partition spec, to a dynamic-partition record pack.
     * A schema mismatch is logged and wrapped in a NonRetryableException.
     *
     * @param recordPack    record pack to append to; cast to DynamicPartitionRecordPack, so any
     *                      other implementation fails with a ClassCastException
     * @param recordWrapper record to append; its partition spec is rendered with
     *                      {@code toString(false, false)}
     *                      (NOTE(review): presumably the unquoted, comma-separated form - confirm)
     * @param sessionKey    key identifying the session; not used by this override
     * @throws IOException propagated from the underlying append
     */
    @Override
    protected void appendRecord(TableTunnel.StreamRecordPack recordPack, RecordWrapper recordWrapper, String sessionKey) throws IOException {
        try {
            DynamicPartitionRecordPack dynamicPartitionRecordPack = (DynamicPartitionRecordPack) recordPack;
            dynamicPartitionRecordPack.append(
                    recordWrapper.getRecord(),
                    recordWrapper.getPartitionSpec().toString(false, false)
            );
        } catch (SchemaMismatchException e) {
            log.error("Record pack schema Mismatch", e);
            throw new NonRetryableException("Record pack schema Mismatch", e);
        }
    }

}

Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ protected InsertManager(MaxComputeSinkConfig maxComputeSinkConfig, Instrumentati

/**
 * Insert records into MaxCompute.
 *
 * @param recordWrappers list of records to insert
 * @throws TunnelException if there is an error with the tunnel service, typically due to network issues
 * @throws IOException typically thrown when issues such as schema mismatch occur
 */
public abstract void insert(List<RecordWrapper> recordWrappers) throws TunnelException, IOException;

Expand All @@ -55,7 +56,7 @@ protected InsertManager(MaxComputeSinkConfig maxComputeSinkConfig, Instrumentati
*
* @param streamUploadSession session for streaming insert
* @return TableTunnel.StreamRecordPack
* @throws IOException typically thrown when issues such as schema mismatch occur
* @throws IOException typically thrown when issues such as schema mismatch occur
* @throws TunnelException if there is an error with the tunnel service, typically due to network issues
*/
protected TableTunnel.StreamRecordPack newRecordPack(TableTunnel.StreamUploadSession streamUploadSession) throws IOException, TunnelException {
Expand All @@ -70,7 +71,7 @@ protected TableTunnel.StreamRecordPack newRecordPack(TableTunnel.StreamUploadSes
/**
* Instrument the insert operation.
*
* @param start start time of the operation
* @param start start time of the operation
* @param flushResult result of the flush operation
*/
private void instrument(Instant start, TableTunnel.FlushResult flushResult) {
Expand All @@ -89,9 +90,9 @@ private void instrument(Instant start, TableTunnel.FlushResult flushResult) {
* When schema mismatch occurs, wrap the exception in a NonRetryableException. It is not possible to recover from schema mismatch.
* When network partition occurs, refresh the schema and rethrow the exception.
*
* @param recordPack recordPack to append the record to
* @param recordPack recordPack to append the record to
* @param recordWrapper record to append
* @param sessionKey key to identify the session, used for refreshing the schema
* @param sessionKey key to identify the session, used for refreshing the schema
* @throws IOException typically thrown when issues such as network partition occur
*/
protected void appendRecord(TableTunnel.StreamRecordPack recordPack, RecordWrapper recordWrapper, String sessionKey) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.aliyun.odps.tunnel.TableTunnel;
import com.gotocompany.depot.config.MaxComputeSinkConfig;
import com.gotocompany.depot.maxcompute.client.insert.session.StreamingSessionManager;
import com.gotocompany.depot.maxcompute.enumeration.StreamingInsertPartitioningType;
import com.gotocompany.depot.metrics.Instrumentation;
import com.gotocompany.depot.metrics.MaxComputeMetrics;

Expand All @@ -26,9 +27,12 @@ public static InsertManager createInsertManager(MaxComputeSinkConfig maxComputeS
TableTunnel tableTunnel,
Instrumentation instrumentation,
MaxComputeMetrics maxComputeMetrics) {
if (maxComputeSinkConfig.isTablePartitioningEnabled()) {
if (maxComputeSinkConfig.isTablePartitioningEnabled() && StreamingInsertPartitioningType.DEFAULT.equals(maxComputeSinkConfig.getStreamingInsertPartitioningType())) {
StreamingSessionManager partitionedStreamingSessionManager = StreamingSessionManager.createPartitioned(tableTunnel, maxComputeSinkConfig, instrumentation, maxComputeMetrics);
return new PartitionedInsertManager(maxComputeSinkConfig, instrumentation, maxComputeMetrics, partitionedStreamingSessionManager);
} else if (maxComputeSinkConfig.isTablePartitioningEnabled() && StreamingInsertPartitioningType.DYNAMIC.equals(maxComputeSinkConfig.getStreamingInsertPartitioningType())) {
StreamingSessionManager partitionedStreamingSessionManager = StreamingSessionManager.createDynamicPartitioned(tableTunnel, maxComputeSinkConfig, instrumentation, maxComputeMetrics);
return new DynamicPartitionedInsertManager(maxComputeSinkConfig, instrumentation, maxComputeMetrics, partitionedStreamingSessionManager);
} else {
StreamingSessionManager nonPartitionedStreamingSessionManager = StreamingSessionManager.createNonPartitioned(tableTunnel, maxComputeSinkConfig, instrumentation, maxComputeMetrics);
return new NonPartitionedInsertManager(maxComputeSinkConfig, instrumentation, maxComputeMetrics, nonPartitionedStreamingSessionManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,19 @@
import com.gotocompany.depot.config.MaxComputeSinkConfig;
import com.gotocompany.depot.metrics.Instrumentation;
import com.gotocompany.depot.metrics.MaxComputeMetrics;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.Field;
import java.time.Instant;

/**
* StreamingSessionManager manages the streaming insert sessions for MaxCompute.
* Streaming Insert Sessions are reused when the partition spec is the same.
* Streaming sessions are created by TableTunnel service. Read more about it here: <a href="https://www.alibabacloud.com/help/en/maxcompute/user-guide/tabletunnel">Alibaba MaxCompute Table Tunnel</a>
*/
public final class StreamingSessionManager {
@Slf4j
public final class StreamingSessionManager {

private final LoadingCache<String, TableTunnel.StreamUploadSession> sessionCache;

Expand Down Expand Up @@ -80,6 +84,30 @@ public TableTunnel.StreamUploadSession load(String partitionSpecKey) throws Tunn
.build(cacheLoader));
}

/**
 * Create a StreamingSessionManager whose sessions are built with both
 * {@code setCreatePartition(true)} and {@code setDynamicPartition(true)}.
 * Sessions are cached per key, bounded by the configured maximum session count.
 *
 * @param tableTunnel          tunnel used to build the streaming sessions
 * @param maxComputeSinkConfig sink configuration; also supplies the maximum cache size
 * @param instrumentation      instrumentation passed to session building
 * @param maxComputeMetrics    metrics passed to session building
 * @return a StreamingSessionManager backed by a size-bounded loading cache
 */
public static StreamingSessionManager createDynamicPartitioned(TableTunnel tableTunnel,
                                                               MaxComputeSinkConfig maxComputeSinkConfig,
                                                               Instrumentation instrumentation,
                                                               MaxComputeMetrics maxComputeMetrics) {

    CacheLoader<String, TableTunnel.StreamUploadSession> cacheLoader = new CacheLoader<String, TableTunnel.StreamUploadSession>() {
        @Override
        public TableTunnel.StreamUploadSession load(String partitionSpecKey) throws TunnelException {
            log.info("Creating dynamic partitioned streaming session for partition spec: {}", partitionSpecKey);
            // The builder is configured through its public setters only. No reflective read of
            // SDK-private fields, so an SDK-internal rename cannot break session creation.
            TableTunnel.StreamUploadSession.Builder streamUploadSessionBuilder = getBaseStreamSessionBuilder(tableTunnel, maxComputeSinkConfig)
                    .setCreatePartition(true)
                    .setDynamicPartition(true);
            return buildStreamSession(streamUploadSessionBuilder, instrumentation, maxComputeMetrics);
        }
    };
    return new StreamingSessionManager(CacheBuilder.newBuilder()
            .maximumSize(maxComputeSinkConfig.getStreamingInsertMaximumSessionCount())
            .build(cacheLoader));
}

/**
* Get the session for the given cache key.
* If the session is not present in the cache, a new session is created and returned.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.gotocompany.depot.maxcompute.enumeration;

/**
 * Partitioning strategies available for streaming inserts into a partitioned table.
 * The value is read from configuration and consulted by InsertManagerFactory when
 * table partitioning is enabled.
 */
public enum StreamingInsertPartitioningType {

    /** Selects PartitionedInsertManager with a partitioned StreamingSessionManager. */
    DEFAULT,

    /** Selects DynamicPartitionedInsertManager with a dynamic-partitioned StreamingSessionManager. */
    DYNAMIC;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.aliyun.odps.tunnel.TableTunnel;
import com.gotocompany.depot.config.MaxComputeSinkConfig;
import com.gotocompany.depot.maxcompute.enumeration.StreamingInsertPartitioningType;
import com.gotocompany.depot.metrics.Instrumentation;
import com.gotocompany.depot.metrics.MaxComputeMetrics;
import org.junit.Test;
Expand All @@ -16,6 +17,7 @@ public class InsertManagerFactoryTest {
public void shouldCreatePartitionedInsertManager() {
MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class);
when(maxComputeSinkConfig.isTablePartitioningEnabled()).thenReturn(true);
when(maxComputeSinkConfig.getStreamingInsertPartitioningType()).thenReturn(StreamingInsertPartitioningType.DEFAULT);

InsertManager insertManager = InsertManagerFactory.createInsertManager(maxComputeSinkConfig,
Mockito.mock(TableTunnel.class), Mockito.mock(Instrumentation.class), Mockito.mock(MaxComputeMetrics.class));
Expand All @@ -34,4 +36,16 @@ public void shouldCreateNonPartitionedInsertManager() {
assertTrue(insertManager instanceof NonPartitionedInsertManager);
}

@Test
public void shouldCreateDynamicPartitionedInsertManager() {
    // Partitioning enabled together with the DYNAMIC type must yield the dynamic manager.
    MaxComputeSinkConfig sinkConfig = Mockito.mock(MaxComputeSinkConfig.class);
    when(sinkConfig.getStreamingInsertPartitioningType()).thenReturn(StreamingInsertPartitioningType.DYNAMIC);
    when(sinkConfig.isTablePartitioningEnabled()).thenReturn(true);
    TableTunnel tunnel = Mockito.mock(TableTunnel.class);
    Instrumentation instrumentation = Mockito.mock(Instrumentation.class);
    MaxComputeMetrics metrics = Mockito.mock(MaxComputeMetrics.class);

    InsertManager created = InsertManagerFactory.createInsertManager(sinkConfig, tunnel, instrumentation, metrics);

    boolean isDynamicManager = created instanceof DynamicPartitionedInsertManager;
    assertTrue(isDynamicManager);
}

}
Loading