diff --git a/docs/supported-sources/frankfurter.md b/docs/supported-sources/frankfurter.md index dbe20bdb8..b2da04925 100644 --- a/docs/supported-sources/frankfurter.md +++ b/docs/supported-sources/frankfurter.md @@ -41,7 +41,9 @@ ingestr ingest \ ### **`--interval-end` (Optional)** - **Description**: The end date for fetching historical exchange rates. - **Value**: A date in the format `YYYY-MM-DD` (e.g., `'2025-03-28'`). -- **Purpose**: Defines the ending point for fetching historical data. If not provided, it defaults to the value of `--interval-start`. +- **Purpose**: Defines the end point for fetching historical data. + - If `--interval-start` is provided without `--interval-end`, `--interval-end` defaults to the current date and retrieves up until the latest published data. + - If `--interval-end` is provided without `--interval-start`, it will be ignored and the call will retrieve the last published data. - For `latest` and `currencies` this parameter is ignored. --- @@ -104,7 +106,7 @@ ingestr ingest \ - **Primary Key**: Composite key of `date` and `currency_name`. - **Notes**: - An optional start and end date can be added via the arguments `--interval-start` and optionally `--interval-end` to define the date range (see examples below). If no start date is specified, the date will default today's date (and thus return the latest exchange rates). - - If a start date but no end date is specified, then the end date will default to the start date and ingestr will retrieve data for the specified start date only. + - If a start date but no end date is specified, then the end date will default to today's date and ingestr will retrieve data up until the latest published data. - Note that the [Frankfurter API](https://www.frankfurter.dev/) only publishes updates Monday-Friday. If the given date is on the weekend, the date will default to the previous Friday. The source is however implemented in ingestr in such a way as to avoid duplicating rows of data in this case (see [Incremental Loading - Replace](https://bruin-data.github.io/ingestr/getting-started/incremental-loading.html)). #### **Example Table: Handling Weekend Dates** diff --git a/ingestr/main_test.py b/ingestr/main_test.py index beda40946..17d66b662 100644 --- a/ingestr/main_test.py +++ b/ingestr/main_test.py @@ -3020,6 +3020,125 @@ def test_applovin_source(testcase): testcase() +def frankfurter_test_cases() -> Iterable[Callable]: + def invalid_source_table(dest_uri): + schema = f"testschema_frankfurter_{get_random_string(5)}" + dest_table = f"{schema}.frankfurter_{get_random_string(5)}" + result = invoke_ingest_command( + "frankfurter://", + "invalid table", + dest_uri, + dest_table, + ) + assert result.exit_code != 0 + assert has_exception(result.exception, UnsupportedResourceError) + + def interval_start_does_not_exceed_interval_end(dest_uri): + schema = f"testschema_frankfurter_{get_random_string(5)}" + dest_table = f"{schema}.frankfurter_{get_random_string(5)}" + result = invoke_ingest_command( + "frankfurter://", + "exchange_rates", + dest_uri, + dest_table, + interval_start="2025-04-11", + interval_end="2025-04-10", + ) + assert result.exit_code != 0 + assert has_exception(result.exception, ValueError) + assert "Interval-end cannot be before interval-start." in str(result.exception) + + def interval_start_can_equal_interval_end(dest_uri): + schema = f"testschema_frankfurter_{get_random_string(5)}" + dest_table = f"{schema}.frankfurter_{get_random_string(5)}" + result = invoke_ingest_command( + "frankfurter://", + "exchange_rates", + dest_uri, + dest_table, + interval_start="2025-04-10", + interval_end="2025-04-10", + ) + assert result.exit_code == 0 + + def interval_start_does_not_exceed_current_date(dest_uri): + schema = f"testschema_frankfurter_{get_random_string(5)}" + dest_table = f"{schema}.frankfurter_{get_random_string(5)}" + start_date = pendulum.now().add(days=1).format("YYYY-MM-DD") + result = invoke_ingest_command( + "frankfurter://", + "exchange_rates", + dest_uri, + dest_table, + interval_start=start_date, + ) + assert result.exit_code != 0 + assert has_exception(result.exception, ValueError) + assert "Interval-start cannot be in the future." in str(result.exception) + + def interval_end_does_not_exceed_current_date(dest_uri): + schema = f"testschema_frankfurter_{get_random_string(5)}" + dest_table = f"{schema}.frankfurter_{get_random_string(5)}" + start_date = pendulum.now().subtract(days=1).format("YYYY-MM-DD") + end_date = pendulum.now().add(days=1).format("YYYY-MM-DD") + result = invoke_ingest_command( + "frankfurter://", + "exchange_rates", + dest_uri, + dest_table, + interval_start=start_date, + interval_end=end_date, + ) + assert result.exit_code != 0 + assert has_exception(result.exception, ValueError) + assert "Interval-end cannot be in the future." in str(result.exception) + + def exchange_rate_on_specific_date(dest_uri): + schema = f"testschema_frankfurter_{get_random_string(5)}" + dest_table = f"{schema}.frankfurter_{get_random_string(5)}" + start_date = "2025-01-03" + end_date = "2025-01-03" + result = invoke_ingest_command( + "frankfurter://", + "exchange_rates", + dest_uri, + dest_table, + interval_start=start_date, + interval_end=end_date, + ) + assert result.exit_code == 0, f"Ingestion failed: {result.output}" + + dest_engine = sqlalchemy.create_engine(dest_uri) + + query = f"SELECT rate FROM {dest_table} WHERE currency_code = 'GBP'" + with dest_engine.connect() as conn: + rows = conn.execute(query).fetchall() + + # Assert that the rate for GBP is 0.82993 + assert len(rows) > 0, "No data found for GBP" + assert abs(rows[0][0] - 0.82993) <= 1e-6, ( + f"Expected rate 0.82993, but got {rows[0][0]}" + ) + + return [ + invalid_source_table, + interval_start_does_not_exceed_interval_end, + interval_start_can_equal_interval_end, + interval_start_does_not_exceed_current_date, + interval_end_does_not_exceed_current_date, + exchange_rate_on_specific_date, + ] + + +@pytest.mark.parametrize( + "dest", list(DESTINATIONS.values()), ids=list(DESTINATIONS.keys()) +) +@pytest.mark.parametrize("test_case", frankfurter_test_cases()) +def test_frankfurter(dest, test_case): + test_case(dest.start()) + dest.stop() + + def test_version_cmd(): """ This should always be 0.0.0-dev. diff --git a/ingestr/src/frankfurter/__init__.py b/ingestr/src/frankfurter/__init__.py index 30dcbf182..629369169 100644 --- a/ingestr/src/frankfurter/__init__.py +++ b/ingestr/src/frankfurter/__init__.py @@ -1,4 +1,4 @@ -from typing import Any, Iterator, Optional +from typing import Any, Iterator import dlt from dlt.common.pendulum import pendulum @@ -13,25 +13,28 @@ max_table_nesting=0, ) def frankfurter_source( - table: str, - start_date: Optional[TAnyDateTime] = None, - end_date: Optional[TAnyDateTime] = None, + start_date: TAnyDateTime, + end_date: TAnyDateTime, ) -> Any: """ A dlt source for the frankfurter.dev API. It groups several resources (in this case frankfurter.dev API endpoints) containing various types of data: currencies, latest rates, historical rates. - - Returns the appropriate resource based on the provided parameters. """ - # Determine which resource to return based on the `table` parameter - if table == "currencies": - return currencies() + date_time = dlt.sources.incremental( + + "date", + initial_value=start_date, + end_value=end_date, + range_start="closed", + range_end="closed", + ) - elif table == "latest": - return latest() + return ( + currencies(), + latest(), + exchange_rates(start_date=date_time, end_date=end_date), - elif table == "exchange_rates": - return exchange_rates(start_date=start_date, end_date=end_date) + ) @dlt.resource( @@ -53,13 +56,13 @@ def currencies() -> Iterator[dict]: @dlt.resource( - write_disposition="replace", + write_disposition="merge", columns={ "date": {"data_type": "text"}, - "currency_name": {"data_type": "text"}, + "currency_code": {"data_type": "text"}, "rate": {"data_type": "double"}, }, - primary_key=["date", "currency_name"], # Composite primary key + primary_key=["date", "currency_code"], # Composite primary key ) def latest() -> Iterator[dict]: """ @@ -69,50 +72,54 @@ def latest() -> Iterator[dict]: url = "latest?" # Fetch data - latest_data = get_path_with_retry(url) + data = get_path_with_retry(url) # Extract rates and base currency - rates = latest_data["rates"] + rates = data["rates"] - # Prepare the date - date = pendulum.now().to_date_string() + date = pendulum.parse(data["date"]) # Add the base currency (EUR) with a rate of 1.0 yield { "date": date, - "currency_name": "EUR", + "currency_code": "EUR", "rate": 1.0, } # Add all currencies and their rates - for currency_name, rate in rates.items(): + for currency_code, rate in rates.items(): yield { "date": date, - "currency_name": currency_name, + "currency_code": currency_code, "rate": rate, } @dlt.resource( - write_disposition="replace", + write_disposition="merge", columns={ "date": {"data_type": "text"}, - "currency_name": {"data_type": "text"}, + "currency_code": {"data_type": "text"}, "rate": {"data_type": "double"}, }, - primary_key=["date", "currency_name"], # Composite primary key + primary_key=("date", "currency_code"), # Composite primary key ) def exchange_rates( - start_date: TAnyDateTime, end_date: TAnyDateTime, + start_date: dlt.sources.incremental[TAnyDateTime] = dlt.sources.incremental("date"), ) -> Iterator[dict]: """ Fetches exchange rates for a specified date range. - If only start_date is provided, fetches data for that date. + If only start_date is provided, fetches data until now. If both start_date and end_date are provided, fetches data for each day in the range. """ - start_date_str = ensure_pendulum_datetime(start_date).format("YYYY-MM-DD") - end_date_str = ensure_pendulum_datetime(end_date).format("YYYY-MM-DD") + # Ensure start_date.last_value is a pendulum.DateTime object + start_date_obj = ensure_pendulum_datetime(start_date.last_value) # type: ignore + start_date_str = start_date_obj.format("YYYY-MM-DD") + + # Ensure end_date is a pendulum.DateTime object + end_date_obj = ensure_pendulum_datetime(end_date) + end_date_str = end_date_obj.format("YYYY-MM-DD") # Compose the URL url = f"{start_date_str}..{end_date_str}?" @@ -121,22 +128,23 @@ def exchange_rates( data = get_path_with_retry(url) # Extract base currency and rates from the API response - base_currency = data["base"] rates = data["rates"] # Iterate over the rates dictionary (one entry per date) for date, daily_rates in rates.items(): + formatted_date = pendulum.parse(date) + # Add the base currency with a rate of 1.0 yield { - "date": date, - "currency_name": base_currency, + "date": formatted_date, + "currency_code": "EUR", "rate": 1.0, } # Add all other currencies and their rates - for currency_name, rate in daily_rates.items(): + for currency_code, rate in daily_rates.items(): yield { - "date": date, - "currency_name": currency_name, + "date": formatted_date, + "currency_code": currency_code, "rate": rate, } diff --git a/ingestr/src/frankfurter/helpers.py b/ingestr/src/frankfurter/helpers.py index 5c6229d84..c0c5ed6b0 100644 --- a/ingestr/src/frankfurter/helpers.py +++ b/ingestr/src/frankfurter/helpers.py @@ -8,7 +8,7 @@ def get_url_with_retry(url: str) -> StrAny: - r = requests.get(url) + r = requests.get(url, timeout=5) return r.json() # type: ignore @@ -19,7 +19,7 @@ def get_path_with_retry(path: str) -> StrAny: def validate_dates(start_date: datetime, end_date: datetime) -> None: current_date = pendulum.now() - # Check if start_date is in the future + # Check if start_date is in the futurep if start_date > current_date: raise ValueError("Interval-start cannot be in the future.") diff --git a/ingestr/src/sources.py b/ingestr/src/sources.py index e4ccb5824..8ed2506c4 100644 --- a/ingestr/src/sources.py +++ b/ingestr/src/sources.py @@ -2164,36 +2164,32 @@ def handles_incrementality(self) -> bool: return True def dlt_source(self, uri: str, table: str, **kwargs): - # start and end dates only assigned and validated for exchange_rates table - # Note: if an end date but no start date is provided, start date and end date will be set to current date - from ingestr.src.frankfurter import frankfurter_source - from ingestr.src.frankfurter.helpers import validate_dates + if kwargs.get("incremental_key"): + raise ValueError( + "Frankfurter takes care of incrementality on its own, you should not provide incremental_key" + ) - if table == "exchange_rates": - if kwargs.get("interval_start"): - start_date = ensure_pendulum_datetime(str(kwargs.get("interval_start"))) - if kwargs.get("interval_end"): - end_date = ensure_pendulum_datetime(str(kwargs.get("interval_end"))) - else: - end_date = start_date + if kwargs.get("interval_start"): + start_date = ensure_pendulum_datetime(str(kwargs.get("interval_start"))) + if kwargs.get("interval_end"): + end_date = ensure_pendulum_datetime(str(kwargs.get("interval_end"))) else: - start_date = pendulum.now() end_date = pendulum.now() - validate_dates(start_date=start_date, end_date=end_date) - - # For currencies and latest tables, set start and end dates to current date else: start_date = pendulum.now() end_date = pendulum.now() - # Validate table - if table not in ["currencies", "latest", "exchange_rates"]: - raise ValueError( - f"Table '{table}' is not supported for Frankfurter source." - ) + from ingestr.src.frankfurter import frankfurter_source + from ingestr.src.frankfurter.helpers import validate_dates + + validate_dates(start_date=start_date, end_date=end_date) - return frankfurter_source( - table=table, + src = frankfurter_source( start_date=start_date, end_date=end_date, ) + + if table not in src.resources: + raise UnsupportedResourceError(table, "Frankfurter") + + return src.with_resources(table)