@@ -28,9 +28,11 @@ import (
 )
 
 type FileMetadata struct {
-	fileName string
-	writer   any
-	file     source.ParquetFile
+	fileName    string
+	writer      any
+	file        source.ParquetFile
+	recordCount int   // Track number of records written
+	fileSize    int64 // Track approximate file size
 }
 
 // Parquet destination writes Parquet files to a local path and optionally uploads them to S3.
@@ -55,6 +57,42 @@ func (p *Parquet) Spec() any {
 	return Config{}
 }
 
+// getCompressionCodec returns the compression codec from the config; defaults to Snappy.
+func (p *Parquet) getCompressionCodec() pqgo.Compress {
+	switch p.config.Compression {
+	case "", "snappy":
+		return &pqgo.Snappy
+	case "gzip":
+		return &pqgo.Gzip
+	case "zstd":
+		return &pqgo.Zstd
+	case "lz4":
+		// parquet-go's Lz4Raw corresponds to Parquet's LZ4_RAW codec.
+		return &pqgo.Lz4Raw
+	case "none", "uncompressed":
+		return &pqgo.Uncompressed
+	default:
+		logger.Warnf("Thread[%s]: unknown compression codec %s, using snappy", p.options.ThreadID, p.config.Compression)
+		return &pqgo.Snappy
+	}
+}
+
+// getMaxFileSize returns the max file size in bytes; defaults to 128MB.
+func (p *Parquet) getMaxFileSize() int64 {
+	if p.config.MaxFileSize > 0 {
+		return p.config.MaxFileSize
+	}
+	return 128 * 1024 * 1024 // 128MB default
+}
+
+// getRowGroupSize returns the row group size; 0 means parquet-go's default.
+func (p *Parquet) getRowGroupSize() int {
+	if p.config.RowGroupSize > 0 {
+		return p.config.RowGroupSize
+	}
+	return 0 // Let parquet-go use its default
+}
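+
+// NOTE: a sketch of the Config fields these getters assume (names taken from
+// the references above; types inferred from usage, not confirmed here):
+//
+//	Compression  string // "snappy" (default), "gzip", "zstd", "lz4", "none"/"uncompressed"
+//	MaxFileSize  int64  // max bytes per partition file; 0 means the 128MB default
+//	RowGroupSize int    // bytes for the page buffer; 0 means parquet-go's default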
+
 // setup s3 client if credentials provided
 func (p *Parquet) initS3Writer() error {
 	if p.config.Bucket == "" || p.config.Region == "" {
@@ -97,20 +135,35 @@ func (p *Parquet) createNewPartitionFile(basePath string) error {
 		return fmt.Errorf("failed to create parquet file writer: %s", err)
 	}
 
+	// Build writer options
+	writerOptions := []pqgo.WriterOption{
+		pqgo.Compression(p.getCompressionCodec()),
+	}
+
+	// Use the configured row group size as the page buffer size; parquet-go
+	// exposes no byte-based row group limit, so this only approximates it.
+	if p.getRowGroupSize() > 0 {
+		writerOptions = append(writerOptions, pqgo.PageBufferSize(p.getRowGroupSize()))
+	}
+
+	// TODO: Implement column sorting once we have the sort columns configuration.
+	// This would require schema analysis and potentially buffering data for sorting.
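+	//
+	// A possible shape once sort columns exist in the config, assuming
+	// parquet-go's sorting options (illustrative, untested):
+	//
+	//	writerOptions = append(writerOptions, pqgo.SortingWriterConfig(
+	//		pqgo.SortingColumns(pqgo.Ascending("some_column")),
+	//	))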
+
 	writer := func() any {
 		if p.stream.NormalizationEnabled() {
-			return pqgo.NewGenericWriter[any](pqFile, p.schema.ToTypeSchema().ToParquet(), pqgo.Compression(&pqgo.Snappy))
+			return pqgo.NewGenericWriter[any](pqFile, p.schema.ToTypeSchema().ToParquet(), writerOptions...)
 		}
-		return pqgo.NewGenericWriter[types.RawRecord](pqFile, types.GetParquetRawSchema(), pqgo.Compression(&pqgo.Snappy))
+		return pqgo.NewGenericWriter[types.RawRecord](pqFile, types.GetParquetRawSchema(), writerOptions...)
 	}()
 
 	p.partitionedFiles[basePath] = &FileMetadata{
-		fileName: fileName,
-		file:     pqFile,
-		writer:   writer,
+		fileName:    fileName,
+		file:        pqFile,
+		writer:      writer,
+		recordCount: 0,
+		fileSize:    0,
 	}
 
-	logger.Infof("Thread[%s]: created new partition file[%s]", p.options.ThreadID, filePath)
+	logger.Infof("Thread[%s]: created new partition file[%s] with compression[%s]", p.options.ThreadID, filePath, p.config.Compression)
 	return nil
 }
 
@@ -154,10 +207,28 @@ func (p *Parquet) Setup(_ context.Context, stream types.StreamInterface, schema
 // Write writes a record to the Parquet file.
 func (p *Parquet) Write(_ context.Context, records []types.RawRecord) error {
 	// TODO: use batch writing feature of pq writer
+	maxFileSize := p.getMaxFileSize()
+
 	for _, record := range records {
 		record.OlakeTimestamp = time.Now().UTC()
 		partitionedPath := p.getPartitionedFilePath(record.Data, record.OlakeTimestamp)
 		partitionFile, exists := p.partitionedFiles[partitionedPath]
+
+		// Check if the file exists and has exceeded the max file size
+		if exists && maxFileSize > 0 && partitionFile.fileSize >= maxFileSize {
+			logger.Infof("Thread[%s]: File size limit reached (%d >= %d), rotating file for partition[%s]",
+				p.options.ThreadID, partitionFile.fileSize, maxFileSize, partitionedPath)
+
+			// Close and upload the current file
+			if err := p.closeAndUploadPartitionFile(partitionedPath, partitionFile); err != nil {
+				return fmt.Errorf("failed to rotate partition file: %s", err)
+			}
+
+			// Remove from map so a new file will be created
+			delete(p.partitionedFiles, partitionedPath)
+			exists = false
+		}
+
 		if !exists {
 			err := p.createNewPartitionFile(partitionedPath)
 			if err != nil {
@@ -179,6 +250,12 @@ func (p *Parquet) Write(_ context.Context, records []types.RawRecord) error {
 		if err != nil {
 			return fmt.Errorf("failed to write in parquet file: %s", err)
 		}
+
+		// Update record count and approximate file size
+		partitionFile.recordCount++
+		// Rough estimate: assume each record is ~1KB on average.
+		// This is a simplification; actual size would need file stat checks.
+		partitionFile.fileSize += 1024
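+		//
+		// A more accurate (but costlier) sketch would stat the file every N
+		// records; note the writer buffers rows, so the on-disk size lags
+		// behind until a flush:
+		//
+		//	fp := filepath.Join(p.config.Path, partitionedPath, partitionFile.fileName)
+		//	if info, err := os.Stat(fp); err == nil {
+		//		partitionFile.fileSize = info.Size()
+		//	}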
 	}
 
 	return nil
@@ -232,7 +309,8 @@ func (p *Parquet) Check(_ context.Context) error {
 	return nil
 }
 
-func (p *Parquet) closePqFiles() error {
+// closeAndUploadPartitionFile closes a single partition file and uploads it to S3 if configured.
+func (p *Parquet) closeAndUploadPartitionFile(basePath string, parquetFile *FileMetadata) error {
 	removeLocalFile := func(filePath, reason string) {
 		err := os.Remove(filePath)
 		if err != nil {
@@ -242,56 +320,64 @@ func (p *Parquet) closePqFiles() error {
 		logger.Debugf("Thread[%s]: Deleted file [%s], reason (%s).", p.options.ThreadID, filePath, reason)
 	}
 
-	for basePath, parquetFile := range p.partitionedFiles {
-		// construct full file path
-		filePath := filepath.Join(p.config.Path, basePath, parquetFile.fileName)
+	// construct full file path
+	filePath := filepath.Join(p.config.Path, basePath, parquetFile.fileName)
 
-		// Close writers
-		var err error
-		if p.stream.NormalizationEnabled() {
-			err = parquetFile.writer.(*pqgo.GenericWriter[any]).Close()
-		} else {
-			err = parquetFile.writer.(*pqgo.GenericWriter[types.RawRecord]).Close()
-		}
+	// Close writers
+	var err error
+	if p.stream.NormalizationEnabled() {
+		err = parquetFile.writer.(*pqgo.GenericWriter[any]).Close()
+	} else {
+		err = parquetFile.writer.(*pqgo.GenericWriter[types.RawRecord]).Close()
+	}
+	if err != nil {
+		return fmt.Errorf("failed to close writer: %s", err)
+	}
+
+	// Close file
+	if err := parquetFile.file.Close(); err != nil {
+		return fmt.Errorf("failed to close file: %s", err)
+	}
+
+	logger.Infof("Thread[%s]: Finished writing file [%s] with %d records.", p.options.ThreadID, filePath, parquetFile.recordCount)
+
+	if p.s3Client != nil {
+		// Open file for S3 upload
+		file, err := os.Open(filePath)
 		if err != nil {
-			return fmt.Errorf("failed to close writer: %s", err)
+			return fmt.Errorf("failed to open file: %s", err)
 		}
+		defer file.Close()
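+		// Note: this defer now runs when closeAndUploadPartitionFile returns;
+		// in the old loop-based closePqFiles, deferred closes accumulated
+		// until the whole function returned.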
 
-		// Close file
-		if err := parquetFile.file.Close(); err != nil {
-			return fmt.Errorf("failed to close file: %s", err)
+		// Construct S3 key path
+		s3KeyPath := basePath
+		if p.config.Prefix != "" {
+			s3KeyPath = filepath.Join(p.config.Prefix, s3KeyPath)
 		}
+		s3KeyPath = filepath.Join(s3KeyPath, parquetFile.fileName)
 
-		logger.Infof("Thread[%s]: Finished writing file [%s].", p.options.ThreadID, filePath)
+		// Upload to S3
+		_, err = p.s3Client.PutObject(&s3.PutObjectInput{
+			Bucket: aws.String(p.config.Bucket),
+			Key:    aws.String(s3KeyPath),
+			Body:   file,
+		})
+		if err != nil {
+			return fmt.Errorf("failed to put object into s3: %s", err)
+		}
 
-		if p.s3Client != nil {
-			// Open file for S3 upload
-			file, err := os.Open(filePath)
-			if err != nil {
-				return fmt.Errorf("failed to open file: %s", err)
-			}
-			defer file.Close()
+		// Remove local file after successful upload
+		removeLocalFile(filePath, "uploaded to S3")
+		logger.Infof("Thread[%s]: successfully uploaded file to S3: s3://%s/%s", p.options.ThreadID, p.config.Bucket, s3KeyPath)
+	}
 
-			// Construct S3 key path
-			s3KeyPath := basePath
-			if p.config.Prefix != "" {
-				s3KeyPath = filepath.Join(p.config.Prefix, s3KeyPath)
-			}
-			s3KeyPath = filepath.Join(s3KeyPath, parquetFile.fileName)
-
-			// Upload to S3
-			_, err = p.s3Client.PutObject(&s3.PutObjectInput{
-				Bucket: aws.String(p.config.Bucket),
-				Key:    aws.String(s3KeyPath),
-				Body:   file,
-			})
-			if err != nil {
-				return fmt.Errorf("failed to put object into s3: %s", err)
-			}
+	return nil
+}
 
-			// Remove local file after successful upload
-			removeLocalFile(filePath, "uploaded to S3")
-			logger.Infof("Thread[%s]: successfully uploaded file to S3: s3://%s/%s", p.options.ThreadID, p.config.Bucket, s3KeyPath)
+func (p *Parquet) closePqFiles() error {
+	for basePath, parquetFile := range p.partitionedFiles {
+		if err := p.closeAndUploadPartitionFile(basePath, parquetFile); err != nil {
+			return err
 		}
 	}
 	// make map empty