
Commit f5f6c34

Merge pull request #14 from msdotnetclr/issue-13
Introduced a new source detail option for the eventhub source: `eventhub.accessKeySecretName`. Details can be found [here](#13)
2 parents 28ddde6 + ecbbe81 commit f5f6c34

File tree: 13 files changed (+85, -8 lines)

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 [Please read through the Keep a Changelog (~5min)](https://keepachangelog.com/en/1.0.0/).
 
+## [v0.0.4] - 2023-10-09
+### Added
+- Functionality to introduce a new option for event hub configuration, namely the source_details option `eventhub.accessKeySecretName`, to properly construct the eh_shared_key_value. Without this option, there were errors while connecting to the event hub service (linked to [issue-13 - java.lang.RuntimeException: non-nullable field authBytes was serialized as null #13](https://github.com/databrickslabs/dlt-meta/issues/13))
+
 ## [v0.0.3] - 2023-06-07
 ### Fixed
 - infer datatypes from sequence_by to __START_AT, __END_AT for apply changes API

docs/content/getting_started/additionals.md

Lines changed: 4 additions & 2 deletions
@@ -23,7 +23,7 @@ export DATABRICKS_TOKEN=<DATABRICKS TOKEN> # Account needs permission to create
 5. Run integration test against cloudfile or eventhub or kafka using below options:
 5a. Run the command for cloudfiles ```python integration-tests/run-integration-test.py --cloud_provider_name=aws --dbr_version=11.3.x-scala2.12 --source=cloudfiles --dbfs_path=dbfs:/tmp/DLT-META/```
 
-5b. Run the command for eventhub ```python integration-tests/run-integration-test.py --cloud_provider_name=azure --dbr_version=11.3.x-scala2.12 --source=eventhub --dbfs_path=dbfs:/tmp/DLT-META/ --eventhub_name=iot --eventhub_secrets_scope_name=eventhubs_creds --eventhub_namespace=int_test-standard --eventhub_port=9093 --eventhub_producer_accesskey_name=producer --eventhub_consumer_accesskey_name=consumer```
+5b. Run the command for eventhub ```python integration-tests/run-integration-test.py --cloud_provider_name=azure --dbr_version=11.3.x-scala2.12 --source=eventhub --dbfs_path=dbfs:/tmp/DLT-META/ --eventhub_name=iot --eventhub_secrets_scope_name=eventhubs_creds --eventhub_namespace=int_test-standard --eventhub_port=9093 --eventhub_producer_accesskey_name=producer --eventhub_producer_accesskey_secret_name=producer --eventhub_consumer_accesskey_name=consumer --eventhub_consumer_accesskey_secret_name=consumer```
 
 For eventhub integration tests, the following are the prerequisites:
 1. Needs eventhub instance running
@@ -36,7 +36,9 @@ export DATABRICKS_TOKEN=<DATABRICKS TOKEN> # Account needs permission to create
 3. Provide eventhub port : --eventhub_port
 4. Provide databricks secret scope name : --eventhub_secrets_scope_name
 5. Provide eventhub producer access key name : --eventhub_producer_accesskey_name
-6. Provide eventhub access key name : --eventhub_consumer_accesskey_name
+6. Provide eventhub consumer access key name : --eventhub_consumer_accesskey_name
+7. Provide eventhub producer access key secret name : --eventhub_producer_accesskey_secret_name
+8. Provide eventhub consumer access key secret name : --eventhub_consumer_accesskey_secret_name
 
 
 5c. Run the command for kafka ```python3 integration-tests/run-integration-test.py --cloud_provider_name=aws --dbr_version=11.3.x-scala2.12 --source=kafka --dbfs_path=dbfs:/tmp/DLT-META/ --kafka_topic_name=dlt-meta-integration-test --kafka_broker=host:9092```

docs/content/getting_started/metadatapreperation.md

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ draft: false
 | data_flow_id | This is unique identifer for pipeline |
 | data_flow_group | This is group identifer for launching multiple pipelines under single DLT |
 | source_format | Source format e.g `cloudFiles`, `eventhub`, `kafka`, `delta` |
-| source_details | This map Type captures all source details for cloudfiles = `source_schema_path`, `source_path_{env}`, `source_database` and for eventhub= `source_schema_path` , `eventhub.accessKeyName`, `eventhub.name` , `eventhub.secretsScopeName` , `kafka.sasl.mechanism`, `kafka.security.protocol`, `eventhub.namespace`, `eventhub.port`. For Source schema file spark DDL schema format parsing is supported <br> In case of custom schema format then write schema parsing function `bronze_schema_mapper(schema_file_path, spark):Schema` and provide to `OnboardDataflowspec` initialization <br> .e.g `onboardDataFlowSpecs = OnboardDataflowspec(spark, dict_obj,bronze_schema_mapper).onboardDataFlowSpecs()` |
+| source_details | This map Type captures all source details for cloudfiles = `source_schema_path`, `source_path_{env}`, `source_database` and for eventhub= `source_schema_path` , `eventhub.accessKeyName`, `eventhub.accessKeySecretName`, `eventhub.name` , `eventhub.secretsScopeName` , `kafka.sasl.mechanism`, `kafka.security.protocol`, `eventhub.namespace`, `eventhub.port`. For Source schema file spark DDL schema format parsing is supported <br> In case of custom schema format then write schema parsing function `bronze_schema_mapper(schema_file_path, spark):Schema` and provide to `OnboardDataflowspec` initialization <br> .e.g `onboardDataFlowSpecs = OnboardDataflowspec(spark, dict_obj,bronze_schema_mapper).onboardDataFlowSpecs()` |
 | bronze_database_{env} | Delta lake bronze database name. |
 | bronze_table | Delta lake bronze table name |
 | bronze_reader_options | Reader options which can be provided to spark reader <br> e.g multiline=true,header=true in json format |
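The `source_details` row mentions a custom `bronze_schema_mapper(schema_file_path, spark):Schema` hook for non-DDL schema files. A minimal sketch of such a mapper, assuming the custom file holds a JSON-serialized Spark schema (the file format and the parsing below are assumptions; only the signature and the `OnboardDataflowspec` hand-off come from the docs):

```python
import json

from pyspark.sql.types import StructType


def bronze_schema_mapper(schema_file_path, spark):
    # Read the schema file through Spark so the path can live on DBFS.
    schema_text = "\n".join(
        row["value"] for row in spark.read.text(schema_file_path).collect()
    )
    # Assumption: the file contains a JSON-serialized Spark schema.
    return StructType.fromJson(json.loads(schema_text))


# Passed at onboarding time, exactly as the table shows:
# onboardDataFlowSpecs = OnboardDataflowspec(spark, dict_obj, bronze_schema_mapper).onboardDataFlowSpecs()
```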

examples/onboarding.json

Lines changed: 2 additions & 1 deletion
@@ -88,6 +88,7 @@
 "source_schema_path": "{dbfs_path}/integration-tests/resources/eventhub_iot_schema.ddl",
 "eventhub.accessKeyName": "{eventhub_accesskey_name}",
 "eventhub.name": "{eventhub_name}",
+"eventhub.accessKeySecretName": "{eventhub_accesskey_secret_name}",
 "eventhub.secretsScopeName": "{eventhub_secrets_scope_name}",
 "kafka.sasl.mechanism": "PLAIN",
 "kafka.security.protocol": "SASL_SSL",
@@ -120,4 +121,4 @@
 "silver_table_path_prd": "",
 "silver_transformation_json_it": ""
 }
-]
+]

integration-tests/conf/dlt-meta/eventhub-onboarding.template

Lines changed: 2 additions & 1 deletion
@@ -8,6 +8,7 @@
 "source_schema_path": "{dbfs_path}/integration-tests/resources/eventhub_iot_schema.ddl",
 "eventhub.accessKeyName": "{eventhub_accesskey_name}",
 "eventhub.name": "{eventhub_name}",
+"eventhub.accessKeySecretName": "{eventhub_accesskey_secret_name}",
 "eventhub.secretsScopeName": "{eventhub_secrets_scope_name}",
 "kafka.sasl.mechanism": "PLAIN",
 "kafka.security.protocol": "SASL_SSL",
@@ -16,7 +17,7 @@
 },
 "bronze_reader_options": {
 "maxOffsetsPerTrigger": "50000",
-"startingOffsets": "latest",
+"startingOffsets": "earliest",
 "failOnDataLoss": "false",
 "kafka.request.timeout.ms": "60000",
 "kafka.session.timeout.ms": "60000"
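The `startingOffsets` flip from `latest` to `earliest` matters for the integration test, presumably because the producer writes events before the pipeline starts reading, so the consumer must rewind to the beginning of the topic. As a rough sketch of how these `bronze_reader_options` could combine with the connection options (the `.options(**...)` chaining and variable names are assumptions about dlt-meta's internals; `spark` is an active SparkSession and the option values are from the template above):

```python
# Sketch only: names and the exact chaining are assumptions, not dlt-meta code.
reader_options = {
    "maxOffsetsPerTrigger": "50000",
    "startingOffsets": "earliest",  # was "latest" before this commit
    "failOnDataLoss": "false",
    "kafka.request.timeout.ms": "60000",
    "kafka.session.timeout.ms": "60000",
}

df = (
    spark.readStream.format("kafka")
    .options(**eventhub_kafka_options)  # connection/auth options, see pipeline_readers.py below
    .options(**reader_options)          # tuning options from bronze_reader_options
    .load()
)
```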
Binary file changed (53 Bytes); contents not shown.

integration-tests/run-integration-test.py

Lines changed: 15 additions & 1 deletion
@@ -212,6 +212,7 @@ def create_eventhub_workflow_spec(args, job_spec_dict):
             "eventhub_namespace": args.__getattribute__("eventhub_namespace"),
             "eventhub_secrets_scope_name": args.__getattribute__("eventhub_secrets_scope_name"),
             "eventhub_accesskey_name": args.__getattribute__("eventhub_producer_accesskey_name"),
+            "eventhub_accesskey_secret_name": args.__getattribute__("eventhub_producer_accesskey_secret_name"),
             "eventhub_input_data": f"/{dbfs_file_path}/integration-tests/resources/data/iot/iot.json"
         }
     }
@@ -498,6 +499,7 @@ def create_eventhub_onboarding(args, eventhub_template, dbfs_tmp_path, run_id):
         onboard_obj = json.load(f)
     eventhub_name = args.__getattribute__("eventhub_name").lower()
     eventhub_accesskey_name = args.__getattribute__("eventhub_consumer_accesskey_name").lower()
+    eventhub_accesskey_secret_name = args.__getattribute__("eventhub_consumer_accesskey_secret_name").lower()
     eventhub_secrets_scope_name = args.__getattribute__("eventhub_secrets_scope_name").lower()
     eventhub_namespace = args.__getattribute__("eventhub_namespace").lower()
     eventhub_port = args.__getattribute__("eventhub_port").lower()
@@ -511,6 +513,8 @@ def create_eventhub_onboarding(args, eventhub_template, dbfs_tmp_path, run_id):
         data_flow[key][source_key] = source_value.format(eventhub_name=eventhub_name)
     if 'eventhub_accesskey_name' in source_value:
         data_flow[key][source_key] = source_value.format(eventhub_accesskey_name=eventhub_accesskey_name)
+    if 'eventhub_accesskey_secret_name' in source_value:
+        data_flow[key][source_key] = source_value.format(eventhub_accesskey_secret_name=eventhub_accesskey_secret_name)
     if 'eventhub_secrets_scope_name' in source_value:
         data_flow[key][source_key] = source_value.format(eventhub_secrets_scope_name=eventhub_secrets_scope_name)
     if 'eventhub_nmspace' in source_value:
@@ -662,6 +666,8 @@ def process_arguments():
     parser.add_argument("--eventhub_name", help="Provide eventhub_name e.g --eventhub_name=iot")
     parser.add_argument("--eventhub_producer_accesskey_name", help="Provide access key that has write permission on the eventhub e.g --eventhub_producer_accesskey_name=iotProducerAccessKey")
     parser.add_argument("--eventhub_consumer_accesskey_name", help="Provide access key that has read permission on the eventhub e.g --eventhub_consumer_accesskey_name=iotConsumerAccessKey")
+    parser.add_argument("--eventhub_producer_accesskey_secret_name", help="Provide name of the secret that stores access key with write permission on the eventhub. Optional if same as `eventhub_producer_accesskey_name` e.g --eventhub_producer_accesskey_secret_name=iotProducerAccessKey")
+    parser.add_argument("--eventhub_consumer_accesskey_secret_name", help="Provide name of the secret that stores access key with read permission on the eventhub. Optional if same as `eventhub_consumer_accesskey_name` e.g --eventhub_consumer_accesskey_secret_name=iotConsumerAccessKey")
     parser.add_argument("--eventhub_secrets_scope_name",
                         help="Provide eventhub_secrets_scope_name e.g --eventhub_secrets_scope_name=eventhubs_creds")
     parser.add_argument("--eventhub_namespace", help="Provide eventhub_namespace e.g --eventhub_namespace=topic-standard")
@@ -684,7 +690,15 @@ def process_arguments():
     if source.lower() not in supported_sources:
         raise Exception("Invalid value for --source! Supported values: --source=cloudfiles")
     if source.lower() == "eventhub":
-        eventhub_madatory_args = ["eventhub_name", "eventhub_producer_accesskey_name", "eventhub_consumer_accesskey_name", "eventhub_secrets_scope_name", "eventhub_namespace", "eventhub_port"]
+        eventhub_madatory_args = ["eventhub_name",
+                                  "eventhub_producer_accesskey_name",
+                                  "eventhub_consumer_accesskey_name",
+                                  "eventhub_secrets_scope_name",
+                                  "eventhub_producer_accesskey_secret_name",
+                                  "eventhub_consumer_accesskey_secret_name",
+                                  "eventhub_namespace",
+                                  "eventhub_port"
+                                  ]
         check_mandatory_arg(args, eventhub_madatory_args)
     if source.lower() == "kafka":
         kafka_madatory_args = ["kafka_topic_name", "kafka_broker"]
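`check_mandatory_arg` itself is not part of this diff; a minimal sketch of what such a validation helper typically looks like (the body below is an assumption, only the call site appears above):

```python
def check_mandatory_arg(args, mandatory_args):
    # Assumed implementation: fail fast when a required CLI flag is missing.
    for arg_name in mandatory_args:
        if not getattr(args, arg_name, None):
            raise Exception(f"Missing mandatory argument: --{arg_name}")
```

Note that the two new `*_accesskey_secret_name` flags are described as "Optional if same as ..." in their help text, yet they are added to `eventhub_madatory_args`, so the integration-test runner always requires them.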

setup.py

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@
 """
 setup(
     name="dlt_meta",
-    version="0.0.3",
+    version="0.0.4",
     python_requires=">=3.8",
     setup_requires=["wheel>=0.37.1,<=0.41.2"],
     install_requires=INSTALL_REQUIRES,

src/pipeline_readers.py

Lines changed: 5 additions & 1 deletion
@@ -120,8 +120,12 @@ def get_eventhub_kafka_options(spark, bronze_dataflow_spec):
     eh_port = bronze_dataflow_spec.sourceDetails.get("eventhub.port")
     eh_name = bronze_dataflow_spec.sourceDetails.get("eventhub.name")
     eh_shared_key_name = bronze_dataflow_spec.sourceDetails.get("eventhub.accessKeyName")
+    secret_name = bronze_dataflow_spec.sourceDetails.get("eventhub.accessKeySecretName")
+    if not secret_name:
+        # set default value if "eventhub.accessKeySecretName" is not specified
+        secret_name = eh_shared_key_name
     secret_scope = bronze_dataflow_spec.sourceDetails.get("eventhub.secretsScopeName")
-    eh_shared_key_value = dbutils.secrets.get(secret_scope, eh_shared_key_name)
+    eh_shared_key_value = dbutils.secrets.get(secret_scope, secret_name)
     eh_shared_key_value = f"SharedAccessKeyName={eh_shared_key_name};SharedAccessKey={eh_shared_key_value}"
     eh_conn_str = f"Endpoint=sb://{eh_namespace}.servicebus.windows.net/;{eh_shared_key_value}"
     eh_kafka_str = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule"
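The fallback keeps existing onboarding files working: when `eventhub.accessKeySecretName` is absent, the access key name doubles as the secret name, as before. A self-contained sketch of the resolution logic above, with an assumed shape for the returned Kafka options (the option dict is not part of this diff; `get_secret` stands in for `dbutils.secrets.get`):

```python
def build_eventhub_kafka_options(source_details, get_secret):
    # Mirrors the key resolution shown in the diff above.
    eh_namespace = source_details.get("eventhub.namespace")
    eh_port = source_details.get("eventhub.port")
    eh_shared_key_name = source_details.get("eventhub.accessKeyName")
    # New in this commit: the secret name may differ from the SAS key name;
    # fall back to the key name when it is not specified.
    secret_name = source_details.get("eventhub.accessKeySecretName") or eh_shared_key_name
    secret_scope = source_details.get("eventhub.secretsScopeName")
    eh_shared_key_value = get_secret(secret_scope, secret_name)
    sas = f"SharedAccessKeyName={eh_shared_key_name};SharedAccessKey={eh_shared_key_value}"
    eh_conn_str = f"Endpoint=sb://{eh_namespace}.servicebus.windows.net/;{sas}"
    login_module = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule"
    # Assumed option shape for the Kafka surface of Event Hubs (SASL_SSL/PLAIN);
    # dlt-meta's actual return value is not shown in this diff.
    return {
        "kafka.bootstrap.servers": f"{eh_namespace}.servicebus.windows.net:{eh_port}",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.jaas.config": (
            f'{login_module} required username="$ConnectionString" '
            f'password="{eh_conn_str}";'
        ),
        "subscribe": source_details.get("eventhub.name"),
    }
```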

tests/resources/onboarding.json

Lines changed: 1 addition & 0 deletions
@@ -116,6 +116,7 @@
 "source_schema_path": "tests/resources/schema/eventhub_iot_schema.ddl",
 "eventhub.accessKeyName": "iotIngestionAccessKey",
 "eventhub.name": "iot",
+"eventhub.accessKeySecretName": "iotIngestionAccessKey",
 "eventhub.secretsScopeName": "eventhubs_creds",
 "kafka.sasl.mechanism": "PLAIN",
 "kafka.security.protocol": "SASL_SSL",
