Skip to content

Commit 060eece

Browse files
committed
reformatting and integration of feedback from PR review
1 parent dc0e1dc commit 060eece

File tree

5 files changed

+65
-53
lines changed

5 files changed

+65
-53
lines changed

docs/supported-sources/frankfurter.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ ingestr ingest \
106106
- **Primary Key**: Composite key of `date` and `currency_name`.
107107
- **Notes**:
108108
- An optional start and end date can be added via the arguments `--interval-start` and optionally `--interval-end` to define the date range (see examples below). If no start date is specified, the date will default to today's date (and thus return the latest exchange rates).
109-
- If a start date but no end date is specified, then the end date will default to the start date and ingestr will retrieve data for the specified start date only.
109+
- If a start date but no end date is specified, the end date will default to today's date and ingestr will retrieve data from the start date up to the most recently published rates.
110110
- Note that the [Frankfurter API](https://www.frankfurter.dev/) only publishes updates Monday-Friday. If the given date falls on a weekend, the date will default to the previous Friday. The source is, however, implemented in ingestr in such a way as to avoid duplicating rows of data in this case (see [Incremental Loading - Replace](https://bruin-data.github.io/ingestr/getting-started/incremental-loading.html)).
111111

112112
#### **Example Table: Handling Weekend Dates**

ingestr/main_test.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3030,23 +3030,26 @@ def exchange_rate_on_specific_date(dest_uri):
30303030
schema = f"testschema_frankfurter_{get_random_string(5)}"
30313031
dest_table = f"{schema}.frankfurter_{get_random_string(5)}"
30323032
start_date = "2025-01-03"
3033+
end_date = "2025-01-03"
30333034
result = invoke_ingest_command(
30343035
"frankfurter://",
30353036
"exchange_rates",
30363037
dest_uri,
30373038
dest_table,
30383039
interval_start=start_date,
3040+
interval_end=end_date,
30393041
)
30403042
assert result.exit_code == 0, f"Ingestion failed: {result.output}"
30413043

30423044
dest_engine = sqlalchemy.create_engine(dest_uri)
3043-
query = f"SELECT rate FROM {dest_table} WHERE currency_name = 'GBP'"
3045+
3046+
query = f"SELECT rate FROM {dest_table} WHERE currency_code = 'GBP'"
30443047
with dest_engine.connect() as conn:
30453048
rows = conn.execute(query).fetchall()
30463049

30473050
# Assert that the rate for GBP is 0.82993
30483051
assert len(rows) > 0, "No data found for GBP"
3049-
assert rows[0][0] == 0.82993, f"Expected rate 0.82993, but got {rows[0][0]}"
3052+
assert abs(rows[0][0] - 0.82993) <= 1e-6, f"Expected rate 0.82993, but got {rows[0][0]}"
30503053

30513054
return [
30523055
invalid_source_table,

ingestr/src/frankfurter/__init__.py

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Any, Iterator, Optional
1+
from typing import Any, Iterator
22

33
import dlt
44
from dlt.common.pendulum import pendulum
@@ -13,25 +13,26 @@
1313
max_table_nesting=0,
1414
)
1515
def frankfurter_source(
16-
table: str,
17-
start_date: Optional[TAnyDateTime] = None,
18-
end_date: Optional[TAnyDateTime] = None,
16+
start_date: TAnyDateTime,
17+
end_date: TAnyDateTime,
1918
) -> Any:
2019
"""
2120
A dlt source for the frankfurter.dev API. It groups several resources (in this case frankfurter.dev API endpoints) containing
2221
various types of data: currencies, latest rates, historical rates.
23-
24-
Returns the appropriate resource based on the provided parameters.
2522
"""
26-
# Determine which resource to return based on the `table` parameter
27-
if table == "currencies":
28-
return currencies()
29-
30-
elif table == "latest":
31-
return latest()
23+
dateTime = dlt.sources.incremental(
24+
"date",
25+
initial_value=start_date,
26+
end_value=end_date,
27+
range_start="closed",
28+
range_end="closed",
29+
)
3230

33-
elif table == "exchange_rates":
34-
return exchange_rates(start_date=start_date, end_date=end_date)
31+
return (
32+
currencies(),
33+
latest(),
34+
exchange_rates(start_date=dateTime, end_date=end_date),
35+
)
3536

3637

3738
@dlt.resource(
@@ -53,7 +54,7 @@ def currencies() -> Iterator[dict]:
5354

5455

5556
@dlt.resource(
56-
write_disposition="replace",
57+
write_disposition="merge",
5758
columns={
5859
"date": {"data_type": "text"},
5960
"currency_code": {"data_type": "text"},
@@ -69,13 +70,12 @@ def latest() -> Iterator[dict]:
6970
url = "latest?"
7071

7172
# Fetch data
72-
latest_data = get_path_with_retry(url)
73+
data = get_path_with_retry(url)
7374

7475
# Extract rates and base currency
75-
rates = latest_data["rates"]
76+
rates = data["rates"]
7677

77-
# Prepare the date
78-
date = pendulum.now("Europe/Berlin").to_date_string()
78+
date = pendulum.parse(data["date"])
7979

8080
# Add the base currency (EUR) with a rate of 1.0
8181
yield {
@@ -94,25 +94,34 @@ def latest() -> Iterator[dict]:
9494

9595

9696
@dlt.resource(
97-
write_disposition="replace",
97+
write_disposition="merge",
9898
columns={
9999
"date": {"data_type": "text"},
100100
"currency_code": {"data_type": "text"},
101101
"rate": {"data_type": "double"},
102102
},
103-
primary_key=["date", "currency_code"], # Composite primary key
103+
primary_key=("date", "currency_code"), # Composite primary key
104104
)
105105
def exchange_rates(
106-
start_date: TAnyDateTime,
107106
end_date: TAnyDateTime,
107+
start_date: dlt.sources.incremental[TAnyDateTime] = dlt.sources.incremental("date"),
108108
) -> Iterator[dict]:
109109
"""
110110
Fetches exchange rates for a specified date range.
111-
If only start_date is provided, fetches data for that date.
111+
If only start_date is provided, fetches data until now.
112112
If both start_date and end_date are provided, fetches data for each day in the range.
113113
"""
114-
start_date_str = ensure_pendulum_datetime(start_date).format("YYYY-MM-DD")
115-
end_date_str = ensure_pendulum_datetime(end_date).format("YYYY-MM-DD")
114+
# Ensure start_date.last_value is not None
115+
if start_date.last_value is None:
116+
raise ValueError("start_date.last_value cannot be None")
117+
118+
# Ensure start_date.last_value is a pendulum.DateTime object
119+
start_date_obj = ensure_pendulum_datetime(start_date.last_value)
120+
start_date_str = start_date_obj.format("YYYY-MM-DD")
121+
122+
# Ensure end_date is a pendulum.DateTime object
123+
end_date_obj = ensure_pendulum_datetime(end_date)
124+
end_date_str = end_date_obj.format("YYYY-MM-DD")
116125

117126
# Compose the URL
118127
url = f"{start_date_str}..{end_date_str}?"
@@ -121,22 +130,23 @@ def exchange_rates(
121130
data = get_path_with_retry(url)
122131

123132
# Extract base currency and rates from the API response
124-
base_currency = data["base"]
125133
rates = data["rates"]
126134

127135
# Iterate over the rates dictionary (one entry per date)
128136
for date, daily_rates in rates.items():
137+
formatted_date = pendulum.parse(date)
138+
129139
# Add the base currency with a rate of 1.0
130140
yield {
131-
"date": date,
132-
"currency_code": base_currency,
141+
"date": formatted_date,
142+
"currency_code": "EUR",
133143
"rate": 1.0,
134144
}
135145

136146
# Add all other currencies and their rates
137147
for currency_code, rate in daily_rates.items():
138148
yield {
139-
"date": date,
149+
"date": formatted_date,
140150
"currency_code": currency_code,
141151
"rate": rate,
142152
}

ingestr/src/frankfurter/helpers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99

1010
def get_url_with_retry(url: str) -> StrAny:
11-
r = requests.get(url)
11+
r = requests.get(url, timeout=5)
1212
return r.json() # type: ignore
1313

1414

ingestr/src/sources.py

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@
6767
from ingestr.src.facebook_ads import facebook_ads_source, facebook_insights_source
6868
from ingestr.src.filesystem import readers
6969
from ingestr.src.filters import table_adapter_exclude_columns
70-
from ingestr.src.frankfurter import frankfurter_source
71-
from ingestr.src.frankfurter.helpers import validate_dates
7270
from ingestr.src.github import github_reactions, github_repo_events, github_stargazers
7371
from ingestr.src.google_ads import google_ads
7472
from ingestr.src.google_analytics import google_analytics
@@ -2050,31 +2048,32 @@ def handles_incrementality(self) -> bool:
20502048
return True
20512049

20522050
def dlt_source(self, uri: str, table: str, **kwargs):
2053-
# start and end dates only assigned and validated for exchange_rates table
2054-
# Note: if an end date but no start date is provided, start date and end date will be set to current date
2055-
if table == "exchange_rates":
2056-
if kwargs.get("interval_start"):
2057-
start_date = ensure_pendulum_datetime(str(kwargs.get("interval_start")))
2058-
if kwargs.get("interval_end"):
2059-
end_date = ensure_pendulum_datetime(str(kwargs.get("interval_end")))
2060-
else:
2061-
end_date = start_date
2051+
if kwargs.get("incremental_key"):
2052+
raise ValueError(
2053+
"Frankfurter takes care of incrementality on its own, you should not provide incremental_key"
2054+
)
2055+
2056+
if kwargs.get("interval_start"):
2057+
start_date = ensure_pendulum_datetime(str(kwargs.get("interval_start")))
2058+
if kwargs.get("interval_end"):
2059+
end_date = ensure_pendulum_datetime(str(kwargs.get("interval_end")))
20622060
else:
2063-
start_date = pendulum.now("Europe/Berlin")
20642061
end_date = pendulum.now("Europe/Berlin")
2065-
validate_dates(start_date=start_date, end_date=end_date)
2066-
2067-
# For currencies and latest tables, set start and end dates to current date
20682062
else:
20692063
start_date = pendulum.now("Europe/Berlin")
20702064
end_date = pendulum.now("Europe/Berlin")
20712065

2072-
# Validate table
2073-
if table not in ["currencies", "latest", "exchange_rates"]:
2074-
raise UnsupportedResourceError(table, "Frankfurter")
2066+
from ingestr.src.frankfurter import frankfurter_source
2067+
from ingestr.src.frankfurter.helpers import validate_dates
2068+
2069+
validate_dates(start_date=start_date, end_date=end_date)
20752070

2076-
return frankfurter_source(
2077-
table=table,
2071+
src = frankfurter_source(
20782072
start_date=start_date,
20792073
end_date=end_date,
20802074
)
2075+
2076+
if table not in src.resources:
2077+
raise UnsupportedResourceError(table, "Frankfurter")
2078+
2079+
return src.with_resources(table)

0 commit comments

Comments
 (0)