Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ RUN go build -o /olake main.go
# Final Runtime Stage
FROM alpine:3.18

# Install Java 17 instead of Java 11
RUN apk add --no-cache openjdk17
# Install Java 17 and iproute2 for ss command
RUN apk add --no-cache openjdk17 iproute2

# Copy the binary from the build stage
COPY --from=base /olake /home/olake
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@

| Source | Full Load | CDC | Incremental | Notes | Documentation |
|---------------|--------------|---------------|-------------------|-----------------------------|-----------------------------|
| PostgreSQL | ✅ | ✅ `wal2json` | ✅ |`pgoutput` support WIP |[Postgres Docs](https://olake.io/docs/connectors/postgres/overview) |
| PostgreSQL | ✅ | ✅ `pgoutput` | ✅ |`wal2json` deprecated |[Postgres Docs](https://olake.io/docs/connectors/postgres/overview) |
| MySQL | ✅ | ✅ | ✅ | Binlog-based CDC | [MySQL Docs](https://olake.io/docs/connectors/mysql/overview) |
| MongoDB | ✅ | ✅ | ✅ | Oplog-based CDC |[MongoDB Docs](https://olake.io/docs/connectors/mongodb/overview) |
| Oracle | ✅ | WIP | ✅ | JDBC based Full Load & Incremental | [Oracle Docs](https://olake.io/docs/connectors/oracle/overview) |
Expand Down
268 changes: 169 additions & 99 deletions destination/iceberg/java_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"os"
"os/exec"
"strconv"
Expand All @@ -14,13 +13,20 @@ import (

"github.com/datazip-inc/olake/destination/iceberg/proto"
"github.com/datazip-inc/olake/utils"
"github.com/datazip-inc/olake/utils/backoff"
"github.com/datazip-inc/olake/utils/logger"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

var portMap sync.Map
var (
portStatus sync.Map // map[int]*portState - tracks port usage and cooldown state
cooldownPeriod = 180 * time.Second
)

type portState struct {
inUse bool
releasedAt time.Time
}

type serverInstance struct {
port int
Expand Down Expand Up @@ -124,39 +130,52 @@ func getServerConfigJSON(config *Config, partitionInfo []PartitionInfo, port int
}

// setup java client

func newIcebergClient(config *Config, partitionInfo []PartitionInfo, threadID string, check, upsert bool, destinationDatabase string) (*serverInstance, error) {
// validate configuration
err := config.Validate()
if err != nil {
return nil, fmt.Errorf("failed to validate config: %s", err)
}

const maxAttempts = 10
var (
port int
serverCmd *exec.Cmd
)

shouldRetry := func(err error) bool {
low := strings.ToLower(err.Error())
return strings.Contains(low, "bind") || strings.Contains(low, "address already in use") || strings.Contains(low, "failed to bind")
}
// nextStartPort controls from where the port scan should begin on each attempt
nextStartPort := 50051

startErr := backoff.Retry(5, time.Second, func() error {
// choose a fresh port each attempt
p, findErr := FindAvailablePort(config.ServerHost)
if findErr != nil {
return findErr
addEnvIfSet := func(key, value string) {
if value != "" {
keyPrefix := fmt.Sprintf("%s=", key)
for idx := range serverCmd.Env {
// if prefix exist through env, override it with config
if strings.HasPrefix(serverCmd.Env[idx], keyPrefix) {
serverCmd.Env[idx] = fmt.Sprintf("%s=%s", key, value)
return
}
}
// if prefix does not exist add it
serverCmd.Env = append(serverCmd.Env, fmt.Sprintf("%s=%s", key, value))
}
}
for attempt := 0; attempt < maxAttempts; attempt++ {
// get available port
port, err = FindAvailablePort(nextStartPort)
if err != nil {
return nil, fmt.Errorf("failed to find available ports: %s", err)
}
port = p

// Get the server configuration JSON for this port
configJSON, cfgErr := getServerConfigJSON(config, partitionInfo, port, upsert, destinationDatabase)
if cfgErr != nil {
portMap.Delete(port)
return cfgErr
// Build server configuration with selected port
configJSON, err := getServerConfigJSON(config, partitionInfo, port, upsert, destinationDatabase)
if err != nil {
return nil, fmt.Errorf("failed to create server config: %s", err)
}

// setup command
// If debug mode is enabled and it is not check command
if os.Getenv("OLAKE_DEBUG_MODE") != "" && !check {
serverCmd = exec.Command("java", "-XX:+UseG1GC", "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005", "-jar", config.JarPath, string(configJSON))
} else {
Expand All @@ -165,18 +184,6 @@ func newIcebergClient(config *Config, partitionInfo []PartitionInfo, threadID st

// Get current environment
serverCmd.Env = os.Environ()
addEnvIfSet := func(key, value string) {
if value != "" {
keyPrefix := fmt.Sprintf("%s=", key)
for idx := range serverCmd.Env {
if strings.HasPrefix(serverCmd.Env[idx], keyPrefix) {
serverCmd.Env[idx] = fmt.Sprintf("%s=%s", key, value)
return
}
}
serverCmd.Env = append(serverCmd.Env, fmt.Sprintf("%s=%s", key, value))
}
}
addEnvIfSet("AWS_ACCESS_KEY_ID", config.AccessKey)
addEnvIfSet("AWS_SECRET_ACCESS_KEY", config.SecretKey)
addEnvIfSet("AWS_REGION", config.Region)
Expand All @@ -185,45 +192,54 @@ func newIcebergClient(config *Config, partitionInfo []PartitionInfo, threadID st

// Set up and start the process with logging
if err := logger.SetupAndStartProcess(fmt.Sprintf("Thread[%s:%d]", threadID, port), serverCmd); err != nil {
// ensure process is not left running
if serverCmd != nil && serverCmd.Process != nil {
_ = serverCmd.Process.Kill()
// Mark port for cooldown since it failed to start
portStatus.Store(port, &portState{
inUse: false,
releasedAt: time.Now(),
})
// If this was a bind error (EADDRINUSE), retry with the next available port
// This is necessary because port can collide with system ephemeral ports, which we can not detect/kill, so we skip
errLower := strings.ToLower(err.Error())
if strings.Contains(errLower, "address in use") || strings.Contains(errLower, "failed to bind") || strings.Contains(errLower, "bindexception") || strings.Contains(errLower, "eaddrinuse") {
logger.Warnf("Thread[%s]: Port %d bind failed, retrying with next available port", threadID, port)
// advance the start port to the next one for the subsequent scan
nextStartPort = port + 1
continue
}
// release reserved port
portMap.Delete(port)
return err
return nil, fmt.Errorf("failed to setup logger: %s", err)
}
return nil
}, shouldRetry)

if startErr != nil {
return nil, fmt.Errorf("failed to start iceberg java writer process after retries: %s", startErr)
}

// Connect to gRPC server
conn, err := grpc.NewClient(fmt.Sprintf("%s:%s", config.ServerHost, strconv.Itoa(port)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)))
// Connect to gRPC server
conn, err := grpc.NewClient(fmt.Sprintf("%s:%s", config.ServerHost, strconv.Itoa(port)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)))

if err != nil {
// If connection fails, clean up the process
if serverCmd != nil && serverCmd.Process != nil {
if killErr := serverCmd.Process.Kill(); killErr != nil {
logger.Errorf("Thread[%s]: Failed to kill process: %s", threadID, killErr)
if err != nil {
// If connection fails, clean up the process
if serverCmd != nil && serverCmd.Process != nil {
if killErr := serverCmd.Process.Kill(); killErr != nil {
logger.Errorf("Thread[%s]: Failed to kill process: %s", threadID, killErr)
}
}
// Mark port for cooldown since connection failed
portStatus.Store(port, &portState{
inUse: false,
releasedAt: time.Now(),
})
return nil, fmt.Errorf("failed to create new grpc client: %s", err)
}
portMap.Delete(port)
return nil, fmt.Errorf("failed to create new grpc client: %s", err)

logger.Infof("Thread[%s]: Connected to new iceberg writer on port %d", threadID, port)
return &serverInstance{
port: port,
cmd: serverCmd,
client: proto.NewRecordIngestServiceClient(conn),
conn: conn,
serverID: threadID,
}, nil
}

logger.Infof("Thread[%s]: Connected to new iceberg writer on port %d", threadID, port)
return &serverInstance{
port: port,
cmd: serverCmd,
client: proto.NewRecordIngestServiceClient(conn),
conn: conn,
serverID: threadID,
}, nil
return nil, fmt.Errorf("failed to start iceberg writer after %d attempts due to port binding conflicts", maxAttempts)
}

func (s *serverInstance) sendClientRequest(ctx context.Context, reqPayload *proto.IcebergPayload) (string, error) {
Expand All @@ -245,54 +261,108 @@ func (s *serverInstance) closeIcebergClient() error {
logger.Errorf("Thread[%s]: Failed to kill Iceberg server: %s", s.serverID, err)
}
}
portMap.Delete(s.port)
// Mark port as released (cooldown period to allow OS to clean up TIME_WAIT state)
portStatus.Store(s.port, &portState{
inUse: false,
releasedAt: time.Now(),
})
return nil
}

// findAvailablePort finds an available port for the RPC server
func FindAvailablePort(serverHost string) (int, error) {
for p := 50051; p <= 59051; p++ {
// Try to store port in map - returns false if already exists
if _, loaded := portMap.LoadOrStore(p, true); !loaded {
// Check if the port is already in use by another process
conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", serverHost, p), time.Second)
if err == nil {
// Port is in use, close our test connection
conn.Close()

// Find the process using this port
cmd := exec.Command("lsof", "-i", fmt.Sprintf(":%d", p), "-t")
output, err := cmd.Output()
if err != nil {
// Failed to find process, continue to next port
portMap.Delete(p)
continue
}

// Get the PID
pid := strings.TrimSpace(string(output))
if pid == "" {
// No process found, continue to next port
portMap.Delete(p)
continue
}
// findAvailablePort finds an available port for the RPC server starting from startPort
func FindAvailablePort(startPort int) (int, error) {
if startPort < 50051 {
startPort = 50051
}
if startPort > 59051 {
return 0, fmt.Errorf("startPort out of range")
}
for p := startPort; p <= 59051; p++ {
// Check port state
if state, exists := portStatus.Load(p); exists {
ps := state.(*portState)
if ps.inUse {
// Port is currently in use by our process
continue
}
// Port was released, check if cooldown period has elapsed
if time.Since(ps.releasedAt) < cooldownPeriod {
// Still in cooldown, skip this port
continue
}
// Cooldown period expired, remove from map and allow reuse
portStatus.Delete(p)
}

// Port not tracked or cooldown expired - try to acquire it
// Use LoadOrStore to atomically claim the port
if _, loaded := portStatus.LoadOrStore(p, &portState{inUse: true}); !loaded {
// Successfully claimed the port, try to kill any process using it
pid := findProcessUsingPort(p)
if pid != "" {
// Kill the process
killCmd := exec.Command("kill", "-9", pid)
err = killCmd.Run()
if err != nil {
logger.Warnf("Failed to kill process using port %d: %v", p, err)
portMap.Delete(p)
continue
if killErr := killCmd.Run(); killErr == nil {
logger.Infof("Killed process %s that was using port %d", pid, p)
// Wait for the port to be released
time.Sleep(time.Second * 5)
} else {
logger.Warnf("Failed to kill process %s using port %d: %v", pid, p, killErr)
}

logger.Infof("Killed process %s that was using port %d", pid, p)

// Wait a moment for the port to be released
time.Sleep(time.Second * 5)
}
// Return the port (either it was free, or we attempted to free it)
return p, nil
}
}
return 0, fmt.Errorf("no available ports found between 50051 and 59051")
}

// findProcessUsingPort finds the PID of a process using the specified port
// Tries ss first (preferred for Alpine), falls back to lsof
func findProcessUsingPort(port int) string {
// Prefer ss if available. If ss exists, do NOT fall back to lsof.
if _, lookErr := exec.LookPath("ss"); lookErr == nil {
// Use a valid filter expression: sport = :<port>
cmd := exec.Command("ss", "-H", "-ltnp", fmt.Sprintf("sport = :%d", port))
output, err := cmd.Output()
if err == nil {
// Parse ss output to extract PID
lines := strings.Split(strings.TrimSpace(string(output)), "\n")
for _, line := range lines {
// ss output format: State Recv-Q Send-Q Local Address:Port Peer Address:Port Process
// Look for the process part at the end (e.g., "users:((\"java\",pid=123,fd=123))")
if strings.Contains(line, "users:") {
// Extract PID from the process info
parts := strings.Split(line, "pid=")
if len(parts) > 1 {
pidPart := strings.Split(parts[1], ",")[0]
if pid := strings.TrimSpace(pidPart); pid != "" {
logger.Infof("Found process %s using port %d using ss", pid, port)
return pid
}
}
}
}
// No users: match found; return empty without falling back
return ""
}
// ss failed to run (syntax/permissions/etc.). Log and return empty.
logger.Warnf("Failed to find process using port %d using ss: %v", port, err)
return ""
}

// ss not available: fall back to lsof if present
if _, lookErr := exec.LookPath("lsof"); lookErr == nil {
cmd := exec.Command("lsof", "-nP", fmt.Sprintf("-iTCP:%d", port), "-sTCP:LISTEN", "-t")
output, err := cmd.Output()
if err == nil {
pid := strings.TrimSpace(string(output))
if pid != "" {
logger.Infof("Found process %s using port %d using lsof", pid, port)
return pid
}
}
}

return ""
}
Loading
Loading