|
| 1 | +################################################################################ |
| 2 | +# Licensed to the Apache Software Foundation (ASF) under one |
| 3 | +# or more contributor license agreements. See the NOTICE file |
| 4 | +# distributed with this work for additional information |
| 5 | +# regarding copyright ownership. The ASF licenses this file |
| 6 | +# to you under the Apache License, Version 2.0 (the |
| 7 | +# "License"); you may not use this file except in compliance |
| 8 | +# with the License. You may obtain a copy of the License at |
| 9 | +# |
| 10 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | +# |
| 12 | +# Unless required by applicable law or agreed to in writing, software |
| 13 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 14 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 15 | +# See the License for the specific language governing permissions and |
| 16 | +# limitations under the License. |
| 17 | +################################################################################ |
| 18 | +""" |
| 19 | +StreamReadBuilder for building streaming table scans and reads. |
| 20 | +
|
| 21 | +This module provides a builder for configuring streaming reads from Paimon |
| 22 | +tables, similar to ReadBuilder but for continuous streaming use cases. |
| 23 | +""" |
| 24 | + |
| 25 | +from typing import Callable, List, Optional, Set |
| 26 | + |
| 27 | +from pypaimon.common.predicate import Predicate |
| 28 | +from pypaimon.common.predicate_builder import PredicateBuilder |
| 29 | +from pypaimon.read.streaming_table_scan import AsyncStreamingTableScan |
| 30 | +from pypaimon.read.table_read import TableRead |
| 31 | +from pypaimon.schema.data_types import DataField |
| 32 | +from pypaimon.table.special_fields import SpecialFields |
| 33 | + |
| 34 | + |
| 35 | +class StreamReadBuilder: |
| 36 | + """ |
| 37 | + Builder for streaming reads from Paimon tables. |
| 38 | +
|
| 39 | + Usage: |
| 40 | + stream_builder = table.new_stream_read_builder() |
| 41 | + stream_builder.with_poll_interval_ms(500) |
| 42 | +
|
| 43 | + scan = stream_builder.new_streaming_scan() |
| 44 | + table_read = stream_builder.new_read() |
| 45 | +
|
| 46 | + async for plan in scan.stream(): |
| 47 | + arrow_table = table_read.to_arrow(plan.splits()) |
| 48 | + process(arrow_table) |
| 49 | + """ |
| 50 | + |
| 51 | + def __init__(self, table): |
| 52 | + """Initialize the StreamReadBuilder.""" |
| 53 | + from pypaimon.table.file_store_table import FileStoreTable |
| 54 | + |
| 55 | + self.table: FileStoreTable = table |
| 56 | + self._predicate: Optional[Predicate] = None |
| 57 | + self._projection: Optional[List[str]] = None |
| 58 | + self._poll_interval_ms: int = 1000 |
| 59 | + self._include_row_kind: bool = False |
| 60 | + self._bucket_filter: Optional[Callable[[int], bool]] = None |
| 61 | + |
| 62 | + def with_filter(self, predicate: Predicate) -> 'StreamReadBuilder': |
| 63 | + """Set a filter predicate for the streaming read.""" |
| 64 | + self._predicate = predicate |
| 65 | + return self |
| 66 | + |
| 67 | + def with_projection(self, projection: List[str]) -> 'StreamReadBuilder': |
| 68 | + """Set column projection for the streaming read.""" |
| 69 | + self._projection = projection |
| 70 | + return self |
| 71 | + |
| 72 | + def with_poll_interval_ms(self, poll_interval_ms: int) -> 'StreamReadBuilder': |
| 73 | + """Set the poll interval in ms for checking new snapshots (default: 1000).""" |
| 74 | + self._poll_interval_ms = poll_interval_ms |
| 75 | + return self |
| 76 | + |
| 77 | + def with_include_row_kind(self, include: bool = True) -> 'StreamReadBuilder': |
| 78 | + """Include row kind column (_row_kind) in the output. |
| 79 | +
|
| 80 | + When enabled, the output will include a _row_kind column as the first |
| 81 | + column with values: +I (insert), -U (update before), +U (update after), |
| 82 | + -D (delete). |
| 83 | + """ |
| 84 | + self._include_row_kind = include |
| 85 | + return self |
| 86 | + |
| 87 | + def with_bucket_filter( |
| 88 | + self, |
| 89 | + bucket_filter: Callable[[int], bool] |
| 90 | + ) -> 'StreamReadBuilder': |
| 91 | + """Push bucket filter for parallel consumption. |
| 92 | +
|
| 93 | + Example: |
| 94 | + builder.with_bucket_filter(lambda b: b % 2 == 0) |
| 95 | + builder.with_bucket_filter(lambda b: b < 4) |
| 96 | + """ |
| 97 | + self._bucket_filter = bucket_filter |
| 98 | + return self |
| 99 | + |
| 100 | + def with_buckets(self, bucket_ids: List[int]) -> 'StreamReadBuilder': |
| 101 | + """Convenience method to read only specific buckets. |
| 102 | +
|
| 103 | + Example: |
| 104 | + builder.with_buckets([0, 1, 2]) |
| 105 | + builder.with_buckets([3, 4, 5]) |
| 106 | + """ |
| 107 | + bucket_set: Set[int] = set(bucket_ids) |
| 108 | + return self.with_bucket_filter(lambda bucket: bucket in bucket_set) |
| 109 | + |
| 110 | + def new_streaming_scan(self) -> AsyncStreamingTableScan: |
| 111 | + """Create a new AsyncStreamingTableScan with this builder's settings.""" |
| 112 | + return AsyncStreamingTableScan( |
| 113 | + table=self.table, |
| 114 | + predicate=self._predicate, |
| 115 | + poll_interval_ms=self._poll_interval_ms, |
| 116 | + bucket_filter=self._bucket_filter, |
| 117 | + ) |
| 118 | + |
| 119 | + def new_read(self) -> TableRead: |
| 120 | + """Create a new TableRead with this builder's settings.""" |
| 121 | + return TableRead( |
| 122 | + table=self.table, |
| 123 | + predicate=self._predicate, |
| 124 | + read_type=self.read_type(), |
| 125 | + include_row_kind=self._include_row_kind |
| 126 | + ) |
| 127 | + |
| 128 | + def new_predicate_builder(self) -> PredicateBuilder: |
| 129 | + """Create a PredicateBuilder for building filter predicates.""" |
| 130 | + return PredicateBuilder(self.read_type()) |
| 131 | + |
| 132 | + def read_type(self) -> List[DataField]: |
| 133 | + """Get the read schema fields, applying projection if set.""" |
| 134 | + table_fields = self.table.fields |
| 135 | + |
| 136 | + if not self._projection: |
| 137 | + return table_fields |
| 138 | + else: |
| 139 | + if self.table.options.row_tracking_enabled(): |
| 140 | + table_fields = SpecialFields.row_type_with_row_tracking(table_fields) |
| 141 | + field_map = {field.name: field for field in table_fields} |
| 142 | + return [field_map[name] for name in self._projection if name in field_map] |
0 commit comments