-
Notifications
You must be signed in to change notification settings - Fork 50
/
Copy pathrun_sync_to_destination_wo_cache.py
76 lines (60 loc) · 1.83 KB
/
run_sync_to_destination_wo_cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Test a sync to an Airbyte destination.
Usage:
```
poetry run python examples/run_sync_to_destination_wo_cache.py
```
"""
from __future__ import annotations
import datetime
import airbyte as ab
# NOTE(review): SCALE is not referenced anywhere else in the visible part of
# this script — confirm whether it is still needed.
SCALE = 200_000
def get_my_source() -> ab.Source:
    """Build and validate the GitHub source used by this example.

    Reads the ``GITHUB_PERSONAL_ACCESS_TOKEN`` secret, configures
    ``source-github`` for the ``airbytehq/PyAirbyte`` repository, runs the
    source's ``check()``, and selects only the ``issues`` stream.

    Returns:
        The configured source with the ``issues`` stream selected.

    Raises:
        ValueError: If the resolved token is an empty string.
    """
    github_pat = ab.get_secret("GITHUB_PERSONAL_ACCESS_TOKEN")
    # Raise explicitly instead of using `assert`: assertions are stripped when
    # Python runs with `-O`, which would let an empty token reach the connector.
    if not str(github_pat):
        raise ValueError("GITHUB_PERSONAL_ACCESS_TOKEN must not be empty.")

    source = ab.get_source(
        "source-github",
        config={
            "repositories": ["airbytehq/PyAirbyte"],
            "credentials": {
                "personal_access_token": github_pat,
            },
        },
        # Stream selection is done below via `select_streams()`; the inline
        # `streams=["issues"]` argument was intentionally left disabled.
    )
    source.check()
    source.select_streams(["issues"])
    return source
def get_cache() -> ab.DuckDBCache:
    """Return the local cache named ``state_cache``.

    ``main()`` passes the returned cache to ``destination.write`` as its
    ``state_cache`` argument.
    """
    cache_name = "state_cache"
    return ab.new_local_cache(cache_name=cache_name)
def get_my_destination() -> ab.Destination:
    """Return the ``destination-duckdb`` destination used by this example.

    The destination is requested with the
    ``airbyte/destination-duckdb:latest`` Docker image.
    """
    destination_config = {
        # This path is relative to the container:
        "destination_path": "/local/temp/db.duckdb",
    }
    # Instead of `docker_image`, a `pip_url` could be supplied, e.g.:
    # pip_url="git+https://github.com/airbytehq/airbyte.git#subdirectory=airbyte-integrations/connectors/destination-duckdb",
    return ab.get_destination(
        name="destination-duckdb",
        config=destination_config,
        docker_image="airbyte/destination-duckdb:latest",
    )
def main() -> None:
    """Write from the example source to the example destination.

    Both connectors are checked first. The write is invoked with
    ``cache=False`` and the cache from ``get_cache()`` as ``state_cache``;
    afterwards a one-line summary with the processed record count and the
    current local time is printed.
    """
    src = get_my_source()
    src.check()

    dest = get_my_destination()
    dest.check()

    cache_for_state = get_cache()
    result: ab.WriteResult = dest.write(
        src,
        cache=False,
        state_cache=cache_for_state,
    )

    # Pull the reported values out first so the message template stays short.
    record_count = result.processed_records
    finished_at = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(
        f"Completed writing {record_count:,} records "
        f"to destination at {finished_at}."
    )


# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    main()