|
16 | 16 |
|
17 | 17 | #include "paimon/core/utils/nested_projection_utils.h" |
18 | 18 |
|
| 19 | +#include <set> |
19 | 20 | #include <string> |
20 | 21 | #include <utility> |
| 22 | +#include <vector> |
21 | 23 |
|
22 | 24 | #include "arrow/array/array_nested.h" |
| 25 | +#include "arrow/array/array_primitive.h" |
| 26 | +#include "arrow/array/builder_primitive.h" |
| 27 | +#include "arrow/array/concatenate.h" |
23 | 28 | #include "arrow/type.h" |
24 | 29 | #include "fmt/format.h" |
25 | 30 | #include "paimon/status.h" |
| 31 | +#include "rapidjson/document.h" |
26 | 32 |
|
27 | 33 | namespace paimon { |
28 | 34 |
|
29 | | -Result<std::optional<std::shared_ptr<arrow::DataType>>> PruneDataType( |
| 35 | +Result<std::optional<std::shared_ptr<arrow::DataType>>> NestedProjectionUtils::PruneDataType( |
30 | 36 | const std::shared_ptr<arrow::DataType>& read_type, |
31 | 37 | const std::shared_ptr<arrow::DataType>& data_type) { |
32 | 38 | // Identical types need no pruning. |
@@ -106,7 +112,7 @@ Result<std::optional<std::shared_ptr<arrow::DataType>>> PruneDataType( |
106 | 112 | // PruneArray — fallback for format readers that return extra nested columns |
107 | 113 | // --------------------------------------------------------------------------- |
108 | 114 |
|
109 | | -Result<std::shared_ptr<arrow::Array>> PruneArray( |
| 115 | +Result<std::shared_ptr<arrow::Array>> NestedProjectionUtils::PruneArray( |
110 | 116 | const std::shared_ptr<arrow::Array>& array, |
111 | 117 | const std::shared_ptr<arrow::DataType>& target_type) { |
112 | 118 | if (!array || array->type()->Equals(target_type)) { |
@@ -172,4 +178,141 @@ Result<std::shared_ptr<arrow::Array>> PruneArray( |
172 | 178 | } |
173 | 179 | } |
174 | 180 |
|
| 181 | +// --------------------------------------------------------------------------- |
| 182 | +// Map selected-keys support |
| 183 | +// --------------------------------------------------------------------------- |
| 184 | + |
| 185 | +std::set<std::string> NestedProjectionUtils::GetMapSelectedKeys( |
| 186 | + const std::shared_ptr<arrow::Field>& field) { |
| 187 | + std::set<std::string> result; |
| 188 | + if (!field || !field->HasMetadata() || !field->metadata()) { |
| 189 | + return result; |
| 190 | + } |
| 191 | + auto get_result = field->metadata()->Get(DataField::MAP_SELECTED_KEYS); |
| 192 | + if (!get_result.ok()) { |
| 193 | + return result; |
| 194 | + } |
| 195 | + const std::string& json_str = get_result.ValueUnsafe(); |
| 196 | + rapidjson::Document doc; |
| 197 | + doc.Parse(json_str.c_str()); |
| 198 | + if (doc.HasParseError() || !doc.IsArray()) { |
| 199 | + return result; |
| 200 | + } |
| 201 | + for (rapidjson::SizeType i = 0; i < doc.Size(); ++i) { |
| 202 | + if (doc[i].IsString()) { |
| 203 | + result.emplace(doc[i].GetString(), doc[i].GetStringLength()); |
| 204 | + } |
| 205 | + } |
| 206 | + return result; |
| 207 | +} |
| 208 | + |
| 209 | +Result<std::shared_ptr<arrow::Array>> NestedProjectionUtils::FilterMapArrayBySelectedKeys( |
| 210 | + const std::shared_ptr<arrow::Array>& array, |
| 211 | + const std::set<std::string>& selected_keys) { |
| 212 | + if (selected_keys.empty() || !array || array->length() == 0) { |
| 213 | + return array; |
| 214 | + } |
| 215 | + |
| 216 | + auto map_array = std::static_pointer_cast<arrow::MapArray>(array); |
| 217 | + auto map_type = std::static_pointer_cast<arrow::MapType>(array->type()); |
| 218 | + |
| 219 | + if (map_type->key_type()->id() != arrow::Type::STRING) { |
| 220 | + return Status::Invalid(fmt::format( |
| 221 | + "FilterMapArrayBySelectedKeys only supports string keys, got {}", |
| 222 | + map_type->key_type()->ToString())); |
| 223 | + } |
| 224 | + |
| 225 | + auto keys_array = std::static_pointer_cast<arrow::StringArray>(map_array->keys()); |
| 226 | + auto values_array = map_array->items(); |
| 227 | + int64_t total_entries = keys_array->length(); |
| 228 | + int64_t num_maps = map_array->length(); |
| 229 | + |
| 230 | + // Mark which flat entries to keep |
| 231 | + std::vector<bool> keep(total_entries, false); |
| 232 | + int64_t kept_count = 0; |
| 233 | + for (int64_t i = 0; i < total_entries; ++i) { |
| 234 | + if (!keys_array->IsNull(i)) { |
| 235 | + std::string_view key_view = keys_array->GetView(i); |
| 236 | + std::string key_str(key_view.data(), key_view.size()); |
| 237 | + if (selected_keys.count(key_str) > 0) { |
| 238 | + keep[i] = true; |
| 239 | + ++kept_count; |
| 240 | + } |
| 241 | + } |
| 242 | + } |
| 243 | + |
| 244 | + if (kept_count == total_entries) { |
| 245 | + return array; |
| 246 | + } |
| 247 | + |
| 248 | + // Collect kept slices as contiguous runs to build filtered key/value arrays |
| 249 | + // via Slice + Concatenate (avoids arrow::compute::Take dependency). |
| 250 | + arrow::ArrayVector key_slices; |
| 251 | + arrow::ArrayVector value_slices; |
| 252 | + key_slices.reserve(kept_count); |
| 253 | + value_slices.reserve(kept_count); |
| 254 | + |
| 255 | + std::vector<int32_t> new_offsets; |
| 256 | + new_offsets.reserve(num_maps + 1); |
| 257 | + int32_t running_offset = 0; |
| 258 | + |
| 259 | + for (int64_t map_idx = 0; map_idx < num_maps; ++map_idx) { |
| 260 | + new_offsets.push_back(running_offset); |
| 261 | + if (map_array->IsNull(map_idx)) { |
| 262 | + continue; |
| 263 | + } |
| 264 | + int64_t start = map_array->value_offset(map_idx); |
| 265 | + int64_t end = map_array->value_offset(map_idx + 1); |
| 266 | + // Collect contiguous runs of kept entries within this map |
| 267 | + int64_t run_start = -1; |
| 268 | + for (int64_t entry_idx = start; entry_idx <= end; ++entry_idx) { |
| 269 | + bool should_keep = (entry_idx < end) && keep[entry_idx]; |
| 270 | + if (should_keep && run_start < 0) { |
| 271 | + run_start = entry_idx; |
| 272 | + } else if (!should_keep && run_start >= 0) { |
| 273 | + int64_t run_len = entry_idx - run_start; |
| 274 | + key_slices.push_back(keys_array->Slice(run_start, run_len)); |
| 275 | + value_slices.push_back(values_array->Slice(run_start, run_len)); |
| 276 | + running_offset += static_cast<int32_t>(run_len); |
| 277 | + run_start = -1; |
| 278 | + } |
| 279 | + } |
| 280 | + } |
| 281 | + new_offsets.push_back(running_offset); |
| 282 | + |
| 283 | + // Build filtered key/value arrays |
| 284 | + std::shared_ptr<arrow::Array> filtered_keys; |
| 285 | + std::shared_ptr<arrow::Array> filtered_values; |
| 286 | + if (key_slices.empty()) { |
| 287 | + // All entries filtered out — create empty arrays |
| 288 | + filtered_keys = keys_array->Slice(0, 0); |
| 289 | + filtered_values = values_array->Slice(0, 0); |
| 290 | + } else if (key_slices.size() == 1) { |
| 291 | + filtered_keys = key_slices[0]; |
| 292 | + filtered_values = value_slices[0]; |
| 293 | + } else { |
| 294 | + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(filtered_keys, |
| 295 | + arrow::Concatenate(key_slices)); |
| 296 | + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(filtered_values, |
| 297 | + arrow::Concatenate(value_slices)); |
| 298 | + } |
| 299 | + |
| 300 | + // Build new offsets array |
| 301 | + arrow::Int32Builder offset_builder; |
| 302 | + PAIMON_RETURN_NOT_OK_FROM_ARROW(offset_builder.Reserve( |
| 303 | + static_cast<int64_t>(new_offsets.size()))); |
| 304 | + for (int32_t offset : new_offsets) { |
| 305 | + offset_builder.UnsafeAppend(offset); |
| 306 | + } |
| 307 | + std::shared_ptr<arrow::Array> new_offsets_array; |
| 308 | + PAIMON_RETURN_NOT_OK_FROM_ARROW(offset_builder.Finish(&new_offsets_array)); |
| 309 | + |
| 310 | + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( |
| 311 | + std::shared_ptr<arrow::Array> result_map, |
| 312 | + arrow::MapArray::FromArrays(new_offsets_array, filtered_keys, filtered_values, |
| 313 | + arrow::default_memory_pool(), |
| 314 | + map_array->null_bitmap())); |
| 315 | + return result_map; |
| 316 | +} |
| 317 | + |
175 | 318 | } // namespace paimon |
0 commit comments