3030#include  " velox/exec/OperatorUtils.h" 
3131#include  " velox/exec/SortBuffer.h" 
3232
33- #include  < boost/lexical_cast.hpp> 
34- #include  < boost/uuid/uuid_generators.hpp> 
35- #include  < boost/uuid/uuid_io.hpp> 
36- 
3733using  facebook::velox::common::testutil::TestValue;
3834
3935namespace  facebook ::velox::connector::hive {
@@ -95,14 +91,12 @@ std::vector<column_index_t> getPartitionChannels(
9591
9692//  Returns the column indices of non-partition data columns.
9793std::vector<column_index_t > getNonPartitionChannels (
98-     const  std::vector<column_index_t >& partitionChannels,
99-     const  column_index_t  childrenSize) {
94+     const  std::shared_ptr<const  HiveInsertTableHandle>& insertTableHandle) {
10095  std::vector<column_index_t > dataChannels;
101-   dataChannels.reserve (childrenSize - partitionChannels.size ());
10296
103-   for  (column_index_t  i = 0 ; i < childrenSize; i++) { 
104-     if  ( std::find (partitionChannels. cbegin (), partitionChannels. cend (), i) == 
105-         partitionChannels. cend ()) {
97+   for  (column_index_t  i = 0 ; i < insertTableHandle-> inputColumns (). size (); 
98+        i++) { 
99+     if  (!insertTableHandle-> inputColumns ()[i]-> isPartitionKey ()) {
106100      dataChannels.push_back (i);
107101    }
108102  }
@@ -119,10 +113,6 @@ std::string makePartitionDirectory(
119113  return  tableDirectory;
120114}
121115
122- std::string makeUuid () {
123-   return  boost::lexical_cast<std::string>(boost::uuids::random_generator ()());
124- }
125- 
126116std::unordered_map<LocationHandle::TableType, std::string> tableTypeNames () {
127117  return  {
128118      {LocationHandle::TableType::kNew , " kNew"  },
@@ -383,7 +373,8 @@ HiveDataSink::HiveDataSink(
383373              ? createBucketFunction(
384374                    *insertTableHandle->bucketProperty (),
385375                    inputType)
386-               : nullptr) {}
376+               : nullptr,
377+           getNonPartitionChannels(insertTableHandle)) {}
387378
388379HiveDataSink::HiveDataSink (
389380    RowTypePtr inputType,
@@ -392,7 +383,8 @@ HiveDataSink::HiveDataSink(
392383    CommitStrategy commitStrategy,
393384    const  std::shared_ptr<const  HiveConfig>& hiveConfig,
394385    uint32_t  bucketCount,
395-     std::unique_ptr<core::PartitionFunction> bucketFunction)
386+     std::unique_ptr<core::PartitionFunction> bucketFunction,
387+     const  std::vector<column_index_t >& dataChannels)
396388    : inputType_(std::move(inputType)),
397389      insertTableHandle_(std::move(insertTableHandle)),
398390      connectorQueryCtx_(connectorQueryCtx),
@@ -412,8 +404,7 @@ HiveDataSink::HiveDataSink(
412404                    hiveConfig_->isPartitionPathAsLowerCase(
413405                        connectorQueryCtx->sessionProperties ()))
414406              : nullptr),
415-       dataChannels_(
416-           getNonPartitionChannels (partitionChannels_, inputType_->size ())),
407+       dataChannels_(dataChannels),
417408      bucketCount_(static_cast <int32_t >(bucketCount)),
418409      bucketFunction_(std::move(bucketFunction)),
419410      writerFactory_(
@@ -489,6 +480,8 @@ void HiveDataSink::appendData(RowVectorPtr input) {
489480    input->childAt (i)->loadedVector ();
490481  }
491482
483+   splitInputRowsAndEnsureWriters (input);
484+ 
492485  //  All inputs belong to a single non-bucketed partition. The partition id
493486  //  must be zero.
494487  if  (!isBucketed () && partitionIdGenerator_->numPartitions () == 1 ) {
@@ -497,8 +490,6 @@ void HiveDataSink::appendData(RowVectorPtr input) {
497490    return ;
498491  }
499492
500-   splitInputRowsAndEnsureWriters ();
501- 
502493  for  (auto  index = 0 ; index < writers_.size (); ++index) {
503494    const  vector_size_t  partitionSize = partitionSizes_[index];
504495    if  (partitionSize == 0 ) {
@@ -670,30 +661,33 @@ bool HiveDataSink::finish() {
670661std::vector<std::string> HiveDataSink::close () {
671662  setState (State::kClosed );
672663  closeInternal ();
664+   return  commitMessage ();
665+ }
673666
667+ std::vector<std::string> HiveDataSink::commitMessage () const  {
674668  std::vector<std::string> partitionUpdates;
675669  partitionUpdates.reserve (writerInfo_.size ());
676670  for  (int  i = 0 ; i < writerInfo_.size (); ++i) {
677671    const  auto & info = writerInfo_.at (i);
678672    VELOX_CHECK_NOT_NULL (info);
679673    //  clang-format off
680-        auto  partitionUpdateJson = folly::toJson (
681-         folly::dynamic::object
682-            (" name"  , info->writerParameters .partitionName ().value_or (" "  ))
683-            (" updateMode"  ,
684-              HiveWriterParameters::updateModeToString (
685-                info->writerParameters .updateMode ()))
686-            (" writePath"  , info->writerParameters .writeDirectory ())
687-            (" targetPath"  , info->writerParameters .targetDirectory ())
688-            (" fileWriteInfos"  , folly::dynamic::array (
689-              folly::dynamic::object
690-                (" writeFileName"  , info->writerParameters .writeFileName ())
691-                (" targetFileName"  , info->writerParameters .targetFileName ())
692-                (" fileSize"  , ioStats_.at (i)->rawBytesWritten ())))
693-            (" rowCount"  , info->numWrittenRows )
694-            (" inMemoryDataSizeInBytes"  , info->inputSizeInBytes )
695-            (" onDiskDataSizeInBytes"  , ioStats_.at (i)->rawBytesWritten ())
696-            (" containsNumberedFileNames"  , true ));
674+     auto  partitionUpdateJson = folly::toJson (
675+      folly::dynamic::object
676+         (" name"  , info->writerParameters .partitionName ().value_or (" "  ))
677+         (" updateMode"  ,
678+           HiveWriterParameters::updateModeToString (
679+             info->writerParameters .updateMode ()))
680+         (" writePath"  , info->writerParameters .writeDirectory ())
681+         (" targetPath"  , info->writerParameters .targetDirectory ())
682+         (" fileWriteInfos"  , folly::dynamic::array (
683+           folly::dynamic::object
684+             (" writeFileName"  , info->writerParameters .writeFileName ())
685+             (" targetFileName"  , info->writerParameters .targetFileName ())
686+             (" fileSize"  , ioStats_.at (i)->rawBytesWritten ())))
687+         (" rowCount"  , info->numWrittenRows )
688+         (" inMemoryDataSizeInBytes"  , info->inputSizeInBytes )
689+         (" onDiskDataSizeInBytes"  , ioStats_.at (i)->rawBytesWritten ())
690+         (" containsNumberedFileNames"  , true ));
697691    //  clang-format on
698692    partitionUpdates.push_back (partitionUpdateJson);
699693  }
@@ -740,11 +734,7 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
740734  VELOX_CHECK_EQ (writers_.size (), writerInfo_.size ());
741735  VELOX_CHECK_EQ (writerIndexMap_.size (), writerInfo_.size ());
742736
743-   std::optional<std::string> partitionName;
744-   if  (isPartitioned ()) {
745-     partitionName =
746-         partitionIdGenerator_->partitionName (id.partitionId .value ());
747-   }
737+   std::optional<std::string> partitionName = getPartitionName (id);
748738
749739  //  Without explicitly setting flush policy, the default memory based flush
750740  //  policy is used.
@@ -831,15 +821,23 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
831821      options);
832822  writer = maybeCreateBucketSortWriter (std::move (writer));
833823  writers_.emplace_back (std::move (writer));
834-   //  Extends the buffer used for partition rows calculations.
835-   partitionSizes_.emplace_back (0 );
836-   partitionRows_.emplace_back (nullptr );
837-   rawPartitionRows_.emplace_back (nullptr );
824+ 
825+   extendBuffersForPartitionedTables ();
838826
839827  writerIndexMap_.emplace (id, writers_.size () - 1 );
840828  return  writerIndexMap_[id];
841829}
842830
831+ std::optional<std::string> HiveDataSink::getPartitionName (
832+     const  HiveWriterId& id) const  {
833+   std::optional<std::string> partitionName;
834+   if  (isPartitioned ()) {
835+     partitionName =
836+         partitionIdGenerator_->partitionName (id.partitionId .value ());
837+   }
838+   return  partitionName;
839+ }
840+ 
843841std::unique_ptr<facebook::velox::dwio::common::Writer>
844842HiveDataSink::maybeCreateBucketSortWriter (
845843    std::unique_ptr<facebook::velox::dwio::common::Writer> writer) {
@@ -867,6 +865,13 @@ HiveDataSink::maybeCreateBucketSortWriter(
867865      sortWriterFinishTimeSliceLimitMs_);
868866}
869867
868+ void  HiveDataSink::extendBuffersForPartitionedTables () {
869+   //  Extends the buffer used for partition rows calculations.
870+   partitionSizes_.emplace_back (0 );
871+   partitionRows_.emplace_back (nullptr );
872+   rawPartitionRows_.emplace_back (nullptr );
873+ }
874+ 
870875HiveWriterId HiveDataSink::getWriterId (size_t  row) const  {
871876  std::optional<int32_t > partitionId;
872877  if  (isPartitioned ()) {
@@ -881,7 +886,25 @@ HiveWriterId HiveDataSink::getWriterId(size_t row) const {
881886  return  HiveWriterId{partitionId, bucketId};
882887}
883888
884- void  HiveDataSink::splitInputRowsAndEnsureWriters () {
889+ void  HiveDataSink::updatePartitionRows (
890+     uint32_t  index,
891+     vector_size_t  numRows,
892+     vector_size_t  row) {
893+   VELOX_DCHECK_LT (index, partitionSizes_.size ());
894+   VELOX_DCHECK_EQ (partitionSizes_.size (), partitionRows_.size ());
895+   VELOX_DCHECK_EQ (partitionRows_.size (), rawPartitionRows_.size ());
896+   if  (FOLLY_UNLIKELY (partitionRows_[index] == nullptr ) ||
897+       (partitionRows_[index]->capacity () < numRows * sizeof (vector_size_t ))) {
898+     partitionRows_[index] =
899+         allocateIndices (numRows, connectorQueryCtx_->memoryPool ());
900+     rawPartitionRows_[index] =
901+         partitionRows_[index]->asMutable <vector_size_t >();
902+   }
903+   rawPartitionRows_[index][partitionSizes_[index]] = row;
904+   ++partitionSizes_[index];
905+ }
906+ 
907+ void  HiveDataSink::splitInputRowsAndEnsureWriters (RowVectorPtr /*  input */  ) {
885908  VELOX_CHECK (isPartitioned () || isBucketed ());
886909  if  (isBucketed () && isPartitioned ()) {
887910    VELOX_CHECK_EQ (bucketIds_.size (), partitionIds_.size ());
@@ -895,18 +918,7 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
895918    const  auto  id = getWriterId (row);
896919    const  uint32_t  index = ensureWriter (id);
897920
898-     VELOX_DCHECK_LT (index, partitionSizes_.size ());
899-     VELOX_DCHECK_EQ (partitionSizes_.size (), partitionRows_.size ());
900-     VELOX_DCHECK_EQ (partitionRows_.size (), rawPartitionRows_.size ());
901-     if  (FOLLY_UNLIKELY (partitionRows_[index] == nullptr ) ||
902-         (partitionRows_[index]->capacity () < numRows * sizeof (vector_size_t ))) {
903-       partitionRows_[index] =
904-           allocateIndices (numRows, connectorQueryCtx_->memoryPool ());
905-       rawPartitionRows_[index] =
906-           partitionRows_[index]->asMutable <vector_size_t >();
907-     }
908-     rawPartitionRows_[index][partitionSizes_[index]] = row;
909-     ++partitionSizes_[index];
921+     updatePartitionRows (index, numRows, row);
910922  }
911923
912924  for  (uint32_t  i = 0 ; i < partitionSizes_.size (); ++i) {
@@ -917,6 +929,15 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
917929  }
918930}
919931
932+ std::string HiveDataSink::makePartitionDirectory (
933+     const  std::string& tableDirectory,
934+     const  std::optional<std::string>& partitionSubdirectory) const  {
935+   if  (partitionSubdirectory.has_value ()) {
936+     return  fs::path (tableDirectory) / partitionSubdirectory.value ();
937+   }
938+   return  tableDirectory;
939+ }
940+ 
920941HiveWriterParameters HiveDataSink::getWriterParameters (
921942    const  std::optional<std::string>& partition,
922943    std::optional<uint32_t > bucketId) const  {
0 commit comments