Skip to content

Commit 824785e

Browse files
committed
feat: own destination JVM + config lifecycle in WriterPool
1 parent 8caa4da commit 824785e

8 files changed

Lines changed: 109 additions & 140 deletions

File tree

destination/iceberg/iceberg.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ func (i *Iceberg) GetConfigRef() destination.Config {
6262
return i.config
6363
}
6464

65+
// SetConfig points this writer at a shared, already-parsed config instance; it panics if c is not a *Config.
66+
func (i *Iceberg) SetConfig(c destination.Config) {
67+
i.config = c.(*Config)
68+
}
69+
6570
func (i *Iceberg) Spec() any {
6671
return Config{}
6772
}
@@ -108,7 +113,7 @@ func (i *Iceberg) Setup(ctx context.Context, stream types.StreamInterface, globa
108113
// writer thread runs, so here we only read the running instance.
109114
server := getServer()
110115
if server == nil {
111-
return nil, nil, fmt.Errorf("iceberg server not initialized; destination.Initialize must run before Setup")
116+
return nil, nil, fmt.Errorf("iceberg server not initialized; the writer pool must initialize the destination before Setup")
112117
}
113118

114119
// persist server details
@@ -262,7 +267,7 @@ func (i *Iceberg) Check(ctx context.Context) error {
262267
// Get-only: the JVM is provisioned once by the writer pool (destination.NewWriterPool) before Check.
263268
server := getServer()
264269
if server == nil {
265-
return fmt.Errorf("iceberg server not initialized; destination.Initialize must run before Check")
270+
return fmt.Errorf("iceberg server not initialized; the writer pool must initialize the destination before Check")
266271
}
267272

268273
// Stash for releaseSession to find on defer.
@@ -621,7 +626,7 @@ func (i *Iceberg) DropStreams(ctx context.Context, dropStreams []types.StreamInt
621626
// Get-only: the JVM is provisioned once by the writer pool (destination.NewWriterPool) before clear.
622627
server := getServer()
623628
if server == nil {
624-
return fmt.Errorf("iceberg server not initialized; destination.Initialize must run before DropStreams")
629+
return fmt.Errorf("iceberg server not initialized; the writer pool must initialize the destination before DropStreams")
625630
}
626631

627632
// Stash so releaseSession can drop the drop-thread state on the JVM side.

destination/iceberg/java_client.go

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"os/exec"
99
"strconv"
1010
"strings"
11-
"sync"
1211
"sync/atomic"
1312
"syscall"
1413
"time"
@@ -34,13 +33,9 @@ type serverInstance struct {
3433

3534
// Shared single-JVM state for the lifetime of the process. Every Iceberg writer
3635
// connects to this one JVM; per-stream context rides on each gRPC payload.
37-
// initializeServer starts it once (guarded by startOnce), getServer loads the
38-
// pointer lock-free, and shutdownSharedServer swaps it out lock-free.
39-
var (
40-
sharedServer atomic.Pointer[serverInstance]
41-
startOnce sync.Once
42-
startErr error
43-
)
36+
// initializeServer starts it, getServer loads the pointer lock-free, and
37+
// shutdownSharedServer swaps it out lock-free.
38+
var sharedServer atomic.Pointer[serverInstance]
4439

4540
// getServerConfigJSON builds the catalog/storage-level config the JVM consumes
4641
// at startup. Per-stream concepts (namespace, upsert, identifier-fields,
@@ -122,26 +117,21 @@ func getServerConfigJSON(config *Config, port int, arrowWriterEnabled bool) ([]b
122117
return json.Marshal(serverConfig)
123118
}
124119

125-
// initializeServer launches the shared JVM exactly once and returns it. This is
126-
// the single place a JVM is started — invoked from the protocol layer via
127-
// Iceberg.Initialize before any sync/check/clear work begins. Concurrent/repeat
128-
// callers all observe the same instance. The catalog/storage portion of `config`
129-
// is what drives the JVM CLI; later callers that pass a different config still
130-
// receive the already-running JVM. This is intentional: in a single OLake sync
131-
// the destination config is fixed.
120+
// initializeServer launches the shared JVM and returns it. This is the single
121+
// place a JVM is started — invoked by destination.NewWriterPool via Iceberg.Initialize
122+
// before any sync/check/clear work begins, so it is expected to run once with
123+
// no concurrent callers. The nil-check makes a repeat call return the running
124+
// instance, but the load-then-store is not atomic, so concurrent calls are unsafe.
132125
func initializeServer(config *Config) (*serverInstance, error) {
133-
startOnce.Do(func() {
134-
inst, err := startSharedServer(config)
135-
if err != nil {
136-
startErr = err
137-
return
138-
}
139-
sharedServer.Store(inst)
140-
})
141-
if startErr != nil {
142-
return nil, startErr
126+
if inst := sharedServer.Load(); inst != nil {
127+
return inst, nil
128+
}
129+
inst, err := startSharedServer(config)
130+
if err != nil {
131+
return nil, err
143132
}
144-
return sharedServer.Load(), nil
133+
sharedServer.Store(inst)
134+
return inst, nil
145135
}
146136

147137
// getServer returns the running shared JVM lock-free (read path for Check,

destination/interface.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ type Write = func(ctx context.Context, channel <-chan types.Record) error
1414
type FlattenFunction = func(record types.Record) (types.Record, error)
1515

1616
type Writer interface {
17+
// GetConfigRef returns the writer's config pointer; the writer pool unmarshals the destination config into it.
1718
GetConfigRef() Config
19+
// SetConfig assigns a shared, pre-parsed read-only config instance provided by the writer pool.
20+
SetConfig(Config)
1821
Spec() any
1922
Type() string
2023
// Sets up connections and perform checks; doesn't load Streams

destination/parquet/parquet.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ func (p *Parquet) GetConfigRef() destination.Config {
5555
return p.config
5656
}
5757

58+
// SetConfig points this writer at a shared, already-parsed config instance; it panics if c is not a *Config.
59+
func (p *Parquet) SetConfig(c destination.Config) {
60+
p.config = c.(*Config)
61+
}
62+
5863
// Spec returns a new Config instance.
5964
func (p *Parquet) Spec() any {
6065
return Config{}
@@ -129,11 +134,6 @@ func (p *Parquet) Setup(_ context.Context, stream types.StreamInterface, schema
129134
p.basePath = filepath.Join(p.stream.GetDestinationDatabase(nil), p.stream.GetDestinationTable())
130135
p.schema = make(typeutils.Fields)
131136

132-
// for s3 p.config.path may not be provided
133-
if p.config.Path == "" {
134-
p.config.Path = os.TempDir()
135-
}
136-
137137
err := p.initS3Writer()
138138
if err != nil {
139139
return nil, nil, err

destination/writers.go

Lines changed: 55 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ type (
3939
}
4040

4141
WriterPool struct {
42-
configMutex sync.Mutex
4342
stats *Stats
44-
config any
43+
lifecycle Writer // adapter owning destination-level resources; reused for Clear/Close (no re-parse)
44+
parsedConfig Config // destination config parsed once; shared by every writer thread (read-only after parse)
4545
init NewFunc
4646
writerSchema sync.Map
4747
batchSize int64
@@ -90,19 +90,39 @@ func WithApplyFilter(applyFilter bool) ThreadOptions {
9090
}
9191
}
9292

93+
// NewWriterPool builds a writer pool for a destination. It owns the lifecycle of
94+
// destination-level process resources: it starts them up front (e.g. the Iceberg
95+
// shared JVM, via Initializable), validates the connection (Check), and exposes
96+
// Close to tear them down. Call pool.Close(ctx) when done (defer it right after a
97+
// successful NewWriterPool).
9398
func NewWriterPool(ctx context.Context, config *types.WriterConfig, syncStreams []string, batchSize int64) (*WriterPool, error) {
9499
newfunc, found := RegisteredWriters[config.Type]
95100
if !found {
96101
return nil, fmt.Errorf("invalid destination type has been passed [%s]", config.Type)
97102
}
98103

104+
// Parse the destination config exactly once into a shared instance. Check may
105+
// finalize it (e.g. Parquet sets a temp staging Path for S3); every writer
106+
// thread then reuses this same instance read-only via SetConfig.
99107
adapter := newfunc()
100-
if err := utils.Unmarshal(config.WriterConfig, adapter.GetConfigRef()); err != nil {
101-
return nil, err
108+
parsedConfig := adapter.GetConfigRef()
109+
if err := utils.Unmarshal(config.WriterConfig, parsedConfig); err != nil {
110+
return nil, fmt.Errorf("failed to unmarshal destination config: %s", err)
102111
}
103112

104-
err := adapter.Check(ctx)
105-
if err != nil {
113+
// Start long-lived destination resources before any check/setup work.
114+
if in, ok := adapter.(Initializable); ok {
115+
if err := in.Initialize(ctx); err != nil {
116+
return nil, fmt.Errorf("failed to initialize destination: %s", err)
117+
}
118+
}
119+
120+
if err := adapter.Check(ctx); err != nil {
121+
// Roll back anything Initialize started so we don't orphan it; there is
122+
// no pool object for the caller to Close on this error path.
123+
if s, ok := adapter.(Shutdownable); ok {
124+
_ = s.Shutdown(ctx)
125+
}
106126
return nil, fmt.Errorf("failed to test destination: %s", err)
107127
}
108128

@@ -113,9 +133,10 @@ func NewWriterPool(ctx context.Context, config *types.WriterConfig, syncStreams
113133
ReadCount: atomic.Int64{},
114134
RecordsFiltered: atomic.Int64{},
115135
},
116-
config: config.WriterConfig,
117-
init: newfunc,
118-
batchSize: batchSize,
136+
lifecycle: adapter,
137+
parsedConfig: parsedConfig,
138+
init: newfunc,
139+
batchSize: batchSize,
119140
}
120141

121142
for _, stream := range syncStreams {
@@ -128,6 +149,28 @@ func NewWriterPool(ctx context.Context, config *types.WriterConfig, syncStreams
128149
return pool, nil
129150
}
130151

152+
// Close tears down destination-owned process resources (e.g. the Iceberg shared
153+
// JVM) via the pool's lifecycle adapter. Idempotent and safe to defer right
154+
// after a successful NewWriterPool.
155+
func (w *WriterPool) Close(ctx context.Context) {
156+
s, ok := w.lifecycle.(Shutdownable)
157+
if !ok {
158+
return
159+
}
160+
if err := s.Shutdown(ctx); err != nil {
161+
logger.Warnf("WriterPool.Close: %s", err)
162+
}
163+
}
164+
165+
// Clear drops the given streams from the destination, reusing the pool's
166+
// already-initialized lifecycle adapter (and its parsed config).
167+
func (w *WriterPool) Clear(ctx context.Context, dropStreams []types.StreamInterface) error {
168+
if len(dropStreams) == 0 {
169+
return nil
170+
}
171+
return w.lifecycle.DropStreams(ctx, dropStreams)
172+
}
173+
131174
func (w *WriterPool) AddRecordsToSyncStats(count int64) {
132175
w.stats.TotalRecordsToSync.Add(count)
133176
}
@@ -156,14 +199,10 @@ func (w *WriterPool) NewWriter(ctx context.Context, stream types.StreamInterface
156199

157200
var writerThread Writer
158201
prevStreamState, err := func() (*types.MetadataState, error) {
159-
// init writer with configurations
202+
// init writer and point it at the config parsed once at pool creation,
203+
// shared read-only across all writer threads.
160204
writerThread = w.init()
161-
w.configMutex.Lock()
162-
err := utils.Unmarshal(w.config, writerThread.GetConfigRef())
163-
w.configMutex.Unlock()
164-
if err != nil {
165-
return nil, err
166-
}
205+
writerThread.SetConfig(w.parsedConfig)
167206

168207
// setup table and schema
169208
streamArtifact.mu.Lock()
@@ -296,68 +335,3 @@ func (wt *WriterThread) Close(ctx context.Context, finalMetadataState any) (err
296335
}
297336
}
298337

299-
func ClearDestination(ctx context.Context, config *types.WriterConfig, dropStreams []types.StreamInterface) error {
300-
newfunc, found := RegisteredWriters[config.Type]
301-
if !found {
302-
return fmt.Errorf("invalid destination type has been passed [%s]", config.Type)
303-
}
304-
305-
adapter := newfunc()
306-
if err := utils.Unmarshal(config.WriterConfig, adapter.GetConfigRef()); err != nil {
307-
return err
308-
}
309-
310-
if dropStreams != nil {
311-
if err := adapter.DropStreams(ctx, dropStreams); err != nil {
312-
return fmt.Errorf("failed to drop the streams: %s", err)
313-
}
314-
}
315-
return nil
316-
}
317-
318-
// Initialize starts destination-level process resources once, up front, if the
319-
// registered writer implements Initializable. No-op for destinations without
320-
// long-lived resources (parquet). Pair it with a deferred Shutdown.
321-
func Initialize(ctx context.Context, config *types.WriterConfig) error {
322-
if config == nil {
323-
return nil
324-
}
325-
newfunc, found := RegisteredWriters[config.Type]
326-
if !found {
327-
return fmt.Errorf("invalid destination type has been passed [%s]", config.Type)
328-
}
329-
adapter := newfunc()
330-
if err := utils.Unmarshal(config.WriterConfig, adapter.GetConfigRef()); err != nil {
331-
return fmt.Errorf("failed to unmarshal destination config: %s", err)
332-
}
333-
in, ok := adapter.(Initializable)
334-
if !ok {
335-
return nil
336-
}
337-
return in.Initialize(ctx)
338-
}
339-
340-
// Shutdown invokes destination-level cleanup if the registered writer
341-
// implements Shutdownable. No-op for destinations without long-lived
342-
// resources (parquet).
343-
func Shutdown(ctx context.Context, config *types.WriterConfig) {
344-
if config == nil {
345-
return
346-
}
347-
newfunc, found := RegisteredWriters[config.Type]
348-
if !found {
349-
return
350-
}
351-
adapter := newfunc()
352-
if err := utils.Unmarshal(config.WriterConfig, adapter.GetConfigRef()); err != nil {
353-
logger.Warnf("destination.Shutdown: failed to unmarshal config: %s", err)
354-
return
355-
}
356-
s, ok := adapter.(Shutdownable)
357-
if !ok {
358-
return
359-
}
360-
if err := s.Shutdown(ctx); err != nil {
361-
logger.Warnf("destination.Shutdown: %s", err)
362-
}
363-
}

protocol/check.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,14 @@ var checkCmd = &cobra.Command{
3737
err := func() error {
3838
// If connector is not set, we are checking the destination
3939
if destinationConfigPath != "not-set" {
40-
defer destination.Shutdown(cmd.Context(), destinationConfig)
41-
if err := destination.Initialize(cmd.Context(), destinationConfig); err != nil {
42-
return fmt.Errorf("failed to initialize destination: %s", err)
40+
// NewWriterPool initializes destination resources and runs Check;
41+
// close immediately since a check has no further work.
42+
pool, err := destination.NewWriterPool(cmd.Context(), destinationConfig, nil, batchSize)
43+
if err != nil {
44+
return err
4345
}
44-
_, err := destination.NewWriterPool(cmd.Context(), destinationConfig, nil, batchSize)
45-
return err
46+
pool.Close(cmd.Context())
47+
return nil
4648
}
4749

4850
if configPath != "not-set" {

protocol/clear.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,14 @@ var clearCmd = &cobra.Command{
6262
// Setup new state after clear for connector
6363
connector.SetupState(newState)
6464

65-
// drop/clear streams from destination
66-
defer destination.Shutdown(cmd.Context(), destinationConfig)
67-
if err := destination.Initialize(cmd.Context(), destinationConfig); err != nil {
68-
return fmt.Errorf("failed to initialize destination: %s", err)
65+
// drop/clear streams from destination. The pool starts destination
66+
// resources (e.g. the Iceberg JVM) and Close tears them down.
67+
pool, err := destination.NewWriterPool(cmd.Context(), destinationConfig, nil, batchSize)
68+
if err != nil {
69+
return err
6970
}
70-
cerr := destination.ClearDestination(cmd.Context(), destinationConfig, dropStreams)
71-
if cerr != nil {
71+
defer pool.Close(cmd.Context())
72+
if cerr := pool.Clear(cmd.Context(), dropStreams); cerr != nil {
7273
return fmt.Errorf("failed to clear destination: %s", cerr)
7374
}
7475
logger.Infof("Successfully cleared destination data for selected streams.")

0 commit comments

Comments
 (0)