Description
What's the issue or suggestion?
Issue
According to the documentation, we implement a custom I/O manager to handle partitions:
    class MyPartitionedIOManager(IOManager):
        def _get_path(self, context) -> str:
            if context.has_partition_key:
                return "/".join(context.asset_key.path + [context.asset_partition_key])
            else:
                return "/".join(context.asset_key.path)

        def handle_output(self, context: OutputContext, obj):
            write_csv(self._get_path(context), obj)

        def load_input(self, context: InputContext):
            return read_csv(self._get_path(context))
If a partitioned asset and an unpartitioned asset are materialized at the same time, the code above raises an error:
- The partitioned asset works normally because it has both context.has_partition_key=True and a value for context.asset_partition_key.
- The unpartitioned asset crashes because it has context.has_partition_key=True but no value for context.asset_partition_key.
Suggestion
To check whether an asset is partitioned, we should use context.has_asset_partitions instead of context.has_partition_key:
    class MyPartitionedIOManager(IOManager):
        def _get_path(self, context) -> str:
            if context.has_asset_partitions:
                return "/".join(context.asset_key.path + [context.asset_partition_key])
            else:
                return "/".join(context.asset_key.path)

        def handle_output(self, context: OutputContext, obj):
            write_csv(self._get_path(context), obj)

        def load_input(self, context: InputContext):
            return read_csv(self._get_path(context))
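
The difference between the two checks can be illustrated without Dagster installed. The sketch below is only a model of the behavior described in this issue: FakeAssetKey and FakeContext are hypothetical stand-ins, not Dagster classes, and the RuntimeError is an assumed placeholder for whatever error Dagster actually raises. Only the three attribute names (has_partition_key, has_asset_partitions, asset_partition_key) come from the issue.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeAssetKey:
    """Hypothetical stand-in exposing only the `path` attribute used above."""
    path: List[str]


@dataclass
class FakeContext:
    """Hypothetical stand-in for an I/O manager context (NOT the Dagster API)."""
    asset_key: FakeAssetKey
    run_partition_key: Optional[str] = None  # partition key carried by the run
    asset_is_partitioned: bool = False       # whether this asset has partitions

    @property
    def has_partition_key(self) -> bool:
        # Modeled as a property of the run, as reported in this issue.
        return self.run_partition_key is not None

    @property
    def has_asset_partitions(self) -> bool:
        # Modeled as a property of the asset itself.
        return self.asset_is_partitioned

    @property
    def asset_partition_key(self) -> str:
        if not self.asset_is_partitioned or self.run_partition_key is None:
            # Assumed error type; the real exception may differ.
            raise RuntimeError("asset has no partition key")
        return self.run_partition_key


def path_with_has_partition_key(context) -> str:
    # Check used in the documented example.
    if context.has_partition_key:
        return "/".join(context.asset_key.path + [context.asset_partition_key])
    return "/".join(context.asset_key.path)


def path_with_has_asset_partitions(context) -> str:
    # Check proposed in this issue.
    if context.has_asset_partitions:
        return "/".join(context.asset_key.path + [context.asset_partition_key])
    return "/".join(context.asset_key.path)


# Both assets are materialized together, so both contexts see a partition key.
partitioned = FakeContext(FakeAssetKey(["daily_sales"]), "2024-01-01", True)
unpartitioned = FakeContext(FakeAssetKey(["lookup_table"]), "2024-01-01", False)

print(path_with_has_asset_partitions(partitioned))
print(path_with_has_asset_partitions(unpartitioned))
try:
    path_with_has_partition_key(unpartitioned)
except RuntimeError as err:
    print(f"has_partition_key check failed: {err}")
```

In this model the proposed check yields a path for both assets, while the documented check fails only for the unpartitioned asset, which matches the behavior reported above.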
Additional information
Dagster version: 1.9.3
Message from the maintainers
Impacted by this issue? Give it a 👍! We factor engagement into prioritization.