Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/supported-sources/frankfurter.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ ingestr ingest \
### **`--interval-end` (Optional)**
- **Description**: The end date for fetching historical exchange rates.
- **Value**: A date in the format `YYYY-MM-DD` (e.g., `'2025-03-28'`).
- **Purpose**: Defines the ending point for fetching historical data. If not provided, it defaults to the value of `--interval-start`.
- **Purpose**: Defines the end point for fetching historical data.
- If `--interval-start` is provided without `--interval-end`, `--interval-end` defaults to the current date and retrieves up until the latest published data.
- If `--interval-end` is provided without `--interval-start`, it will be ignored and the call will retrieve the last published data.
- For `latest` and `currencies` this parameter is ignored.

---
Expand Down Expand Up @@ -104,7 +106,7 @@ ingestr ingest \
- **Primary Key**: Composite key of `date` and `currency_name`.
- **Notes**:
- An optional start and end date can be added via the arguments `--interval-start` and optionally `--interval-end` to define the date range (see examples below). If no start date is specified, the date will default to today's date (and thus return the latest exchange rates).
- If a start date but no end date is specified, then the end date will default to the start date and ingestr will retrieve data for the specified start date only.
- If a start date but no end date is specified, then the end date will default to today's date and ingestr will retrieve data up until the latest published data.
- Note that the [Frankfurter API](https://www.frankfurter.dev/) only publishes updates Monday-Friday. If the given date is on the weekend, the date will default to the previous Friday. The source is however implemented in ingestr in such a way as to avoid duplicating rows of data in this case (see [Incremental Loading - Replace](https://bruin-data.github.io/ingestr/getting-started/incremental-loading.html)).

#### **Example Table: Handling Weekend Dates**
Expand Down
119 changes: 119 additions & 0 deletions ingestr/main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3020,6 +3020,125 @@ def test_applovin_source(testcase):
testcase()


def frankfurter_test_cases() -> Iterable[Callable]:
    """
    Build the destination-agnostic test cases for the Frankfurter source.

    Each returned callable takes a destination URI, runs an ingest from
    ``frankfurter://`` into a randomly named destination table and asserts
    on the command result.
    """

    def _new_dest_table() -> str:
        # Random schema and table suffixes, generated fresh for every test case
        schema = f"testschema_frankfurter_{get_random_string(5)}"
        return f"{schema}.frankfurter_{get_random_string(5)}"

    def invalid_source_table(dest_uri):
        """An unknown source table must fail with UnsupportedResourceError."""
        result = invoke_ingest_command(
            "frankfurter://",
            "invalid table",
            dest_uri,
            _new_dest_table(),
        )
        assert result.exit_code != 0
        assert has_exception(result.exception, UnsupportedResourceError)

    def interval_start_does_not_exceed_interval_end(dest_uri):
        """An interval end earlier than the interval start must be rejected."""
        result = invoke_ingest_command(
            "frankfurter://",
            "exchange_rates",
            dest_uri,
            _new_dest_table(),
            interval_start="2025-04-11",
            interval_end="2025-04-10",
        )
        assert result.exit_code != 0
        assert has_exception(result.exception, ValueError)
        assert "Interval-end cannot be before interval-start." in str(result.exception)

    def interval_start_can_equal_interval_end(dest_uri):
        """A single-day interval (start == end) must be accepted."""
        result = invoke_ingest_command(
            "frankfurter://",
            "exchange_rates",
            dest_uri,
            _new_dest_table(),
            interval_start="2025-04-10",
            interval_end="2025-04-10",
        )
        assert result.exit_code == 0

    def interval_start_does_not_exceed_current_date(dest_uri):
        """An interval start in the future must be rejected."""
        start_date = pendulum.now().add(days=1).format("YYYY-MM-DD")
        result = invoke_ingest_command(
            "frankfurter://",
            "exchange_rates",
            dest_uri,
            _new_dest_table(),
            interval_start=start_date,
        )
        assert result.exit_code != 0
        assert has_exception(result.exception, ValueError)
        assert "Interval-start cannot be in the future." in str(result.exception)

    def interval_end_does_not_exceed_current_date(dest_uri):
        """An interval end in the future must be rejected."""
        start_date = pendulum.now().subtract(days=1).format("YYYY-MM-DD")
        end_date = pendulum.now().add(days=1).format("YYYY-MM-DD")
        result = invoke_ingest_command(
            "frankfurter://",
            "exchange_rates",
            dest_uri,
            _new_dest_table(),
            interval_start=start_date,
            interval_end=end_date,
        )
        assert result.exit_code != 0
        assert has_exception(result.exception, ValueError)
        assert "Interval-end cannot be in the future." in str(result.exception)

    def exchange_rate_on_specific_date(dest_uri):
        """The GBP rate ingested for 2025-01-03 must match the expected value."""
        dest_table = _new_dest_table()
        start_date = "2025-01-03"
        end_date = "2025-01-03"
        result = invoke_ingest_command(
            "frankfurter://",
            "exchange_rates",
            dest_uri,
            dest_table,
            interval_start=start_date,
            interval_end=end_date,
        )
        assert result.exit_code == 0, f"Ingestion failed: {result.output}"

        dest_engine = sqlalchemy.create_engine(dest_uri)

        query = f"SELECT rate FROM {dest_table} WHERE currency_code = 'GBP'"
        try:
            with dest_engine.connect() as conn:
                rows = conn.execute(query).fetchall()
        finally:
            # Release the engine's connections even when the query fails
            dest_engine.dispose()

        # Assert that the rate for GBP is 0.82993
        assert len(rows) > 0, "No data found for GBP"
        assert abs(rows[0][0] - 0.82993) <= 1e-6, (
            f"Expected rate 0.82993, but got {rows[0][0]}"
        )

    return [
        invalid_source_table,
        interval_start_does_not_exceed_interval_end,
        interval_start_can_equal_interval_end,
        interval_start_does_not_exceed_current_date,
        interval_end_does_not_exceed_current_date,
        exchange_rate_on_specific_date,
    ]


@pytest.mark.parametrize(
    "dest", list(DESTINATIONS.values()), ids=list(DESTINATIONS.keys())
)
@pytest.mark.parametrize("test_case", frankfurter_test_cases())
def test_frankfurter(dest, test_case):
    """Run every Frankfurter test case against every configured destination."""
    dest_uri = dest.start()
    try:
        test_case(dest_uri)
    finally:
        # Stop the destination even when the test case fails
        dest.stop()


def test_version_cmd():
"""
This should always be 0.0.0-dev.
Expand Down
80 changes: 43 additions & 37 deletions ingestr/src/frankfurter/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Iterator, Optional
from typing import Any, Iterator

import dlt
from dlt.common.pendulum import pendulum
Expand All @@ -13,25 +13,26 @@
max_table_nesting=0,
)
def frankfurter_source(
table: str,
start_date: Optional[TAnyDateTime] = None,
end_date: Optional[TAnyDateTime] = None,
start_date: TAnyDateTime,
end_date: TAnyDateTime,
) -> Any:
"""
A dlt source for the frankfurter.dev API. It groups several resources (in this case frankfurter.dev API endpoints) containing
various types of data: currencies, latest rates, historical rates.

Returns the appropriate resource based on the provided parameters.
"""
# Determine which resource to return based on the `table` parameter
if table == "currencies":
return currencies()

elif table == "latest":
return latest()
dateTime = dlt.sources.incremental(
"date",
initial_value=start_date,
end_value=end_date,
range_start="closed",
range_end="closed",
)

elif table == "exchange_rates":
return exchange_rates(start_date=start_date, end_date=end_date)
return (
currencies(),
latest(),
exchange_rates(start_date=dateTime, end_date=end_date),
)


@dlt.resource(
Expand All @@ -53,13 +54,13 @@ def currencies() -> Iterator[dict]:


@dlt.resource(
write_disposition="replace",
write_disposition="merge",
columns={
"date": {"data_type": "text"},
"currency_name": {"data_type": "text"},
"currency_code": {"data_type": "text"},
"rate": {"data_type": "double"},
},
primary_key=["date", "currency_name"], # Composite primary key
primary_key=["date", "currency_code"], # Composite primary key
)
def latest() -> Iterator[dict]:
"""
Expand All @@ -69,50 +70,54 @@ def latest() -> Iterator[dict]:
url = "latest?"

# Fetch data
latest_data = get_path_with_retry(url)
data = get_path_with_retry(url)

# Extract rates and base currency
rates = latest_data["rates"]
rates = data["rates"]

# Prepare the date
date = pendulum.now().to_date_string()
date = pendulum.parse(data["date"])

# Add the base currency (EUR) with a rate of 1.0
yield {
"date": date,
"currency_name": "EUR",
"currency_code": "EUR",
"rate": 1.0,
}

# Add all currencies and their rates
for currency_name, rate in rates.items():
for currency_code, rate in rates.items():
yield {
"date": date,
"currency_name": currency_name,
"currency_code": currency_code,
"rate": rate,
}


@dlt.resource(
write_disposition="replace",
write_disposition="merge",
columns={
"date": {"data_type": "text"},
"currency_name": {"data_type": "text"},
"currency_code": {"data_type": "text"},
"rate": {"data_type": "double"},
},
primary_key=["date", "currency_name"], # Composite primary key
primary_key=("date", "currency_code"), # Composite primary key
)
def exchange_rates(
start_date: TAnyDateTime,
end_date: TAnyDateTime,
start_date: dlt.sources.incremental[TAnyDateTime] = dlt.sources.incremental("date"),
) -> Iterator[dict]:
"""
Fetches exchange rates for a specified date range.
If only start_date is provided, fetches data for that date.
If only start_date is provided, fetches data until now.
If both start_date and end_date are provided, fetches data for each day in the range.
"""
start_date_str = ensure_pendulum_datetime(start_date).format("YYYY-MM-DD")
end_date_str = ensure_pendulum_datetime(end_date).format("YYYY-MM-DD")
# Ensure start_date.last_value is a pendulum.DateTime object
start_date_obj = ensure_pendulum_datetime(start_date.last_value) # type: ignore
start_date_str = start_date_obj.format("YYYY-MM-DD")

# Ensure end_date is a pendulum.DateTime object
end_date_obj = ensure_pendulum_datetime(end_date)
end_date_str = end_date_obj.format("YYYY-MM-DD")

# Compose the URL
url = f"{start_date_str}..{end_date_str}?"
Expand All @@ -121,22 +126,23 @@ def exchange_rates(
data = get_path_with_retry(url)

# Extract base currency and rates from the API response
base_currency = data["base"]
rates = data["rates"]

# Iterate over the rates dictionary (one entry per date)
for date, daily_rates in rates.items():
formatted_date = pendulum.parse(date)

# Add the base currency with a rate of 1.0
yield {
"date": date,
"currency_name": base_currency,
"date": formatted_date,
"currency_code": "EUR",
"rate": 1.0,
}

# Add all other currencies and their rates
for currency_name, rate in daily_rates.items():
for currency_code, rate in daily_rates.items():
yield {
"date": date,
"currency_name": currency_name,
"date": formatted_date,
"currency_code": currency_code,
"rate": rate,
}
4 changes: 2 additions & 2 deletions ingestr/src/frankfurter/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


def get_url_with_retry(url: str) -> StrAny:
r = requests.get(url)
r = requests.get(url, timeout=5)
return r.json() # type: ignore


Expand All @@ -19,7 +19,7 @@ def get_path_with_retry(path: str) -> StrAny:
def validate_dates(start_date: datetime, end_date: datetime) -> None:
current_date = pendulum.now()

# Check if start_date is in the future
# Check if start_date is in the future
if start_date > current_date:
raise ValueError("Interval-start cannot be in the future.")

Expand Down
40 changes: 18 additions & 22 deletions ingestr/src/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -2164,36 +2164,32 @@ def handles_incrementality(self) -> bool:
return True

def dlt_source(self, uri: str, table: str, **kwargs):
# start and end dates only assigned and validated for exchange_rates table
# Note: if an end date but no start date is provided, start date and end date will be set to current date
from ingestr.src.frankfurter import frankfurter_source
from ingestr.src.frankfurter.helpers import validate_dates
if kwargs.get("incremental_key"):
raise ValueError(
"Frankfurter takes care of incrementality on its own, you should not provide incremental_key"
)

if table == "exchange_rates":
if kwargs.get("interval_start"):
start_date = ensure_pendulum_datetime(str(kwargs.get("interval_start")))
if kwargs.get("interval_end"):
end_date = ensure_pendulum_datetime(str(kwargs.get("interval_end")))
else:
end_date = start_date
if kwargs.get("interval_start"):
start_date = ensure_pendulum_datetime(str(kwargs.get("interval_start")))
if kwargs.get("interval_end"):
end_date = ensure_pendulum_datetime(str(kwargs.get("interval_end")))
else:
start_date = pendulum.now()
end_date = pendulum.now()
validate_dates(start_date=start_date, end_date=end_date)

# For currencies and latest tables, set start and end dates to current date
else:
start_date = pendulum.now()
end_date = pendulum.now()

# Validate table
if table not in ["currencies", "latest", "exchange_rates"]:
raise ValueError(
f"Table '{table}' is not supported for Frankfurter source."
)
from ingestr.src.frankfurter import frankfurter_source
from ingestr.src.frankfurter.helpers import validate_dates

validate_dates(start_date=start_date, end_date=end_date)

return frankfurter_source(
table=table,
src = frankfurter_source(
start_date=start_date,
end_date=end_date,
)

if table not in src.resources:
raise UnsupportedResourceError(table, "Frankfurter")

return src.with_resources(table)