Description
What's the issue?
Not all of our Airbyte connection assets show up in the Dagster UI: some jobs are missing their Airbyte assets.
What did you expect to happen?
I want all the connections to be available in my asset list.
How to reproduce?
I believe you only need to create more than 20 connections, but I'm not sure this reproduces the issue reliably.
Dagster version
1.11.12
Deployment type
Dagster Helm chart
Deployment details
Dagster and Airbyte are deployed on an EKS cluster, each in its own namespace.
Additional information
All of this started after we updated Airbyte to the latest version (1.8.3 at the time) and changed our code to use the new version of the dagster-airbyte library: https://docs.dagster.io/api/libraries/dagster-airbyte
Here is my Airbyte asset code:
```python
import re

import dagster as dg
from dagster_airbyte import (
    AirbyteConnectionTableProps,
    DagsterAirbyteTranslator,
    build_airbyte_assets_definitions,
)

from my_project.resources.airbyte import airbyte_workspace

RE_PATTERN = r"^[A-Za-z0-9_]+$"


class CustomDagsterAirbyteTranslator(DagsterAirbyteTranslator):
    """Custom Dagster Airbyte translator re-defining asset keys for Airbyte assets."""

    def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
        """Get the asset spec."""
        # Get the default spec from the parent class
        default_spec = super().get_asset_spec(props)
        # Key: custom_prefix / <first word of the connection name, lowercased> / <stream name>
        custom_key = dg.AssetKey(
            [
                "custom_prefix",
                props.connection_name.split()[0].lower(),
                props.stream_name,
            ]
        )
        # Group: connection name with runs of other characters replaced by "_", lowercased
        group_name = re.sub(r"[^A-Za-z0-9_]+", "_", props.connection_name).lower()
        # Return spec with custom asset key
        return default_spec.replace_attributes(key=custom_key, group_name=group_name)


def connection_selector(airbyte_conn_metadata):
    """Handle connection selector."""
    # Keep a connection only if the first word of its name matches RE_PATTERN
    return not any(
        [
            not bool(re.match(RE_PATTERN, stream))  # type: ignore
            for stream in [airbyte_conn_metadata.name.split()[0].lower()]
        ]
    )


# Build assets with custom translator and connection filter
airbyte_assets = build_airbyte_assets_definitions(
    workspace=airbyte_workspace,
    dagster_airbyte_translator=CustomDagsterAirbyteTranslator(),
    connection_selector_fn=connection_selector,  # type: ignore
)
```
I modified `fetch_airbyte_workspace_data` in the library's resource.py file to print the connection names returned by `get_connections`.
Here is the modified excerpt:
```python
@cached_method
def fetch_airbyte_workspace_data(
    self,
) -> AirbyteWorkspaceData:
    """Retrieves all Airbyte content from the workspace and returns it as an AirbyteWorkspaceData object.

    Returns:
        AirbyteWorkspaceData: A snapshot of the Airbyte workspace's content.
    """
    connections_by_id = {}
    destinations_by_id = {}

    client = self.get_client()
    client.validate_workspace_id()
    connections = client.get_connections()
    # Debug output added to inspect what the API returns
    print(f"connections: {len(connections)}")
    for connection in connections:
        print(f"connection: {connection['name']}")
    # ... rest of the original method unchanged
```
That was only for debugging. It reports the correct number of connections (24), but some connection names appear more than once.
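Duplicate names matter here because the custom translator builds each asset key from only the first word of the connection name plus the stream name, so two different connections can map to the same key. One hypothesis worth ruling out (an assumption, not a confirmed diagnosis) is exactly such a collision. The sketch below re-implements that key scheme as a plain function; `custom_key_parts` and `find_key_collisions` are hypothetical helpers, and the connection/stream pairs are invented examples.

```python
from collections import defaultdict


def custom_key_parts(connection_name: str, stream_name: str) -> tuple[str, str, str]:
    """Mirror the key built in CustomDagsterAirbyteTranslator.get_asset_spec."""
    return ("custom_prefix", connection_name.split()[0].lower(), stream_name)


def find_key_collisions(
    pairs: list[tuple[str, str]],
) -> dict[tuple[str, str, str], list[str]]:
    """Return each asset key that more than one (connection, stream) pair maps to."""
    owners: dict[tuple[str, str, str], list[str]] = defaultdict(list)
    for connection_name, stream_name in pairs:
        owners[custom_key_parts(connection_name, stream_name)].append(connection_name)
    # Keep only keys claimed by two or more connections
    return {key: names for key, names in owners.items() if len(names) > 1}


# Hypothetical example data: two connections share the first word "Postgres".
pairs = [
    ("Postgres Prod", "users"),
    ("Postgres Staging", "users"),
    ("Hubspot CRM", "contacts"),
]
for key, names in find_key_collisions(pairs).items():
    print("/".join(key), "<-", names)
# prints: custom_prefix/postgres/users <- ['Postgres Prod', 'Postgres Staging']
```

Feeding it the real (connection name, stream name) pairs from the workspace would show whether any keys overlap. Connections with identical full names collide on every stream they have in common, and they also end up in the same `group_name`.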
My only suspicion is the filter in my connection_selector, which may be wrong, but I'm not sure.
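The filter can be checked in isolation. The expression `not any([not match])` over a one-element list reduces to `bool(match)`, so the selector keeps a connection only when the lowercased first word of its name consists entirely of letters, digits, and underscores. The sketch below calls an equivalent selector on stand-in objects; `types.SimpleNamespace` substitutes for the real connection object (an assumption: only its `name` attribute is read), and the names are invented.

```python
import re
from types import SimpleNamespace

RE_PATTERN = r"^[A-Za-z0-9_]+$"


def connection_selector(airbyte_conn_metadata) -> bool:
    """Same logic as the selector above, written without the one-element list."""
    first_word = airbyte_conn_metadata.name.split()[0].lower()
    return bool(re.match(RE_PATTERN, first_word))


# Hypothetical connection names; only the first whitespace-separated word is tested.
names = [
    "Postgres to Snowflake",  # "postgres" -> kept
    "my-api to Snowflake",  # "my-api" contains "-" -> dropped
    "S3_raw -> Redshift",  # "s3_raw" -> kept
    "Salesforce→BigQuery",  # no space, so the whole name is one word with "→" -> dropped
]
for name in names:
    print(f"{name!r}: kept={connection_selector(SimpleNamespace(name=name))}")
```

Connections rejected here never become assets, so comparing this output for the 24 real names against the assets visible in the UI would confirm or rule out the filter.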
I also wonder whether the problem could come from the Airbyte API's default limit of 20.
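With an offset/limit-paginated endpoint, a client only sees every item if it keeps requesting pages until one comes back short; stopping after the first call would cap the result at the default limit. The count of 24 printed by the debug code is already above 20, which suggests more than the first page is being read. The sketch below illustrates the loop against a fake in-memory backend; `fetch_all`, `fake_fetch_page`, and the data are hypothetical and do not reproduce the actual dagster-airbyte client.

```python
from typing import Callable

Page = list[dict]


def fetch_all(fetch_page: Callable[[int, int], Page], limit: int = 20) -> Page:
    """Collect every item from an offset/limit-paginated endpoint.

    `fetch_page(limit, offset)` is a hypothetical stand-in for one API call.
    """
    items: Page = []
    offset = 0
    while True:
        page = fetch_page(limit, offset)
        items.extend(page)
        if len(page) < limit:  # a short (or empty) page means there is nothing left
            return items
        offset += limit


# Fake backend with 24 connections, mimicking a default page size of 20.
ALL_CONNECTIONS = [{"name": f"connection {i}"} for i in range(24)]


def fake_fetch_page(limit: int, offset: int) -> Page:
    return ALL_CONNECTIONS[offset : offset + limit]


print(len(fake_fetch_page(20, 0)))  # prints 20: only the first page
print(len(fetch_all(fake_fetch_page)))  # prints 24: every page
```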
Does anyone have an idea of what is happening here?