Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ olake-iceberg-java-writer.jar
local-releaser.sh
destination/iceberg/olake-iceberg-java-writer/.idea/
destination/iceberg/olake-iceberg-java-writer/dependency-reduced-pom.xml
!destination/iceberg/olake-iceberg-java-writer/src/main/java/io/olake/
43 changes: 19 additions & 24 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,41 +1,36 @@
# Build Stage
FROM golang:1.23-alpine AS base
# Runtime base (shared across all drivers): OS + Java + JAR + destination specs
FROM alpine:3.18 AS runtime-base

WORKDIR /home/app
COPY . .
RUN apk add --no-cache openjdk17 iproute2

# Copy the pre-built JAR and destination specs (these rarely change)
COPY destination/iceberg/olake-iceberg-java-writer/target/olake-iceberg-java-writer-0.0.1-SNAPSHOT.jar /home/olake-iceberg-java-writer.jar
COPY destination/iceberg/resources/spec.json /destination/iceberg/resources/spec.json
COPY destination/parquet/resources/spec.json /destination/parquet/resources/spec.json

WORKDIR /home

# Build the Go driver binary (driver-specific)
FROM golang:1.23-alpine AS builder

ARG DRIVER_NAME=olake
# Build the Go binary
WORKDIR /home/app
COPY . .
WORKDIR /home/app/drivers/${DRIVER_NAME}
RUN go build -o /olake main.go

# Final Runtime Stage
FROM alpine:3.18

# Install Java 17 and iproute2 for ss command
RUN apk add --no-cache openjdk17 iproute2

# Copy the binary from the build stage
COPY --from=base /olake /home/olake
# Final image: reuse runtime base, then add only driver-specific layers
FROM runtime-base

ARG DRIVER_VERSION=dev
ARG DRIVER_NAME=olake

# Copy the pre-built JAR file from Maven
# First try to copy from the source location (works after Maven build)
COPY destination/iceberg/olake-iceberg-java-writer/target/olake-iceberg-java-writer-0.0.1-SNAPSHOT.jar /home/olake-iceberg-java-writer.jar

# Copy the spec files for driver and destinations
COPY --from=base /home/app/drivers/${DRIVER_NAME}/resources/spec.json /drivers/${DRIVER_NAME}/resources/spec.json
COPY --from=base /home/app/destination/iceberg/resources/spec.json /destination/iceberg/resources/spec.json
COPY --from=base /home/app/destination/parquet/resources/spec.json /destination/parquet/resources/spec.json
COPY --from=builder /olake /home/olake
COPY drivers/${DRIVER_NAME}/resources/spec.json /drivers/${DRIVER_NAME}/resources/spec.json

# Metadata
LABEL io.eggwhite.version=${DRIVER_VERSION}
LABEL io.eggwhite.name=olake/source-${DRIVER_NAME}

# Set working directory
WORKDIR /home

# Entrypoint
ENTRYPOINT ["./olake"]
2 changes: 1 addition & 1 deletion destination/iceberg/olake-iceberg-java-writer/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Java Iceberg Sink

