@@ -30,6 +30,9 @@ namespace facebook::velox::connector::hive::iceberg {
3030
3131namespace  {
3232
33+ constexpr  std::string_view kNotClusteredRowsErrorMsg  =
34+     " Incoming records violate the writer assumption that records are clustered by spec and \n  by partition within each spec. Either cluster the incoming records or switch to fanout writers.\n Encountered records that belong to already closed files:\n "  ;
35+ 
3336#define  WRITER_NON_RECLAIMABLE_SECTION_GUARD (index )       \
3437  memory::NonReclaimableSectionGuard nonReclaimableGuard ( \
3538      writerInfo_[(index)]->nonReclaimableSectionHolder.get())
@@ -213,7 +216,10 @@ IcebergDataSink::IcebergDataSink(
213216                    insertTableHandle->columnTransforms(),
214217                    hiveConfig->isPartitionPathAsLowerCase(
215218                        connectorQueryCtx->sessionProperties ()))
216-               : nullptr) {
219+               : nullptr),
220+       fanoutEnabled_(
221+           hiveConfig_->fanoutEnabled (connectorQueryCtx_->sessionProperties ())),
222+       currentWriterId_(0 ) {
217223  if  (isPartitioned ()) {
218224    partitionData_.resize (maxOpenWriters_);
219225  }
@@ -332,8 +338,6 @@ std::vector<std::string> IcebergDataSink::commitMessage() const {
332338}
333339
334340void  IcebergDataSink::splitInputRowsAndEnsureWriters (RowVectorPtr input) {
335-   VELOX_CHECK (isPartitioned ());
336- 
337341  std::fill (partitionSizes_.begin (), partitionSizes_.end (), 0 );
338342
339343  const  auto  numRows = partitionIds_.size ();
@@ -346,26 +350,7 @@ void IcebergDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr input) {
346350    if  (!partitionData_[index].empty ()) {
347351      continue ;
348352    }
349- 
350-     std::vector<folly::dynamic> partitionValues (partitionChannels_.size ());
351-     auto  icebergPartitionIdGenerator =
352-         dynamic_cast <const  IcebergPartitionIdGenerator*>(
353-             partitionIdGenerator_.get ());
354-     VELOX_CHECK_NOT_NULL (icebergPartitionIdGenerator);
355-     const  RowVectorPtr transformedValues =
356-         icebergPartitionIdGenerator->partitionValues ();
357-     for  (auto  i = 0 ; i < partitionChannels_.size (); ++i) {
358-       auto  block = transformedValues->childAt (i);
359-       if  (block->isNullAt (index)) {
360-         partitionValues[i] = nullptr ;
361-       } else  {
362-         DecodedVector decoded (*block);
363-         partitionValues[i] = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH (
364-             extractPartitionValue, block->typeKind (), &decoded, index);
365-       }
366-     }
367- 
368-     partitionData_[index] = partitionValues;
353+     buildPartitionData (index);
369354  }
370355
371356  for  (auto  i = 0 ; i < partitionSizes_.size (); ++i) {
@@ -376,6 +361,11 @@ void IcebergDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr input) {
376361  }
377362}
378363
364+ void  IcebergDataSink::computePartition (const  RowVectorPtr& input) {
365+   VELOX_CHECK (isPartitioned ());
366+   partitionIdGenerator_->run (input, partitionIds_);
367+ }
368+ 
379369void  IcebergDataSink::appendData (RowVectorPtr input) {
380370  checkRunning ();
381371  if  (!isPartitioned ()) {
@@ -384,22 +374,79 @@ void IcebergDataSink::appendData(RowVectorPtr input) {
384374    return ;
385375  }
386376
387-   //  Compute partition and bucket numbers.
388-   computePartitionAndBucketIds (input);
377+   computePartition (input);
389378
390-   splitInputRowsAndEnsureWriters (input);
379+   if  (fanoutEnabled_) {
380+     splitInputRowsAndEnsureWriters (input);
391381
392-   for  (auto  index = 0 ; index < writers_.size (); ++index) {
393-     const  vector_size_t  partitionSize = partitionSizes_[index];
394-     if  (partitionSize == 0 ) {
395-       continue ;
382+     for  (auto  index = 0 ; index < writers_.size (); ++index) {
383+       const  vector_size_t  partitionSize = partitionSizes_[index];
384+       if  (partitionSize == 0 ) {
385+         continue ;
386+       }
387+ 
388+       const  RowVectorPtr writerInput = partitionSize == input->size ()
389+           ? input
390+           : exec::wrap (partitionSize, partitionRows_[index], input);
391+       write (index, writerInput);
392+     }
393+   } else  { //  Clustered mode.
394+     std::fill (partitionSizes_.begin (), partitionSizes_.end (), 0 );
395+     const  auto  numRows = input->size ();
396+     uint32_t  index = 0 ;
397+     for  (auto  row = 0 ; row < numRows; ++row) {
398+       auto  id = getIcebergWriterId (row);
399+       index = ensureWriter (id);
400+       if  (currentWriterId_ != index) {
401+         clusteredWrite (input, currentWriterId_);
402+         closeWriter (currentWriterId_);
403+         completedWriterIds_.insert (currentWriterId_);
404+         VELOX_USER_CHECK_EQ (
405+             completedWriterIds_.count (index),
406+             0 ,
407+             " {}"  ,
408+             kNotClusteredRowsErrorMsg );
409+         currentWriterId_ = index;
410+       }
411+       updatePartitionRows (index, numRows, row);
412+       buildPartitionData (index);
396413    }
414+     clusteredWrite (input, index);
415+   }
416+ }
397417
398-     const  RowVectorPtr writerInput = partitionSize == input->size ()
399-         ? input
400-         : exec::wrap (partitionSize, partitionRows_[index], input);
401-     write (index, writerInput);
418+ void  IcebergDataSink::buildPartitionData (int32_t  index) {
419+   std::vector<folly::dynamic> partitionValues (partitionChannels_.size ());
420+   auto  icebergPartitionIdGenerator =
421+       dynamic_cast <const  IcebergPartitionIdGenerator*>(
422+           partitionIdGenerator_.get ());
423+   VELOX_CHECK_NOT_NULL (icebergPartitionIdGenerator);
424+   const  RowVectorPtr transformedValues =
425+       icebergPartitionIdGenerator->partitionValues ();
426+   for  (auto  i = 0 ; i < partitionChannels_.size (); ++i) {
427+     auto  block = transformedValues->childAt (i);
428+     if  (block->isNullAt (index)) {
429+       partitionValues[i] = nullptr ;
430+     } else  {
431+       DecodedVector decoded (*block);
432+       partitionValues[i] = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH (
433+           extractPartitionValue, block->typeKind (), &decoded, index);
434+     }
402435  }
436+   partitionData_[index] = partitionValues;
437+ }
438+ 
439+ void  IcebergDataSink::clusteredWrite (RowVectorPtr input, int32_t  writerIdx) {
440+   if  (partitionSizes_[writerIdx] != 0 ) {
441+     VELOX_CHECK_NOT_NULL (partitionRows_[writerIdx]);
442+     partitionRows_[writerIdx]->setSize (
443+         partitionSizes_[writerIdx] * sizeof (vector_size_t ));
444+   }
445+   const  vector_size_t  partitionSize = partitionSizes_[writerIdx];
446+   const  RowVectorPtr writerInput = partitionSize == input->size ()
447+       ? input
448+       : exec::wrap (partitionSize, partitionRows_[writerIdx], input);
449+   write (writerIdx, writerInput);
403450}
404451
405452HiveWriterId IcebergDataSink::getIcebergWriterId (size_t  row) const  {
@@ -470,9 +517,11 @@ void IcebergDataSink::closeInternal() {
470517
471518  if  (state_ == State::kClosed ) {
472519    for  (int  i = 0 ; i < writers_.size (); ++i) {
473-       WRITER_NON_RECLAIMABLE_SECTION_GUARD (i);
474-       writers_[i]->close ();
475-       dataFileStats_.push_back (writers_[i]->dataFileStats ());
520+       if  (writers_[i]) {
521+         WRITER_NON_RECLAIMABLE_SECTION_GUARD (i);
522+         writers_[i]->close ();
523+         dataFileStats_.push_back (writers_[i]->dataFileStats ());
524+       }
476525    }
477526  } else  {
478527    for  (int  i = 0 ; i < writers_.size (); ++i) {
@@ -482,6 +531,63 @@ void IcebergDataSink::closeInternal() {
482531  }
483532}
484533
534+ void  IcebergDataSink::closeWriter (int32_t  index) {
535+   common::testutil::TestValue::adjust (
536+       " facebook::velox::connector::hive::iceberg::IcebergDataSink::closeWriter"  ,
537+       this );
538+ 
539+   if  (writers_[index]) {
540+     WRITER_NON_RECLAIMABLE_SECTION_GUARD (index);
541+     if  (sortWrite ()) {
542+       finishWriter (index);
543+     }
544+     writers_[index]->close ();
545+     dataFileStats_.push_back (writers_[index]->dataFileStats ());
546+     writers_[index] = nullptr ;
547+   }
548+ }
549+ 
550+ bool  IcebergDataSink::finishWriter (int32_t  index) {
551+   if  (!sortWrite ()) {
552+     return  true ;
553+   }
554+ 
555+   if  (writers_[index]) {
556+     const  uint64_t  startTimeMs = getCurrentTimeMs ();
557+     if  (!writers_[index]->finish ()) {
558+       return  false ;
559+     }
560+     if  (getCurrentTimeMs () - startTimeMs > sortWriterFinishTimeSliceLimitMs_) {
561+       return  false ;
562+     }
563+   }
564+   return  true ;
565+ }
566+ 
567+ bool  IcebergDataSink::finish () {
568+   //  Flush is reentry state.
569+   setState (State::kFinishing );
570+ 
571+   //  As for now, only sorted writer needs flush buffered data. For non-sorted
572+   //  writer, data is directly written to the underlying file writer.
573+   if  (!sortWrite ()) {
574+     return  true ;
575+   }
576+ 
577+   //  TODO: we might refactor to move the data sorting logic into hive data sink.
578+   const  uint64_t  startTimeMs = getCurrentTimeMs ();
579+   for  (auto  i = 0 ; i < writers_.size (); ++i) {
580+     WRITER_NON_RECLAIMABLE_SECTION_GUARD (i);
581+     if  (writers_[i] && !writers_[i]->finish ()) {
582+       return  false ;
583+     }
584+     if  (getCurrentTimeMs () - startTimeMs > sortWriterFinishTimeSliceLimitMs_) {
585+       return  false ;
586+     }
587+   }
588+   return  true ;
589+ }
590+ 
485591std::unique_ptr<facebook::velox::dwio::common::Writer>
486592IcebergDataSink::maybeCreateBucketSortWriter (
487593    std::unique_ptr<facebook::velox::dwio::common::Writer> writer) {
0 commit comments