Skip to content

Commit b96d064

Browse files
authored
Merge pull request #166 from jbrand-dsp/frankfurter-update
Frankfurter update
2 parents 8556a9b + 3e12ef7 commit b96d064

File tree

5 files changed

+187
-62
lines changed

5 files changed

+187
-62
lines changed

docs/supported-sources/frankfurter.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ ingestr ingest \
4141
### **`--interval-end` (Optional)**
4242
- **Description**: The end date for fetching historical exchange rates.
4343
- **Value**: A date in the format `YYYY-MM-DD` (e.g., `'2025-03-28'`).
44-
- **Purpose**: Defines the ending point for fetching historical data. If not provided, it defaults to the value of `--interval-start`.
44+
- **Purpose**: Defines the end point for fetching historical data.
45+
- If `--interval-start` is provided without `--interval-end`, `--interval-end` defaults to the current date and retrieves up until the latest published data.
46+
- If `--interval-end` is provided without `--interval-start`, it will be ignored and the call will retrieve the last published data.
4547
- For `latest` and `currencies` this parameter is ignored.
4648

4749
---
@@ -104,7 +106,7 @@ ingestr ingest \
104106
- **Primary Key**: Composite key of `date` and `currency_code`.
105107
- **Notes**:
106108
- An optional start and end date can be added via the arguments `--interval-start` and optionally `--interval-end` to define the date range (see examples below). If no start date is specified, the date will default to today's date (and thus return the latest exchange rates).
107-
- If a start date but no end date is specified, then the end date will default to the start date and ingestr will retrieve data for the specified start date only.
109+
- If a start date but no end date is specified, then the end date will default to today's date and ingestr will retrieve data up until the latest published data.
108110
- Note that the [Frankfurter API](https://www.frankfurter.dev/) only publishes updates Monday-Friday. If the given date is on the weekend, the date will default to the previous Friday. The source is however implemented in ingestr in such a way as to avoid duplicating rows of data in this case (see [Incremental Loading - Replace](https://bruin-data.github.io/ingestr/getting-started/incremental-loading.html)).
109111

110112
#### **Example Table: Handling Weekend Dates**

ingestr/main_test.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3020,6 +3020,125 @@ def test_applovin_source(testcase):
30203020
testcase()
30213021

30223022

3023+
def frankfurter_test_cases() -> Iterable[Callable]:
3024+
def invalid_source_table(dest_uri):
3025+
schema = f"testschema_frankfurter_{get_random_string(5)}"
3026+
dest_table = f"{schema}.frankfurter_{get_random_string(5)}"
3027+
result = invoke_ingest_command(
3028+
"frankfurter://",
3029+
"invalid table",
3030+
dest_uri,
3031+
dest_table,
3032+
)
3033+
assert result.exit_code != 0
3034+
assert has_exception(result.exception, UnsupportedResourceError)
3035+
3036+
def interval_start_does_not_exceed_interval_end(dest_uri):
3037+
schema = f"testschema_frankfurter_{get_random_string(5)}"
3038+
dest_table = f"{schema}.frankfurter_{get_random_string(5)}"
3039+
result = invoke_ingest_command(
3040+
"frankfurter://",
3041+
"exchange_rates",
3042+
dest_uri,
3043+
dest_table,
3044+
interval_start="2025-04-11",
3045+
interval_end="2025-04-10",
3046+
)
3047+
assert result.exit_code != 0
3048+
assert has_exception(result.exception, ValueError)
3049+
assert "Interval-end cannot be before interval-start." in str(result.exception)
3050+
3051+
def interval_start_can_equal_interval_end(dest_uri):
3052+
schema = f"testschema_frankfurter_{get_random_string(5)}"
3053+
dest_table = f"{schema}.frankfurter_{get_random_string(5)}"
3054+
result = invoke_ingest_command(
3055+
"frankfurter://",
3056+
"exchange_rates",
3057+
dest_uri,
3058+
dest_table,
3059+
interval_start="2025-04-10",
3060+
interval_end="2025-04-10",
3061+
)
3062+
assert result.exit_code == 0
3063+
3064+
def interval_start_does_not_exceed_current_date(dest_uri):
3065+
schema = f"testschema_frankfurter_{get_random_string(5)}"
3066+
dest_table = f"{schema}.frankfurter_{get_random_string(5)}"
3067+
start_date = pendulum.now().add(days=1).format("YYYY-MM-DD")
3068+
result = invoke_ingest_command(
3069+
"frankfurter://",
3070+
"exchange_rates",
3071+
dest_uri,
3072+
dest_table,
3073+
interval_start=start_date,
3074+
)
3075+
assert result.exit_code != 0
3076+
assert has_exception(result.exception, ValueError)
3077+
assert "Interval-start cannot be in the future." in str(result.exception)
3078+
3079+
def interval_end_does_not_exceed_current_date(dest_uri):
3080+
schema = f"testschema_frankfurter_{get_random_string(5)}"
3081+
dest_table = f"{schema}.frankfurter_{get_random_string(5)}"
3082+
start_date = pendulum.now().subtract(days=1).format("YYYY-MM-DD")
3083+
end_date = pendulum.now().add(days=1).format("YYYY-MM-DD")
3084+
result = invoke_ingest_command(
3085+
"frankfurter://",
3086+
"exchange_rates",
3087+
dest_uri,
3088+
dest_table,
3089+
interval_start=start_date,
3090+
interval_end=end_date,
3091+
)
3092+
assert result.exit_code != 0
3093+
assert has_exception(result.exception, ValueError)
3094+
assert "Interval-end cannot be in the future." in str(result.exception)
3095+
3096+
def exchange_rate_on_specific_date(dest_uri):
3097+
schema = f"testschema_frankfurter_{get_random_string(5)}"
3098+
dest_table = f"{schema}.frankfurter_{get_random_string(5)}"
3099+
start_date = "2025-01-03"
3100+
end_date = "2025-01-03"
3101+
result = invoke_ingest_command(
3102+
"frankfurter://",
3103+
"exchange_rates",
3104+
dest_uri,
3105+
dest_table,
3106+
interval_start=start_date,
3107+
interval_end=end_date,
3108+
)
3109+
assert result.exit_code == 0, f"Ingestion failed: {result.output}"
3110+
3111+
dest_engine = sqlalchemy.create_engine(dest_uri)
3112+
3113+
query = f"SELECT rate FROM {dest_table} WHERE currency_code = 'GBP'"
3114+
with dest_engine.connect() as conn:
3115+
rows = conn.execute(query).fetchall()
3116+
3117+
# Assert that the rate for GBP is 0.82993
3118+
assert len(rows) > 0, "No data found for GBP"
3119+
assert abs(rows[0][0] - 0.82993) <= 1e-6, (
3120+
f"Expected rate 0.82993, but got {rows[0][0]}"
3121+
)
3122+
3123+
return [
3124+
invalid_source_table,
3125+
interval_start_does_not_exceed_interval_end,
3126+
interval_start_can_equal_interval_end,
3127+
interval_start_does_not_exceed_current_date,
3128+
interval_end_does_not_exceed_current_date,
3129+
exchange_rate_on_specific_date,
3130+
]
3131+
3132+
3133+
@pytest.mark.parametrize(
3134+
"dest", list(DESTINATIONS.values()), ids=list(DESTINATIONS.keys())
3135+
)
3136+
@pytest.mark.parametrize("test_case", frankfurter_test_cases())
3137+
def test_frankfurter(dest, test_case):
3138+
test_case(dest.start())
3139+
dest.stop()
3140+
3141+
30233142
def test_version_cmd():
30243143
"""
30253144
This should always be 0.0.0-dev.
Lines changed: 44 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Any, Iterator, Optional
1+
from typing import Any, Iterator
22

33
import dlt
44
from dlt.common.pendulum import pendulum
@@ -13,25 +13,28 @@
1313
max_table_nesting=0,
1414
)
1515
def frankfurter_source(
16-
table: str,
17-
start_date: Optional[TAnyDateTime] = None,
18-
end_date: Optional[TAnyDateTime] = None,
16+
start_date: TAnyDateTime,
17+
end_date: TAnyDateTime,
1918
) -> Any:
2019
"""
2120
A dlt source for the frankfurter.dev API. It groups several resources (in this case frankfurter.dev API endpoints) containing
2221
various types of data: currencies, latest rates, historical rates.
23-
24-
Returns the appropriate resource based on the provided parameters.
2522
"""
26-
# Determine which resource to return based on the `table` parameter
27-
if table == "currencies":
28-
return currencies()
23+
date_time = dlt.sources.incremental(
24+
25+
"date",
26+
initial_value=start_date,
27+
end_value=end_date,
28+
range_start="closed",
29+
range_end="closed",
30+
)
2931

30-
elif table == "latest":
31-
return latest()
32+
return (
33+
currencies(),
34+
latest(),
35+
exchange_rates(start_date=date_time, end_date=end_date),
3236

33-
elif table == "exchange_rates":
34-
return exchange_rates(start_date=start_date, end_date=end_date)
37+
)
3538

3639

3740
@dlt.resource(
@@ -53,13 +56,13 @@ def currencies() -> Iterator[dict]:
5356

5457

5558
@dlt.resource(
56-
write_disposition="replace",
59+
write_disposition="merge",
5760
columns={
5861
"date": {"data_type": "text"},
59-
"currency_name": {"data_type": "text"},
62+
"currency_code": {"data_type": "text"},
6063
"rate": {"data_type": "double"},
6164
},
62-
primary_key=["date", "currency_name"], # Composite primary key
65+
primary_key=["date", "currency_code"], # Composite primary key
6366
)
6467
def latest() -> Iterator[dict]:
6568
"""
@@ -69,50 +72,54 @@ def latest() -> Iterator[dict]:
6972
url = "latest?"
7073

7174
# Fetch data
72-
latest_data = get_path_with_retry(url)
75+
data = get_path_with_retry(url)
7376

7477
# Extract rates and base currency
75-
rates = latest_data["rates"]
78+
rates = data["rates"]
7679

77-
# Prepare the date
78-
date = pendulum.now().to_date_string()
80+
date = pendulum.parse(data["date"])
7981

8082
# Add the base currency (EUR) with a rate of 1.0
8183
yield {
8284
"date": date,
83-
"currency_name": "EUR",
85+
"currency_code": "EUR",
8486
"rate": 1.0,
8587
}
8688

8789
# Add all currencies and their rates
88-
for currency_name, rate in rates.items():
90+
for currency_code, rate in rates.items():
8991
yield {
9092
"date": date,
91-
"currency_name": currency_name,
93+
"currency_code": currency_code,
9294
"rate": rate,
9395
}
9496

9597

9698
@dlt.resource(
97-
write_disposition="replace",
99+
write_disposition="merge",
98100
columns={
99101
"date": {"data_type": "text"},
100-
"currency_name": {"data_type": "text"},
102+
"currency_code": {"data_type": "text"},
101103
"rate": {"data_type": "double"},
102104
},
103-
primary_key=["date", "currency_name"], # Composite primary key
105+
primary_key=("date", "currency_code"), # Composite primary key
104106
)
105107
def exchange_rates(
106-
start_date: TAnyDateTime,
107108
end_date: TAnyDateTime,
109+
start_date: dlt.sources.incremental[TAnyDateTime] = dlt.sources.incremental("date"),
108110
) -> Iterator[dict]:
109111
"""
110112
Fetches exchange rates for a specified date range.
111-
If only start_date is provided, fetches data for that date.
113+
If only start_date is provided, fetches data until now.
112114
If both start_date and end_date are provided, fetches data for each day in the range.
113115
"""
114-
start_date_str = ensure_pendulum_datetime(start_date).format("YYYY-MM-DD")
115-
end_date_str = ensure_pendulum_datetime(end_date).format("YYYY-MM-DD")
116+
# Ensure start_date.last_value is a pendulum.DateTime object
117+
start_date_obj = ensure_pendulum_datetime(start_date.last_value) # type: ignore
118+
start_date_str = start_date_obj.format("YYYY-MM-DD")
119+
120+
# Ensure end_date is a pendulum.DateTime object
121+
end_date_obj = ensure_pendulum_datetime(end_date)
122+
end_date_str = end_date_obj.format("YYYY-MM-DD")
116123

117124
# Compose the URL
118125
url = f"{start_date_str}..{end_date_str}?"
@@ -121,22 +128,23 @@ def exchange_rates(
121128
data = get_path_with_retry(url)
122129

123130
# Extract base currency and rates from the API response
124-
base_currency = data["base"]
125131
rates = data["rates"]
126132

127133
# Iterate over the rates dictionary (one entry per date)
128134
for date, daily_rates in rates.items():
135+
formatted_date = pendulum.parse(date)
136+
129137
# Add the base currency with a rate of 1.0
130138
yield {
131-
"date": date,
132-
"currency_name": base_currency,
139+
"date": formatted_date,
140+
"currency_code": "EUR",
133141
"rate": 1.0,
134142
}
135143

136144
# Add all other currencies and their rates
137-
for currency_name, rate in daily_rates.items():
145+
for currency_code, rate in daily_rates.items():
138146
yield {
139-
"date": date,
140-
"currency_name": currency_name,
147+
"date": formatted_date,
148+
"currency_code": currency_code,
141149
"rate": rate,
142150
}

ingestr/src/frankfurter/helpers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99

1010
def get_url_with_retry(url: str) -> StrAny:
11-
r = requests.get(url)
11+
r = requests.get(url, timeout=5)
1212
return r.json() # type: ignore
1313

1414

@@ -19,7 +19,7 @@ def get_path_with_retry(path: str) -> StrAny:
1919
def validate_dates(start_date: datetime, end_date: datetime) -> None:
2020
current_date = pendulum.now()
2121

22-
# Check if start_date is in the future
22+
# Check if start_date is in the future
2323
if start_date > current_date:
2424
raise ValueError("Interval-start cannot be in the future.")
2525

ingestr/src/sources.py

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2173,36 +2173,32 @@ def handles_incrementality(self) -> bool:
21732173
return True
21742174

21752175
def dlt_source(self, uri: str, table: str, **kwargs):
2176-
# start and end dates only assigned and validated for exchange_rates table
2177-
# Note: if an end date but no start date is provided, start date and end date will be set to current date
2178-
from ingestr.src.frankfurter import frankfurter_source
2179-
from ingestr.src.frankfurter.helpers import validate_dates
2176+
if kwargs.get("incremental_key"):
2177+
raise ValueError(
2178+
"Frankfurter takes care of incrementality on its own, you should not provide incremental_key"
2179+
)
21802180

2181-
if table == "exchange_rates":
2182-
if kwargs.get("interval_start"):
2183-
start_date = ensure_pendulum_datetime(str(kwargs.get("interval_start")))
2184-
if kwargs.get("interval_end"):
2185-
end_date = ensure_pendulum_datetime(str(kwargs.get("interval_end")))
2186-
else:
2187-
end_date = start_date
2181+
if kwargs.get("interval_start"):
2182+
start_date = ensure_pendulum_datetime(str(kwargs.get("interval_start")))
2183+
if kwargs.get("interval_end"):
2184+
end_date = ensure_pendulum_datetime(str(kwargs.get("interval_end")))
21882185
else:
2189-
start_date = pendulum.now()
21902186
end_date = pendulum.now()
2191-
validate_dates(start_date=start_date, end_date=end_date)
2192-
2193-
# For currencies and latest tables, set start and end dates to current date
21942187
else:
21952188
start_date = pendulum.now()
21962189
end_date = pendulum.now()
21972190

2198-
# Validate table
2199-
if table not in ["currencies", "latest", "exchange_rates"]:
2200-
raise ValueError(
2201-
f"Table '{table}' is not supported for Frankfurter source."
2202-
)
2191+
from ingestr.src.frankfurter import frankfurter_source
2192+
from ingestr.src.frankfurter.helpers import validate_dates
2193+
2194+
validate_dates(start_date=start_date, end_date=end_date)
22032195

2204-
return frankfurter_source(
2205-
table=table,
2196+
src = frankfurter_source(
22062197
start_date=start_date,
22072198
end_date=end_date,
22082199
)
2200+
2201+
if table not in src.resources:
2202+
raise UnsupportedResourceError(table, "Frankfurter")
2203+
2204+
return src.with_resources(table)

0 commit comments

Comments
 (0)