3 changes: 3 additions & 0 deletions destination/parquet/config.go
@@ -13,6 +13,9 @@ type Config struct {
Prefix string `json:"s3_path,omitempty"`
// S3 endpoint for custom S3-compatible services (like MinIO)
S3Endpoint string `json:"s3_endpoint,omitempty"`
// Continuous streaming thresholds
TargetFileSizeMB int `json:"target_file_size,omitempty"`
MaxLatencySeconds int `json:"max_latency,omitempty"`
}

func (c *Config) Validate() error {
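For reference, the two new thresholds are plain integers in the destination's writer config: target_file_size in megabytes and max_latency in seconds. A zero or omitted value disables that trigger, and the background flusher only starts when at least one is set. A minimal, self-contained sketch of how they parse and convert, using a local mirror of the struct (the real one lives in destination/parquet/config.go) and made-up values:

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Local mirror of the new fields, for illustration only.
type thresholds struct {
	TargetFileSizeMB  int `json:"target_file_size,omitempty"`
	MaxLatencySeconds int `json:"max_latency,omitempty"`
}

func main() {
	raw := []byte(`{"target_file_size": 256, "max_latency": 30}`)
	var t thresholds
	if err := json.Unmarshal(raw, &t); err != nil {
		panic(err)
	}
	targetBytes := int64(t.TargetFileSizeMB) * 1024 * 1024         // 256 MB -> 268435456 bytes
	maxLatency := time.Duration(t.MaxLatencySeconds) * time.Second // 30s
	fmt.Println(targetBytes, maxLatency)
}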
114 changes: 112 additions & 2 deletions destination/parquet/parquet.go
@@ -9,6 +9,8 @@ import (
"strings"
"time"

"sync"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
@@ -40,6 +42,12 @@ type Parquet struct {
partitionedFiles map[string]*FileMetadata // mapping of basePath/{regex} -> pqFiles
s3Client *s3.S3
schema typeutils.Fields
mu sync.Mutex // guards partitionedFiles and the underlying writers
lastFlushAt time.Time // time of the last rotation by the background flusher
targetBytes int64 // rotate a file once it reaches this size on disk (0 = disabled)
maxLatency time.Duration // rotate files at least this often (0 = disabled)
flushTicker *time.Ticker // drives the background flusher loop
stopCh chan struct{} // closed in Close to stop the background flusher
}

// GetConfigRef returns the config reference for the parquet writer.
@@ -119,6 +127,7 @@ func (p *Parquet) Setup(_ context.Context, stream types.StreamInterface, schema
p.partitionedFiles = make(map[string]*FileMetadata)
p.basePath = filepath.Join(p.stream.GetDestinationDatabase(nil), p.stream.GetDestinationTable())
p.schema = make(typeutils.Fields)
p.stopCh = make(chan struct{})

// for s3 p.config.path may not be provided
if p.config.Path == "" {
@@ -130,6 +139,21 @@ func (p *Parquet) Setup(_ context.Context, stream types.StreamInterface, schema
return nil, err
}

// configure continuous thresholds and start background flusher if enabled
// Note: target_file_size is provided in MB; convert to bytes
if p.config.TargetFileSizeMB > 0 {
p.targetBytes = int64(p.config.TargetFileSizeMB) * 1024 * 1024
}
if p.config.MaxLatencySeconds > 0 {
p.maxLatency = time.Duration(p.config.MaxLatencySeconds) * time.Second
}
if p.targetBytes > 0 || p.maxLatency > 0 {
p.lastFlushAt = time.Now()
p.flushTicker = time.NewTicker(time.Second)
go p.backgroundFlusher()
logger.Infof("Thread[%s]: background flusher started (target_bytes=%d, max_latency=%s)", p.options.ThreadID, p.targetBytes, p.maxLatency)
}

if !p.stream.NormalizationEnabled() {
return p.schema, nil
}
@@ -155,16 +179,19 @@ func (p *Parquet) Write(_ context.Context, records []types.RawRecord) error {
for _, record := range records {
record.OlakeTimestamp = time.Now().UTC()
partitionedPath := p.getPartitionedFilePath(record.Data, record.OlakeTimestamp)
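// take the lock around both the map lookup and the write itself: the
// background flusher may rotate (close and recreate) partition files concurrently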
p.mu.Lock()
partitionFile, exists := p.partitionedFiles[partitionedPath]
if !exists {
err := p.createNewPartitionFile(partitionedPath)
if err != nil {
p.mu.Unlock()
return fmt.Errorf("failed to create parititon file: %s", err)
}
partitionFile = p.partitionedFiles[partitionedPath]
}

if partitionFile == nil {
p.mu.Unlock()
return fmt.Errorf("failed to create partition file for path[%s]", partitionedPath)
}

@@ -174,6 +201,7 @@ func (p *Parquet) Write(_ context.Context, records []types.RawRecord) error {
} else {
_, err = partitionFile.writer.(*pqgo.GenericWriter[types.RawRecord]).Write([]types.RawRecord{record})
}
p.mu.Unlock()
if err != nil {
return fmt.Errorf("failed to write in parquet file: %s", err)
}
@@ -240,6 +268,8 @@ func (p *Parquet) closePqFiles() error {
logger.Debugf("Thread[%s]: Deleted file [%s], reason (%s).", p.options.ThreadID, filePath, reason)
}

// closePqFiles is called from Close and from the background flusher, so
// serialize it with Write and with concurrent flushes
p.mu.Lock()
defer p.mu.Unlock()
for basePath, parquetFile := range p.partitionedFiles {
// construct full file path
filePath := filepath.Join(p.config.Path, basePath, parquetFile.fileName)
@@ -252,7 +282,7 @@
err = parquetFile.writer.(*pqgo.GenericWriter[types.RawRecord]).Close()
}
if err != nil {
return fmt.Errorf("failed to close writer: %s", err)
logger.Warnf("Thread[%s]: Failed to close parquet writer: %s", p.options.ThreadID, err)
}

// Close file
Expand Down Expand Up @@ -293,11 +323,20 @@ func (p *Parquet) closePqFiles() error {
}
}
// make map empty
p.partitionedFiles = make(map[string]*FileMetadata)
for k := range p.partitionedFiles {
delete(p.partitionedFiles, k)
}
return nil
}

// Close stops the background flusher, if one was started, and then closes out
// any partition files that are still open.
func (p *Parquet) Close(_ context.Context) error {
if p.flushTicker != nil {
p.flushTicker.Stop()
}
if p.stopCh != nil {
close(p.stopCh)
}
return p.closePqFiles()
}

@@ -508,3 +547,74 @@ func init() {
return new(Parquet)
}
}

// backgroundFlusher periodically checks size and latency thresholds to rotate files.
func (p *Parquet) backgroundFlusher() {
for {
select {
case <-p.stopCh:
return
case <-p.flushTicker.C:
shouldFlush := false
if p.maxLatency > 0 && time.Since(p.lastFlushAt) >= p.maxLatency {
shouldFlush = true
}
if !shouldFlush && p.targetBytes > 0 {
p.mu.Lock()
for basePath, parquetFile := range p.partitionedFiles {
filePath := filepath.Join(p.config.Path, basePath, parquetFile.fileName)
info, err := os.Stat(filePath)
if err == nil && info.Size() >= p.targetBytes {
shouldFlush = true
logger.Debugf("Thread[%s]: File size threshold reached (%d >= %d), should flush",
p.options.ThreadID, info.Size(), p.targetBytes)
break
}
}
p.mu.Unlock()
}
if shouldFlush {
logger.Infof("Thread[%s]: Flushing parquet files", p.options.ThreadID)
if err := p.closePqFiles(); err != nil {
logger.Warnf("Thread[%s]: failed to flush parquet files: %s", p.options.ThreadID, err)
continue
}
p.lastFlushAt = time.Now()
logger.Infof("Thread[%s]: background flusher rotated files", p.options.ThreadID)
}
}
}
}
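The rotation decision above reduces to two independent triggers: age since the last flush and on-disk file size. A standalone sketch of that decision using a hypothetical shouldRotate helper (the PR keeps the checks inline in backgroundFlusher):

package main

import (
	"fmt"
	"time"
)

// shouldRotate mirrors the flusher's checks: rotate when maxLatency has elapsed
// since the last flush, or when a file on disk has grown past targetBytes.
// A zero threshold disables that trigger. Hypothetical helper, for illustration only.
func shouldRotate(sizeBytes, targetBytes int64, sinceLastFlush, maxLatency time.Duration) bool {
	if maxLatency > 0 && sinceLastFlush >= maxLatency {
		return true
	}
	return targetBytes > 0 && sizeBytes >= targetBytes
}

func main() {
	fmt.Println(shouldRotate(300<<20, 256<<20, 5*time.Second, 30*time.Second)) // true: size threshold hit
	fmt.Println(shouldRotate(10<<20, 256<<20, 45*time.Second, 30*time.Second)) // true: latency threshold hit
	fmt.Println(shouldRotate(10<<20, 256<<20, 5*time.Second, 30*time.Second))  // false: neither threshold hit
}

Note that os.Stat only sees bytes the parquet writer has already written out, so rows still buffered in memory do not count toward the size trigger.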
2 changes: 1 addition & 1 deletion drivers/abstract/abstract.go
@@ -95,7 +95,7 @@ func (a *AbstractDriver) Discover(ctx context.Context) ([]*types.Stream, error)
}

if a.driver.CDCSupported() {
convStream.WithSyncMode(types.CDC, types.STRICTCDC)
convStream.WithSyncMode(types.CDC, types.STRICTCDC, types.CONTINUOUS)
convStream.SyncMode = types.CDC
} else {
// remove cdc column as it is not supported
9 changes: 9 additions & 0 deletions drivers/postgres/internal/cdc.go
@@ -19,12 +19,21 @@ func (p *Postgres) prepareWALJSConfig(streams ...types.StreamInterface) (*waljs.
return nil, fmt.Errorf("invalid call; %s not running in CDC mode", p.Type())
}

continuous := false
for _, s := range streams {
if s.GetStream().SyncMode == types.CONTINUOUS {
continuous = true
break
}
}

return &waljs.Config{
Connection: *p.config.Connection,
ReplicationSlotName: p.cdcConfig.ReplicationSlot,
InitialWaitTime: time.Duration(p.cdcConfig.InitialWaitTime) * time.Second,
Tables: types.NewSet[types.StreamInterface](streams...),
BatchSize: p.config.BatchSize,
Continuous: continuous,
}, nil
}

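Because all CDC-mode streams share one replication slot and one WAL connection, a single stream selected with the continuous mode switches the whole connection into continuous streaming. A small self-contained sketch of that rule (plain strings stand in for the real sync-mode values; the detection itself lives in prepareWALJSConfig above):

package main

import "fmt"

// anyContinuous mirrors the loop in prepareWALJSConfig: if any selected stream
// runs in continuous mode, the shared WAL connection is switched to continuous.
func anyContinuous(modes []string) bool {
	for _, m := range modes {
		if m == "continuous" {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(anyContinuous([]string{"cdc", "cdc"}))        // false: stop at current WAL end
	fmt.Println(anyContinuous([]string{"cdc", "continuous"})) // true: keep streaming past WAL end
}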
3 changes: 3 additions & 0 deletions pkg/waljs/types.go
@@ -17,6 +17,9 @@ type Config struct {
InitialWaitTime time.Duration
TLSConfig *tls.Config
BatchSize int
// Continuous indicates we should not exit when we reach current WAL end;
// instead, keep streaming and wait for new WAL records.
Continuous bool
}

type WALState struct {
10 changes: 9 additions & 1 deletion pkg/waljs/waljs.go
@@ -43,6 +43,8 @@ type Socket struct {
replicationSlot string
// initialWaitTime is the duration to wait for initial data before timing out
initialWaitTime time.Duration
// continuous indicates whether to keep streaming beyond current WAL end
continuous bool
}

func NewConnection(ctx context.Context, db *sqlx.DB, config *Config, typeConverter func(value interface{}, columnType string) (interface{}, error)) (*Socket, error) {
@@ -104,6 +106,7 @@ func NewConnection(ctx context.Context, db *sqlx.DB, config *Config, typeConvert
CurrentWalPosition: slot.CurrentLSN,
replicationSlot: config.ReplicationSlotName,
initialWaitTime: config.InitialWaitTime,
continuous: config.Continuous,
}, nil
}

@@ -174,14 +177,19 @@ func (s *Socket) StreamMessages(ctx context.Context, db *sqlx.DB, callback abstr
return nil
}

if s.ClientXLogPos >= s.CurrentWalPosition {
// Stop at current WAL end only when not running in continuous mode.
if s.ClientXLogPos >= s.CurrentWalPosition && !s.continuous {
logger.Infof("finishing sync, reached wal position: %s", s.CurrentWalPosition)
return nil
}

msg, err := s.pgConn.ReceiveMessage(ctx)
if err != nil {
if strings.Contains(err.Error(), "EOF") {
if s.continuous {
time.Sleep(200 * time.Millisecond)
continue
}
return nil
}
return fmt.Errorf("failed to receive message from wal logs: %s", err)
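In continuous mode the EOF branch above becomes a poll loop with a fixed 200 ms sleep. One possible refinement, not part of this PR, is a context-aware wait so a cancelled sync does not have to sit out the sleep; a minimal sketch with a hypothetical waitOrCancel helper:

package main

import (
	"context"
	"fmt"
	"time"
)

// waitOrCancel blocks for the given backoff unless the context is cancelled first.
// Hypothetical helper; the PR uses a plain time.Sleep(200 * time.Millisecond).
func waitOrCancel(ctx context.Context, backoff time.Duration) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(backoff):
		return nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	fmt.Println(waitOrCancel(ctx, 200*time.Millisecond)) // context deadline exceeded
}

Inside StreamMessages it would replace the bare sleep in the EOF branch and return promptly once the surrounding sync is cancelled.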
2 changes: 1 addition & 1 deletion protocol/sync.go
@@ -118,7 +118,7 @@ var syncCmd = &cobra.Command{

selectedStreams = append(selectedStreams, elem.ID())
switch elem.Stream.SyncMode {
case types.CDC, types.STRICTCDC:
case types.CDC, types.STRICTCDC, types.CONTINUOUS:
cdcStreams = append(cdcStreams, elem)
streamState, exists := stateStreamMap[fmt.Sprintf("%s.%s", elem.Namespace(), elem.Name())]
if exists {
1 change: 1 addition & 0 deletions types/sync_mode.go
@@ -7,4 +7,5 @@ const (
INCREMENTAL SyncMode = "incremental"
CDC SyncMode = "cdc"
STRICTCDC SyncMode = "strict_cdc"
CONTINUOUS SyncMode = "continuous"
)
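With the new constant in place, a stream whose selected sync_mode is "continuous" is grouped with the CDC streams by syncCmd (see the switch in protocol/sync.go above). A self-contained sketch of that routing with a locally mirrored subset of SyncMode (illustrative only; the real type lives in types/sync_mode.go):

package main

import "fmt"

// Local mirror of a subset of types.SyncMode, for illustration only.
type SyncMode string

const (
	CDC        SyncMode = "cdc"
	STRICTCDC  SyncMode = "strict_cdc"
	CONTINUOUS SyncMode = "continuous"
)

func route(mode SyncMode) string {
	switch mode {
	case CDC, STRICTCDC, CONTINUOUS:
		return "cdc/wal pipeline" // continuous rides the existing CDC path
	default:
		return "backfill/incremental pipeline"
	}
}

func main() {
	fmt.Println(route(CONTINUOUS)) // cdc/wal pipeline
}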