Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/supported-sources/frankfurter.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ ingestr ingest \
### **`--interval-end` (Optional)**
- **Description**: The end date for fetching historical exchange rates.
- **Value**: A date in the format `YYYY-MM-DD` (e.g., `'2025-03-28'`).
- **Purpose**: Defines the ending point for fetching historical data. If not provided, it defaults to the value of `--interval-start`.
- **Purpose**: Defines the end point for fetching historical data.
- If not provided, it defaults to today's date, so data is retrieved from `--interval-start` up to the latest published rates.
- If `--interval-end` is provided without `--interval-start`, it will be ignored and the call will retrieve the last published data.
- For `latest` and `currencies` this parameter is ignored.

---
Expand Down Expand Up @@ -104,7 +106,7 @@ ingestr ingest \
- **Primary Key**: Composite key of `date` and `currency_name`.
- **Notes**:
- An optional start and end date can be added via the arguments `--interval-start` and optionally `--interval-end` to define the date range (see examples below). If no start date is specified, the date will default to today's date (and thus return the latest exchange rates).
- If a start date but no end date is specified, then the end date will default to the start date and ingestr will retrieve data for the specified start date only.
- If a start date but no end date is specified, then the end date will default to today's date and ingestr will retrieve data up until the latest published data.
- Note that the [Frankfurter API](https://www.frankfurter.dev/) only publishes updates Monday-Friday. If the given date is on the weekend, the date will default to the previous Friday. The source is however implemented in ingestr in such a way as to avoid duplicating rows of data in this case (see [Incremental Loading - Replace](https://bruin-data.github.io/ingestr/getting-started/incremental-loading.html)).

#### **Example Table: Handling Weekend Dates**
Expand Down
117 changes: 117 additions & 0 deletions ingestr/main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2953,6 +2953,123 @@ def test_applovin_source(testcase):
testcase()


def frankfurter_test_cases() -> Iterable[Callable]:
    """
    Build the test cases for the Frankfurter source.

    Each returned callable takes a destination URI, runs the ingest command
    against the ``frankfurter://`` source and asserts on the command result
    (and, for the data check, on the rows written to the destination).
    """

    def new_dest_table() -> str:
        # Randomised schema and table names keep runs against a shared
        # destination from colliding with each other.
        schema = f"testschema_frankfurter_{get_random_string(5)}"
        return f"{schema}.frankfurter_{get_random_string(5)}"

    def invalid_source_table(dest_uri):
        """An unknown source table must fail with UnsupportedResourceError."""
        result = invoke_ingest_command(
            "frankfurter://",
            "invalid table",
            dest_uri,
            new_dest_table(),
        )
        assert result.exit_code != 0
        assert has_exception(result.exception, UnsupportedResourceError)

    def interval_start_does_not_exceed_interval_end(dest_uri):
        """An interval end earlier than the interval start must be rejected."""
        result = invoke_ingest_command(
            "frankfurter://",
            "exchange_rates",
            dest_uri,
            new_dest_table(),
            interval_start="2025-04-11",
            interval_end="2025-04-10",
        )
        assert result.exit_code != 0
        assert has_exception(result.exception, ValueError)
        assert "Interval-end cannot be before interval-start." in str(result.exception)

    def interval_start_can_equal_interval_end(dest_uri):
        """A single-day range (start == end) must be accepted."""
        result = invoke_ingest_command(
            "frankfurter://",
            "exchange_rates",
            dest_uri,
            new_dest_table(),
            interval_start="2025-04-10",
            interval_end="2025-04-10",
        )
        assert result.exit_code == 0

    def interval_start_does_not_exceed_current_date(dest_uri):
        """An interval start in the future must be rejected."""
        # "Tomorrow" is computed in the Europe/Berlin timezone.
        start_date = pendulum.now("Europe/Berlin").add(days=1).format("YYYY-MM-DD")
        result = invoke_ingest_command(
            "frankfurter://",
            "exchange_rates",
            dest_uri,
            new_dest_table(),
            interval_start=start_date,
        )
        assert result.exit_code != 0
        assert has_exception(result.exception, ValueError)
        assert "Interval-start cannot be in the future." in str(result.exception)

    def interval_end_does_not_exceed_current_date(dest_uri):
        """An interval end in the future must be rejected."""
        now = pendulum.now("Europe/Berlin")
        start_date = now.subtract(days=1).format("YYYY-MM-DD")
        end_date = now.add(days=1).format("YYYY-MM-DD")
        result = invoke_ingest_command(
            "frankfurter://",
            "exchange_rates",
            dest_uri,
            new_dest_table(),
            interval_start=start_date,
            interval_end=end_date,
        )
        assert result.exit_code != 0
        assert has_exception(result.exception, ValueError)
        assert "Interval-end cannot be in the future." in str(result.exception)

    def exchange_rate_on_specific_date(dest_uri):
        """The GBP rate ingested for 2025-01-03 must match the expected value."""
        dest_table = new_dest_table()
        start_date = "2025-01-03"
        end_date = "2025-01-03"
        result = invoke_ingest_command(
            "frankfurter://",
            "exchange_rates",
            dest_uri,
            dest_table,
            interval_start=start_date,
            interval_end=end_date,
        )
        assert result.exit_code == 0, f"Ingestion failed: {result.output}"

        dest_engine = sqlalchemy.create_engine(dest_uri)
        query = f"SELECT rate FROM {dest_table} WHERE currency_code = 'GBP'"
        try:
            with dest_engine.connect() as conn:
                rows = conn.execute(query).fetchall()
        finally:
            # Release pooled connections so the engine does not outlive the test.
            dest_engine.dispose()

        # Assert that the rate for GBP is 0.82993
        assert len(rows) > 0, "No data found for GBP"
        assert abs(rows[0][0] - 0.82993) <= 1e-6, f"Expected rate 0.82993, but got {rows[0][0]}"

    return [
        invalid_source_table,
        interval_start_does_not_exceed_interval_end,
        interval_start_can_equal_interval_end,
        interval_start_does_not_exceed_current_date,
        interval_end_does_not_exceed_current_date,
        exchange_rate_on_specific_date,
    ]


@pytest.mark.parametrize(
    "dest", list(DESTINATIONS.values()), ids=list(DESTINATIONS.keys())
)
@pytest.mark.parametrize("test_case", frankfurter_test_cases())
def test_frankfurter(dest, test_case):
    """Run each Frankfurter test case against each configured destination."""
    dest_uri = dest.start()
    try:
        test_case(dest_uri)
    finally:
        # Stop the destination even when the test case raises, so a failed
        # assertion does not leave the destination running.
        dest.stop()


def test_version_cmd():
"""
This should always be 0.0.0-dev.
Expand Down
84 changes: 47 additions & 37 deletions ingestr/src/frankfurter/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Iterator, Optional
from typing import Any, Iterator

import dlt
from dlt.common.pendulum import pendulum
Expand All @@ -13,25 +13,26 @@
max_table_nesting=0,
)
def frankfurter_source(
table: str,
start_date: Optional[TAnyDateTime] = None,
end_date: Optional[TAnyDateTime] = None,
start_date: TAnyDateTime,
end_date: TAnyDateTime,
) -> Any:
"""
A dlt source for the frankfurter.dev API. It groups several resources (in this case frankfurter.dev API endpoints) containing
various types of data: currencies, latest rates, historical rates.

Returns the appropriate resource based on the provided parameters.
"""
# Determine which resource to return based on the `table` parameter
if table == "currencies":
return currencies()
dateTime = dlt.sources.incremental(
"date",
initial_value=start_date,
end_value=end_date,
range_start="closed",
range_end="closed",
)

elif table == "latest":
return latest()

elif table == "exchange_rates":
return exchange_rates(start_date=start_date, end_date=end_date)
return (
currencies(),
latest(),
exchange_rates(start_date=dateTime, end_date=end_date),
)


@dlt.resource(
Expand All @@ -53,13 +54,13 @@ def currencies() -> Iterator[dict]:


@dlt.resource(
write_disposition="replace",
write_disposition="merge",
columns={
"date": {"data_type": "text"},
"currency_name": {"data_type": "text"},
"currency_code": {"data_type": "text"},
"rate": {"data_type": "double"},
},
primary_key=["date", "currency_name"], # Composite primary key
primary_key=["date", "currency_code"], # Composite primary key
)
def latest() -> Iterator[dict]:
"""
Expand All @@ -69,50 +70,58 @@ def latest() -> Iterator[dict]:
url = "latest?"

# Fetch data
latest_data = get_path_with_retry(url)
data = get_path_with_retry(url)

# Extract rates and base currency
rates = latest_data["rates"]
rates = data["rates"]

# Prepare the date
date = pendulum.now().to_date_string()
date = pendulum.parse(data["date"])

# Add the base currency (EUR) with a rate of 1.0
yield {
"date": date,
"currency_name": "EUR",
"currency_code": "EUR",
"rate": 1.0,
}

# Add all currencies and their rates
for currency_name, rate in rates.items():
for currency_code, rate in rates.items():
yield {
"date": date,
"currency_name": currency_name,
"currency_code": currency_code,
"rate": rate,
}


@dlt.resource(
write_disposition="replace",
write_disposition="merge",
columns={
"date": {"data_type": "text"},
"currency_name": {"data_type": "text"},
"currency_code": {"data_type": "text"},
"rate": {"data_type": "double"},
},
primary_key=["date", "currency_name"], # Composite primary key
primary_key=("date", "currency_code"), # Composite primary key
)
def exchange_rates(
start_date: TAnyDateTime,
end_date: TAnyDateTime,
start_date: dlt.sources.incremental[TAnyDateTime] = dlt.sources.incremental("date"),
) -> Iterator[dict]:
"""
Fetches exchange rates for a specified date range.
If only start_date is provided, fetches data for that date.
If only start_date is provided, fetches data until now.
If both start_date and end_date are provided, fetches data for each day in the range.
"""
start_date_str = ensure_pendulum_datetime(start_date).format("YYYY-MM-DD")
end_date_str = ensure_pendulum_datetime(end_date).format("YYYY-MM-DD")
# Ensure start_date.last_value is not None
if start_date.last_value is None:
raise ValueError("start_date.last_value cannot be None")

# Ensure start_date.last_value is a pendulum.DateTime object
start_date_obj = ensure_pendulum_datetime(start_date.last_value)
start_date_str = start_date_obj.format("YYYY-MM-DD")

# Ensure end_date is a pendulum.DateTime object
end_date_obj = ensure_pendulum_datetime(end_date)
end_date_str = end_date_obj.format("YYYY-MM-DD")

# Compose the URL
url = f"{start_date_str}..{end_date_str}?"
Expand All @@ -121,22 +130,23 @@ def exchange_rates(
data = get_path_with_retry(url)

# Extract base currency and rates from the API response
base_currency = data["base"]
rates = data["rates"]

# Iterate over the rates dictionary (one entry per date)
for date, daily_rates in rates.items():
formatted_date = pendulum.parse(date)

# Add the base currency with a rate of 1.0
yield {
"date": date,
"currency_name": base_currency,
"date": formatted_date,
"currency_code": "EUR",
"rate": 1.0,
}

# Add all other currencies and their rates
for currency_name, rate in daily_rates.items():
for currency_code, rate in daily_rates.items():
yield {
"date": date,
"currency_name": currency_name,
"date": formatted_date,
"currency_code": currency_code,
"rate": rate,
}
4 changes: 2 additions & 2 deletions ingestr/src/frankfurter/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


def get_url_with_retry(url: str) -> StrAny:
r = requests.get(url)
r = requests.get(url, timeout=5)
return r.json() # type: ignore


Expand All @@ -17,7 +17,7 @@ def get_path_with_retry(path: str) -> StrAny:


def validate_dates(start_date: datetime, end_date: datetime) -> None:
current_date = pendulum.now()
current_date = pendulum.now("Europe/Berlin")

# Check if start_date is in the future
if start_date > current_date:
Expand Down
47 changes: 22 additions & 25 deletions ingestr/src/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@
from ingestr.src.facebook_ads import facebook_ads_source, facebook_insights_source
from ingestr.src.filesystem import readers
from ingestr.src.filters import table_adapter_exclude_columns
from ingestr.src.frankfurter import frankfurter_source
from ingestr.src.frankfurter.helpers import validate_dates
from ingestr.src.github import github_reactions, github_repo_events, github_stargazers
from ingestr.src.google_ads import google_ads
from ingestr.src.google_analytics import google_analytics
Expand Down Expand Up @@ -2050,33 +2048,32 @@ def handles_incrementality(self) -> bool:
return True

def dlt_source(self, uri: str, table: str, **kwargs):
# start and end dates only assigned and validated for exchange_rates table
# Note: if an end date but no start date is provided, start date and end date will be set to current date
if table == "exchange_rates":
if kwargs.get("interval_start"):
start_date = ensure_pendulum_datetime(str(kwargs.get("interval_start")))
if kwargs.get("interval_end"):
end_date = ensure_pendulum_datetime(str(kwargs.get("interval_end")))
else:
end_date = start_date
else:
start_date = pendulum.now()
end_date = pendulum.now()
validate_dates(start_date=start_date, end_date=end_date)
if kwargs.get("incremental_key"):
raise ValueError(
"Frankfurter takes care of incrementality on its own, you should not provide incremental_key"
)

# For currencies and latest tables, set start and end dates to current date
if kwargs.get("interval_start"):
start_date = ensure_pendulum_datetime(str(kwargs.get("interval_start")))
if kwargs.get("interval_end"):
end_date = ensure_pendulum_datetime(str(kwargs.get("interval_end")))
else:
end_date = pendulum.now("Europe/Berlin")
else:
start_date = pendulum.now()
end_date = pendulum.now()
start_date = pendulum.now("Europe/Berlin")
end_date = pendulum.now("Europe/Berlin")

# Validate table
if table not in ["currencies", "latest", "exchange_rates"]:
raise ValueError(
f"Table '{table}' is not supported for Frankfurter source."
)
from ingestr.src.frankfurter import frankfurter_source
from ingestr.src.frankfurter.helpers import validate_dates

return frankfurter_source(
table=table,
validate_dates(start_date=start_date, end_date=end_date)

src = frankfurter_source(
start_date=start_date,
end_date=end_date,
)

if table not in src.resources:
raise UnsupportedResourceError(table, "Frankfurter")

return src.with_resources(table)