feat(iceberg): single JVM per process instead of per stream-chunk#962
feat(iceberg): single JVM per process instead of per stream-chunk#962vikaxsh wants to merge 24 commits into
Conversation
… into feat/unified-jvm
…se using a cancelled flag
…eat/unified-jvm
hash-data
left a comment
There was a problem hiding this comment.
can we think of better RPC request structure? as well as think of more scenerios where things can break?
| // Shutdown invokes destination-level cleanup if the registered writer | ||
| // implements Shutdownable. No-op for destinations without long-lived | ||
| // resources (parquet). | ||
| func Shutdown(ctx context.Context, config *types.WriterConfig) { |
There was a problem hiding this comment.
why shutdown? it can be cleanup and it is part of writer pool
| // The catalog/storage portion of `config` is what drives the JVM CLI; later | ||
| // callers that pass a different config still receive the already-running JVM. | ||
| // This is intentional: in a single OLake sync the destination config is fixed. | ||
| func acquireServer(config *Config) (*serverInstance, error) { |
There was a problem hiding this comment.
think of function name right now it is not readable?
Example:
acquireServer is not what this function does, it create thread in java side so initJavaThread could be a good name
| sharedServerMu.Lock() | ||
| defer sharedServerMu.Unlock() | ||
|
|
There was a problem hiding this comment.
Inside Backfill, for a single stream, chunkProcessor runs concurrently for every chunk of the same stream
so not 2 chunk try to create jvm at same time
| for attempt := 0; attempt < maxAttempts; attempt++ { | ||
| // get available port | ||
| port, err = FindAvailablePort(threadID, nextStartPort) | ||
| port, err := FindAvailablePort(serverID, nextStartPort) |
There was a problem hiding this comment.
if a previous failed sync left behind an orphaned JVM that was still holding the port, this can now be handled easily. kept the existing behavior for now to avoid making significant changes
| // TODO: research the following flags in arrow writer and legacy writer | ||
| // need to do some research on the following flags | ||
| var serverCmd *exec.Cmd | ||
| if os.Getenv("OLAKE_DEBUG_MODE") != "" { |
There was a problem hiding this comment.
how this will work if server already started?
There was a problem hiding this comment.
in each sync ,server starts once, if provided before sync start, it will work
| } | ||
|
|
||
| public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier tableIdentifier, Schema schema) { | ||
| ensureNamespace(icebergCatalog, tableIdentifier); |
There was a problem hiding this comment.
do we have exception handling for table already exist?
| BooleanSupplier cancelled) { | ||
| // Session already torn down before we started — don't create a writer that | ||
| // nothing will commit/close. | ||
| if (cancelled.getAsBoolean()) { |
There was a problem hiding this comment.
will think a better way of it, commenting for reminder
| // Only raise the flag — never touch the writer here. This runs on | ||
| // a different gRPC thread than the in-flight write, and closing it | ||
| // concurrently with writer.write() is what corrupts Parquet. The | ||
| // write loop sees `cancelled` and closes its own writer; we don't wait. |
There was a problem hiding this comment.
memory things need to be removed as there will be a lot of memory hogged by the writer even after close
|
|
||
| public Table loadIcebergTable(TableIdentifier tableId, Schema schema) { | ||
| private Table loadOrCreateTable(TableIdentifier tableId, Schema schema, List<Map<String, String>> partitionTransforms) { | ||
| return IcebergUtil.loadIcebergTable(icebergCatalog, tableId).orElseGet(() -> { |
There was a problem hiding this comment.
we are using iceberg utill directly need to check consequences of it
| import io.grpc.stub.StreamObserver; | ||
| import jakarta.enterprise.context.Dependent; | ||
|
|
||
| /** |
There was a problem hiding this comment.
not reviewing it as of now, it is going to be similar as olakeRowingestor. A review at end is fine of it
…art per thread session
…eat/unified-jvm
| // receive the already-running JVM. This is intentional: in a single OLake sync | ||
| // the destination config is fixed. | ||
| func initializeServer(config *Config) (*serverInstance, error) { | ||
| startOnce.Do(func() { |
| // Initialize starts destination-level process resources once, up front, if the | ||
| // registered writer implements Initializable. No-op for destinations without | ||
| // long-lived resources (parquet). Pair it with a deferred Shutdown. | ||
| func Initialize(ctx context.Context, config *types.WriterConfig) error { |
There was a problem hiding this comment.
can we do this in writer pool where we dont need to unmarshal config again, these we can do in this way
-> writer pool start the server
-> the object of server will be global in iceberg and can be used by all instances that get created
| // implements Shutdownable. No-op for destinations without long-lived | ||
| // resources (parquet). | ||
| func Shutdown(ctx context.Context, config *types.WriterConfig) { | ||
| if config == nil { |
There was a problem hiding this comment.
shutdown would be writer pool cleanup function where it cleans up the instance of server
| if err := s.Shutdown(ctx); err != nil { | ||
| logger.Warnf("destination.Shutdown: %s", err) | ||
| } | ||
| } |
There was a problem hiding this comment.
not related to above change:
But can we somehow unmarhsal config only once, right now we are unmarshaling it every time while creating new writer
… into feat/unified-jvm
Description
This PR refactors the Iceberg writer architecture to use a single shared JVM per OLake CLI invocation instead of creating one JVM per writer thread/chunk.
Previously, every Iceberg writer initialization spawned a dedicated JVM-backed Iceberg client and gRPC server, resulting in significant memory overhead under concurrent backfill and CDC workloads. Large syncs could create many JVM processes, leading to excessive memory consumption and potential OOM issues.
Changes
Introduced a shared JVM architecture where all streams and chunks communicate with a single JVM instance.
Added
ThreadSession-based isolation to maintain per-stream/per-chunk state within the shared JVM.Moved stream-specific configuration from JVM startup arguments to gRPC request metadata:
Added
StreamMetaCtxon the Go side to propagate stream-specific metadata with every request.Added
ThreadSessionmanagement in Java to maintain isolated:Added
CLOSE_SESSIONRPC operation for explicit session cleanup and resource release.Retained catalog initialization and other truly global resources at the JVM level.
Benefits
Fixes # (issue)
Type of change
How Has This Been Tested?
CLOSE_SESSION.Screenshots or Recordings
N/A
Documentation
Related PR's (If Any):
N/A