This project is a fork and modified version of the [debezium-server-iceberg](https://github.com/memiiso/debezium-server-iceberg) project, originally used to dump data from Debezium Server into Iceberg. The modifications make it compatible with Olake by sending data in Debezium format.
This project is a fork and modified version of the [debezium-server-iceberg](https://github.com/memiiso/debezium-server-iceberg) project, originally used to dump data from Debezium Server into Iceberg.

## Architecture

Expand Down
113 changes: 9 additions & 104 deletions destination/iceberg/olake-iceberg-java-writer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.debezium</groupId>
<groupId>io.olake</groupId>
<artifactId>olake-iceberg-java-writer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>Debezium Server Iceberg Consumer</name>
<name>OLake Iceberg Java Writer</name>
<packaging>jar</packaging>

<properties>
<!-- Debezium Version -->
<version.debezium>2.5.2.Final</version.debezium>

<!-- Instruct the build to use only UTF-8 encoding for source code -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand All @@ -41,7 +39,6 @@
<version.protobuf>3.25.5</version.protobuf>
<version.netty>4.1.118.Final</version.netty>
<version.log4j2>2.21.1</version.log4j2>
<version.quarkus>3.19.1</version.quarkus>
<version.commons.io>2.14.0</version.commons.io>
<version.avro>1.11.4</version.avro>
<version.snappy>1.1.10.4</version.snappy>
Expand All @@ -61,21 +58,6 @@

<dependencyManagement>
<dependencies>
<!-- debezium server -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-bom</artifactId>
<version>${version.debezium}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-bom</artifactId>
<version>${version.debezium}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-bom</artifactId>
Expand All @@ -100,74 +82,6 @@
</dependencyManagement>

<dependencies>
<!-- Debezium -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-core</artifactId>
<exclusions>
<exclusion>
<groupId>org.jboss.slf4j</groupId>
<artifactId>slf4j-jboss-logmanager</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.logmanager</groupId>
<artifactId>jboss-logmanager</artifactId>
</exclusion>
<exclusion>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-core</artifactId>
</exclusion>
<exclusion>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
</exclusion>
<exclusion>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
</exclusion>
<exclusion>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kubernetes-config</artifactId>
</exclusion>
<exclusion>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jackson</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Quarkus dependencies with fixed versions -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-core</artifactId>
<version>${version.quarkus}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
<version>${version.quarkus}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
<version>${version.quarkus}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kubernetes-config</artifactId>
<version>${version.quarkus}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jackson</artifactId>
<version>${version.quarkus}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx-http</artifactId>
<version>${version.quarkus}</version>
</dependency>

<!-- Log4j2 implementation -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
Expand Down Expand Up @@ -196,6 +110,7 @@
<dependency>
<groupId>org.jboss.slf4j</groupId>
<artifactId>slf4j-jboss-logmanager</artifactId>
<version>2.0.1.Final</version>
<scope>provided</scope>
</dependency>

Expand Down Expand Up @@ -420,15 +335,6 @@
</exclusions>
</dependency>

<!-- Testing -->
<!-- spark for tests -->
<!-- jakarta version compatible with spark, for testing only -->
<dependency>
<groupId>jakarta.servlet</groupId>
<artifactId>jakarta.servlet-api</artifactId>
<version>5.0.0</version>
<!-- <scope>test</scope>-->
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down Expand Up @@ -492,12 +398,6 @@
<artifactId>iceberg-spark-extensions-${version.spark.major.iceebrgtemp}_${version.spark.scala}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<version>${version.quarkus}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
Expand All @@ -524,26 +424,31 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.14.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>minio</artifactId>
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -819,7 +724,7 @@
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>io.debezium.server.iceberg.OlakeRpcServer</mainClass>
<mainClass>io.olake.server.iceberg.OlakeRpcServer</mainClass>
<manifestEntries>
<Multi-Release>true</Multi-Release>
</manifestEntries>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@
*
*/

package io.debezium.server.iceberg;
package io.olake.server.iceberg;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.primitives.Ints;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.literal.NamedLiteral;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
Expand All @@ -26,9 +24,6 @@
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.ConfigValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -58,59 +53,6 @@ public class IcebergUtil {
protected static final DateTimeFormatter dtFormater = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);


public static Map<String, String> getConfigSubset(Config config, String prefix) {
final Map<String, String> ret = new HashMap<>();

for (String propName : config.getPropertyNames()) {
if (propName.startsWith(prefix)) {
final String newPropName = propName.substring(prefix.length());
ret.put(newPropName, config.getValue(propName, String.class));
}
}

return ret;
}


public static boolean configIncludesUnwrapSmt() {
return configIncludesUnwrapSmt(ConfigProvider.getConfig());
}

//@TestingOnly
static boolean configIncludesUnwrapSmt(Config config) {
// first lets find the config value for debezium statements
ConfigValue stms = config.getConfigValue("debezium.transforms");
if (stms == null || stms.getValue() == null || stms.getValue().isEmpty() || stms.getValue().isBlank()) {
return false;
}

String[] stmsList = stms.getValue().split(",");
final String regexVal = "^io\\.debezium\\..*transforms\\.ExtractNew.*State$";
// we have debezium statements configured! let's check if we have event flattening config is set.
for (String stmName : stmsList) {
ConfigValue stmVal = config.getConfigValue("debezium.transforms." + stmName + ".type");
if (stmVal != null && stmVal.getValue() != null && !stmVal.getValue().isEmpty() && !stmVal.getValue().isBlank() && stmVal.getValue().matches(regexVal)) {
return true;
}
}

return false;
}


public static <T> T selectInstance(Instance<T> instances, String name) {

Instance<T> instance = instances.select(NamedLiteral.of(name));
if (instance.isAmbiguous()) {
throw new RuntimeException("Multiple batch size wait class named '" + name + "' were found");
} else if (instance.isUnsatisfied()) {
throw new RuntimeException("No batch size wait class named '" + name + "' is available");
}

LOGGER.info("Using {}", instance.getClass().getName());
return instance.get();
}

public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier tableIdentifier, Schema schema) {

if (!((SupportsNamespaces) icebergCatalog).namespaceExists(tableIdentifier.namespace())) {
Expand Down
Loading
Loading