-
Couldn't load subscription status.
- Fork 59
Description
Describe the bug
Issue was already described here:
PurviewOut triggers but does not seem to be able to consolidate the START and COMPLETE events.
To Reproduce
# Build a two-row DataFrame and persist it as the table "default.temp".
# `spark` is the session object supplied by the notebook environment.
seed_rows = [
    {'a': 2, 'b': 3},
    {'a': 4, 'b': 5},
]
seed_df = spark.createDataFrame(seed_rows)
seed_df.write.mode("overwrite").saveAsTable("default.temp")

# Read the table back, double both columns, and overwrite "default.final_table".
source_df = spark.read.table("default.temp")
doubled_df = source_df.selectExpr("a * 2 as a", "b * 2 as b")
doubled_df.write.mode("overwrite").saveAsTable("default.final_table")
Generated payloads:
Start event:
{ "eventTime": "2024-10-31T16:09:21.103Z", "producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", "eventType": "START", "run": { "runId": "0192e356-abdd-7713-af1e-1a41cc84e552", "facets": { "parent": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/ParentRunFacet.json#/$defs/ParentRunFacet", "run": { "runId": "0192e355-1c37-7e5a-aa75-1bdf5a97eb5a" }, "job": { "namespace": "default", "name": "databricks_shell" } }, "spark_properties": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", "properties": { "spark.master": "spark://10.139.64.10:7077", "spark.app.name": "Databricks Shell" } }, "processing_engine": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet", "version": "3.5.0", "name": "spark", "openlineageAdapterVersion": "1.23.0" } } }, "job": { "namespace": "default", "name": "adb-1217703170417694.14.azuredatabricks.net.atomic_replace_table_as_select.default_final_table", "facets": { "jobType": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet", "processingType": "BATCH", "integration": "SPARK", "jobType": "SQL_JOB" } } }, "inputs": [ { "namespace": "dbfs", "name": "/user/hive/warehouse/temp", "facets": { "dataSource": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", "name": "dbfs", 
"uri": "dbfs" }, "schema": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", "fields": [ { "name": "a", "type": "long" }, { "name": "b", "type": "long" } ] }, "symlinks": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet", "identifiers": [ { "namespace": "dbfs:/user/hive/warehouse", "name": "default.temp", "type": "TABLE" } ] } }, "inputFacets": {} }, { "namespace": "dbfs", "name": "/user/hive/warehouse/temp", "facets": { "dataSource": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", "name": "dbfs", "uri": "dbfs" }, "schema": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", "fields": [ { "name": "a", "type": "long" }, { "name": "b", "type": "long" } ] }, "symlinks": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet", "identifiers": [ { "namespace": "dbfs:/user/hive/warehouse", "name": "default.temp", "type": "TABLE" } ] } }, "inputFacets": {} } ], "outputs": [] }
Complete event:
{ "eventTime": "2024-10-31T16:09:30.901Z", "producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", "eventType": "COMPLETE", "run": { "runId": "0192e356-abdd-7713-af1e-1a41cc84e552", "facets": { "parent": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/ParentRunFacet.json#/$defs/ParentRunFacet", "run": { "runId": "0192e355-1c37-7e5a-aa75-1bdf5a97eb5a" }, "job": { "namespace": "default", "name": "databricks_shell" } }, "spark_properties": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", "properties": { "spark.master": "spark://10.139.64.10:7077", "spark.app.name": "Databricks Shell" } }, "processing_engine": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet", "version": "3.5.0", "name": "spark", "openlineageAdapterVersion": "1.23.0" } } }, "job": { "namespace": "default", "name": "adb-1217703170417694.14.azuredatabricks.net.atomic_replace_table_as_select.default_final_table", "facets": { "jobType": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet", "processingType": "BATCH", "integration": "SPARK", "jobType": "SQL_JOB" } } }, "inputs": [ { "namespace": "dbfs", "name": "/user/hive/warehouse/temp", "facets": { "dataSource": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", "name": 
"dbfs", "uri": "dbfs" }, "schema": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", "fields": [ { "name": "a", "type": "long" }, { "name": "b", "type": "long" } ] }, "symlinks": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet", "identifiers": [ { "namespace": "dbfs:/user/hive/warehouse", "name": "default.temp", "type": "TABLE" } ] } }, "inputFacets": {} } ], "outputs": [ { "namespace": "dbfs", "name": "/user/hive/warehouse/final_table", "facets": { "dataSource": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", "name": "dbfs", "uri": "dbfs" }, "schema": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", "fields": [ { "name": "a", "type": "long" }, { "name": "b", "type": "long" } ] }, "storage": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/StorageDatasetFacet.json#/$defs/StorageDatasetFacet", "storageLayer": "unity", "fileFormat": "parquet" }, "symlinks": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet", "identifiers": [ { "namespace": "dbfs:/user/hive/warehouse", "name": "default.final_table", "type": "TABLE" } ] }, "lifecycleStateChange": { "_producer": 
"https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/LifecycleStateChangeDatasetFacet.json#/$defs/LifecycleStateChangeDatasetFacet", "lifecycleStateChange": "OVERWRITE" } }, "outputFacets": { "outputStatistics": { "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark", "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/OutputStatisticsOutputDatasetFacet.json#/$defs/OutputStatisticsOutputDatasetFacet", "rowCount": 0, "size": 0 } } } ] }
Expected behavior
PurviewOut should consolidate the START and COMPLETE events and send the result to Purview so that the lineage can be visualized.
Logs
N/A
