
Commit 0080e49

fix(io): ensure that topic is created in PubSub writer
* Update documentation for PubSub & Kafka writers
* Add array handling, date handling, and prefix/suffix usage
1 parent bd45b72 commit 0080e49

6 files changed

Lines changed: 223 additions & 11 deletions

File tree

docs/usage/writers.md

Lines changed: 98 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,103 @@ concrete_writer.write(sample_report, 'query')
9292

9393
## Configuration
9494

95+
### arrays
96+
9597
Each writer also supports two options for dealing with arrays:
9698

97-
* `WRITER.array-handling` - arrays handling method: "strings" (default) - store arrays as strings (items combined via a separator, e.g. "item1|item2"), "arrays" - store arrays as arrays.
98-
* `WRITER.array-separator` - a separator symbol for joining arrays as strings, by default '|'.
99+
* `WRITER.array_handling` - array handling method:
100+
* `strings` (default) - store arrays as strings (items combined via a separator, e.g. "item1|item2")
101+
* `arrays` - store arrays as arrays.
102+
* `WRITER.array_separator` - a separator symbol for joining arrays as strings, by default '|'.
103+
104+
/// tab | cli
105+
```bash
106+
garf query.sql --source API_SOURCE \
107+
--output json \
108+
--json.array-handling=arrays
109+
110+
garf query.sql --source API_SOURCE \
111+
--output json \
112+
--json.array-handling=strings --json.array-separator='*'
113+
```
114+
///
115+
116+
/// tab | python
117+
```python
118+
from garf.io.writers import json_writer
119+
120+
121+
array_writer = json_writer.JsonWriter(array_handling='arrays')
122+
string_writer = json_writer.JsonWriter(
123+
array_handling='strings', array_separator='*'
124+
)
125+
126+
```
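To make the two `array_handling` modes concrete, here is a minimal plain-Python sketch of the behaviour the documentation describes. The helper `format_array` is hypothetical and is not part of `garf`; it only mimics joining items with the configured separator.

```python
def format_array(value, array_handling='strings', array_separator='|'):
    """Hypothetical helper mimicking the documented array options."""
    if array_handling == 'arrays' or not isinstance(value, (list, tuple)):
        # 'arrays' mode (or a non-array value): return the value untouched.
        return value
    # 'strings' mode: join the items with the separator.
    return array_separator.join(str(item) for item in value)


print(format_array(['item1', 'item2']))  # item1|item2
print(format_array(['item1', 'item2'], array_separator='*'))  # item1*item2
print(format_array(['item1', 'item2'], array_handling='arrays'))
```

The last call prints the list unchanged, which corresponds to the `arrays` option.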
127+
128+
### dates
129+
130+
By default `garf` writes all date objects as strings. You can override this with two options:
131+
132+
* `WRITER.date_handling` - specifies how date objects are handled:
133+
* `strings` (default) - keeps date objects as strings.
134+
* `dates` - formats date objects to proper dates.
135+
* `datetimes` - formats date objects to proper datetimes.
136+
* `timestamps` - formats date objects to proper timestamps.
137+
* `WRITER.date_format_string` - specifies [format string](https://docs.python.org/3/library/datetime.html#format-codes).
138+
139+
/// tab | cli
140+
```bash
141+
garf query.sql --source API_SOURCE \
142+
--output bq \
143+
--bq.date-handling=dates
144+
145+
garf query.sql --source API_SOURCE \
146+
--output bq \
147+
--bq.date-handling=dates --bq.date-format-string='%d/%m/%y'
148+
```
149+
///
150+
151+
/// tab | python
152+
```python
153+
from garf.io.writers import bigquery_writer
154+
155+
156+
dates_writer = bigquery_writer.BigQueryWriter(date_handling='dates')
157+
dates_writer_with_format = bigquery_writer.BigQueryWriter(
158+
date_handling='dates',
159+
date_format_string='%d/%m/%y',
160+
)
161+
```
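The format string used in the examples follows Python's standard `datetime` format codes. The sketch below uses only the standard library to show what `'%d/%m/%y'` produces and parses; how `garf` applies `date_format_string` internally is not shown in this commit, so treat this purely as an illustration of the format codes.

```python
import datetime

date_format_string = '%d/%m/%y'
day = datetime.date(2024, 3, 5)

# Render a date with the format string: day/month/two-digit year.
as_text = day.strftime(date_format_string)
print(as_text)  # 05/03/24

# Parse the same text back into a date object.
parsed = datetime.datetime.strptime(as_text, date_format_string).date()
print(parsed == day)  # True
```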
162+
163+
### prefix / suffix
164+
165+
When writing data with `garf` you can use `prefix` and `suffix` to dynamically
166+
update where (table / file / topic / index) data are written:
167+
168+
/// tab | cli
169+
```bash
170+
# Saves results to `my_prefix_query.csv`
171+
garf query.sql --source API_SOURCE \
172+
--output csv \
173+
--csv.prefix=my_prefix
174+
175+
# Saves results to `query_my_suffix.csv`
176+
garf query.sql --source API_SOURCE \
177+
--output csv \
178+
--csv.suffix=my_suffix
179+
180+
# Saves results to `my_prefix_query_my_suffix.csv`
181+
garf query.sql --source API_SOURCE \
182+
--output csv \
183+
--csv.prefix=my_prefix \
184+
--csv.suffix=my_suffix
185+
```
186+
///
187+
188+
/// tab | python
189+
```python
190+
from garf.io.writers import csv_writer
191+
192+
# Saves results to `my_prefix_query_my_suffix.csv`
193+
writer = csv_writer.CsvWriter(prefix='my_prefix', suffix='my_suffix')
194+
```
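The file names in the comments above suggest that the prefix and suffix are joined to the destination name with underscores. A minimal sketch of that naming rule follows; `build_destination` is a hypothetical helper, not a `garf` function, and the underscore separator is inferred from the documented examples.

```python
def build_destination(name, prefix='', suffix='', extension='csv'):
    """Hypothetical helper reproducing the documented file names."""
    # Drop empty parts so a missing prefix or suffix adds no underscore.
    parts = [part for part in (prefix, name, suffix) if part]
    return f"{'_'.join(parts)}.{extension}"


print(build_destination('query', prefix='my_prefix'))
print(build_destination('query', suffix='my_suffix'))
print(build_destination('query', prefix='my_prefix', suffix='my_suffix'))
```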

docs/usage/writers/kafka-writer.md

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,58 @@ writer = kafka_writer.KafkaWriter(bootstrap_servers="broker1:9092,broker2:9092")
5555
writer.write(sample_report, 'topic_name')
5656
```
5757
///
58+
59+
### push_strategy
60+
61+
By default the Kafka writer pushes the whole report as a single message.
62+
You can override this with the `push_strategy` parameter, which supports three options:
63+
64+
* `report` - pushes the whole report as a message.
65+
* `batch` - pushes N rows of report into a message.
66+
* `row` - pushes each row of report into a separate message.
67+
68+
/// tab | cli
69+
```bash hl_lines="3"
70+
garf query.sql --source API_SOURCE \
71+
--output kafka \
72+
--kafka.push-strategy=row
73+
```
74+
///
75+
76+
/// tab | python
77+
```python hl_lines="7"
78+
from garf.core import report
79+
from garf.io.writers import kafka_writer
80+
81+
sample_report = report.GarfReport(results=[[1]], column_names=['one'])
82+
83+
writer = kafka_writer.KafkaWriter(push_strategy='row')
84+
writer.write(sample_report, 'topic_name')
85+
```
86+
///
87+
88+
### batch_size
89+
90+
With the `batch` push strategy, each message contains `10` rows by default.
91+
You can override this with the `batch_size` parameter.
92+
93+
/// tab | cli
94+
```bash hl_lines="3"
95+
garf query.sql --source API_SOURCE \
96+
--output kafka \
97+
--kafka.push-strategy=batch \
98+
--kafka.batch-size=5
99+
```
100+
///
101+
102+
/// tab | python
103+
```python hl_lines="7"
104+
from garf.core import report
105+
from garf.io.writers import kafka_writer
106+
107+
sample_report = report.GarfReport(results=[[1]], column_names=['one'])
108+
109+
writer = kafka_writer.KafkaWriter(push_strategy='batch', batch_size=5)
110+
writer.write(sample_report, 'topic_name')
111+
```
112+
///
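The three push strategies differ only in how report rows are grouped into messages. The sketch below illustrates that grouping in plain Python; `split_messages` is a hypothetical helper and does not reflect how `garf` serializes or sends messages.

```python
def split_messages(rows, push_strategy='report', batch_size=10):
    """Hypothetical helper: groups rows into per-message chunks."""
    if push_strategy == 'report':
        # One message holding every row.
        return [rows]
    if push_strategy == 'row':
        # One message per row.
        return [[row] for row in rows]
    if push_strategy == 'batch':
        # Messages of up to `batch_size` rows each.
        return [
            rows[start:start + batch_size]
            for start in range(0, len(rows), batch_size)
        ]
    raise ValueError(f'Unknown push_strategy: {push_strategy}')


rows = [[i] for i in range(12)]
print(len(split_messages(rows, 'report')))  # 1
print(len(split_messages(rows, 'row')))  # 12
print([len(m) for m in split_messages(rows, 'batch', batch_size=5)])  # [5, 5, 2]
```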

docs/usage/writers/pubsub-writer.md

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ writer.write(sample_report, 'topic_name')
3030
///
3131

3232
## Parameters
33-
### Project
33+
### project
3434

3535
By default it uses the `GOOGLE_CLOUD_PROJECT` environment variable.
3636
You can override it with the `project` parameter.
@@ -48,10 +48,64 @@ garf query.sql --source API_SOURCE \
4848
from garf.core import report
4949
from garf.io.writers import pubsub_writer
5050

51-
# Create example report
5251
sample_report = report.GarfReport(results=[[1]], column_names=['one'])
5352

5453
writer = pubsub_writer.PubSubWriter(project="ANOTHER_PROJECT_ID")
5554
writer.write(sample_report, 'topic_name')
5655
```
5756
///
57+
58+
### push_strategy
59+
60+
By default the PubSub writer pushes the whole report as a single message.
61+
You can override this with the `push_strategy` parameter, which supports three options:
62+
63+
* `report` - pushes the whole report as a message.
64+
* `batch` - pushes N rows of report into a message.
65+
* `row` - pushes each row of report into a separate message.
66+
67+
/// tab | cli
68+
```bash hl_lines="3"
69+
garf query.sql --source API_SOURCE \
70+
--output pubsub \
71+
--pubsub.push-strategy=row
72+
```
73+
///
74+
75+
/// tab | python
76+
```python hl_lines="7"
77+
from garf.core import report
78+
from garf.io.writers import pubsub_writer
79+
80+
sample_report = report.GarfReport(results=[[1]], column_names=['one'])
81+
82+
writer = pubsub_writer.PubSubWriter(push_strategy='row')
83+
writer.write(sample_report, 'topic_name')
84+
```
85+
///
86+
87+
### batch_size
88+
89+
With the `batch` push strategy, each message contains `10` rows by default.
90+
You can override this with the `batch_size` parameter.
91+
92+
/// tab | cli
93+
```bash hl_lines="3"
94+
garf query.sql --source API_SOURCE \
95+
--output pubsub \
96+
--pubsub.push-strategy=batch \
97+
--pubsub.batch-size=5
98+
```
99+
///
100+
101+
/// tab | python
102+
```python hl_lines="7"
103+
from garf.core import report
104+
from garf.io.writers import pubsub_writer
105+
106+
sample_report = report.GarfReport(results=[[1]], column_names=['one'])
107+
108+
writer = pubsub_writer.PubSubWriter(push_strategy='batch', batch_size=5)
109+
writer.write(sample_report, 'topic_name')
110+
```
111+
///

libs/io/garf/io/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,4 @@
1313
# limitations under the License.
1414
"""Write GarfReport to anywhere."""
1515

16-
__version__ = '1.3.0'
16+
__version__ = '1.3.1'

libs/io/garf/io/writers/pubsub_writer.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
'Please install garf-io with PubSub support - `pip install garf-io[pubsub]`'
2626
) from e
2727

28+
from google.api_core.exceptions import NotFound
29+
2830
logger = logging.getLogger(__name__)
2931

3032

@@ -56,8 +58,13 @@ def _init_producer(self):
5658

5759
def create_topic(self, topic: str) -> str:
5860
topic_path = self.publisher.topic_path(self.project, topic)
59-
if not self.publisher.get_topic(request={'topic': topic_path}):
60-
self.publisher.create_topic(request={'name': topic_path})
61+
request = {'name': topic_path}
62+
try:
63+
if not self.publisher.get_topic(request={'topic': topic_path}):
64+
self.publisher.create_topic(request=request)
65+
except NotFound:
66+
self.publisher.create_topic(request=request)
67+
6168
return topic_path
6269

6370
def _send(self, data: bytes, topic: str) -> None:
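The change above relies on `get_topic` raising `NotFound` when the topic is missing. As a simplified illustration of that get-or-create pattern (not the code in this commit), the sketch below uses in-memory stand-ins so it runs without `google-cloud-pubsub`: `NotFound`, `FakePublisher`, and `ensure_topic` are all hypothetical names defined here.

```python
class NotFound(Exception):
    """Stand-in for google.api_core.exceptions.NotFound."""


class FakePublisher:
    """Hypothetical in-memory stand-in for a Pub/Sub publisher client."""

    def __init__(self):
        self.topics = set()

    def topic_path(self, project, topic):
        return f'projects/{project}/topics/{topic}'

    def get_topic(self, request):
        # Raise for a missing topic, as the real client is expected to.
        if request['topic'] not in self.topics:
            raise NotFound(request['topic'])
        return request['topic']

    def create_topic(self, request):
        self.topics.add(request['name'])
        return request['name']


def ensure_topic(publisher, project, topic):
    """Returns the topic path, creating the topic only if it is missing."""
    topic_path = publisher.topic_path(project, topic)
    try:
        publisher.get_topic(request={'topic': topic_path})
    except NotFound:
        publisher.create_topic(request={'name': topic_path})
    return topic_path


publisher = FakePublisher()
print(ensure_topic(publisher, 'my-project', 'topic_name'))
print(ensure_topic(publisher, 'my-project', 'topic_name'))
```

Calling `ensure_topic` twice creates the topic once; the second call finds it and skips creation.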

libs/io/garf/io/writers/topic_writer.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def __init__(
4646
batch_size: int = 10,
4747
**kwargs: str,
4848
) -> None:
49-
"""Initializes KafkaWriter."""
49+
"""Initializes TopicWriter."""
5050
super().__init__(**kwargs)
5151
self.provider = provider
5252
self.push_strategy = push_strategy
@@ -62,14 +62,14 @@ def _init_producer(self):
6262
raise NotImplementedError
6363

6464
def write(self, report: garf_report.GarfReport, destination: str) -> str:
65-
"""Writes report to Kafka topic.
65+
"""Writes report to topic.
6666
6767
Args:
6868
report: GarfReport to write.
69-
destination: Kafka topic name.
69+
destination: Topic name.
7070
"""
7171
self._init_producer()
72-
with tracer.start_as_current_span('f{self.provider}.write') as span:
72+
with tracer.start_as_current_span(f'{self.provider}.write') as span:
7373
span.set_attribute('writer.type', str(self.push_strategy))
7474
destination = formatter.format_extension(
7575
destination,
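The span-name fix in this hunk moves the `f` prefix outside the quotes. A small standalone sketch shows the difference; the `Writer` class here is a hypothetical stand-in, not the `TopicWriter` from the commit.

```python
class Writer:
    """Hypothetical stand-in used only to compare the two string forms."""

    def __init__(self, provider):
        self.provider = provider

    def span_name_before(self):
        # Plain string: the braces are never interpolated.
        return 'f{self.provider}.write'

    def span_name_after(self):
        # f-string: `self.provider` is substituted into the name.
        return f'{self.provider}.write'


writer = Writer('kafka')
print(writer.span_name_before())  # f{self.provider}.write
print(writer.span_name_after())  # kafka.write
```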
