Azure users who want to store and analyze OPC UA PubSub telemetry sent from industrial sites via a cloud broker can use Azure Databricks as their cloud store and analytics platform. Databricks provides a unified analytics platform built on Apache Spark, with native support for Delta Lake, Structured Streaming, and scalable data engineering, which makes it a good fit for industrial IoT workloads.
This article walks you through:
- Setting up Delta Lake tables for OPC UA PubSub telemetry and metadata
- Ingesting data from Azure Event Hubs using Structured Streaming
- Processing and expanding OPC UA PubSub messages with PySpark
- Creating a last-known-value (LKV) view for OPC UA metadata
- Importing OPC UA Information Models from the UA Cloud Library
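Before you begin, make sure you have:
- An Azure Databricks workspace in your Azure subscription
- An Azure event hub (or an MQTT broker with an Event Hubs-compatible endpoint) receiving OPC UA PubSub messages from your industrial sites (e.g. via UA Cloud Publisher)
- The Azure Event Hubs Connector for Apache Spark (Maven package com.microsoft.azure:azure-eventhubs-spark_2.12) installed on your cluster, which provides the eventhubs streaming source and EventHubsUtils used in the ingestion code
- An account for the UA Cloud Library, hosted by the OPC Foundation; register for free at: https://uacloudlibrary.opcfoundation.org/Identity/Account/Register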
First, create the Delta Lake tables that will hold your OPC UA data. Run the following SQL commands in a Databricks notebook:
-- Create a landing table for raw OPC UA telemetry
CREATE TABLE IF NOT EXISTS opcua_raw (
payload STRING
)
USING DELTA;
-- Create an intermediate table to unbatch OPC UA PubSub messages
CREATE TABLE IF NOT EXISTS opcua_intermediate (
DataSetWriterID STRING,
Timestamp TIMESTAMP,
Payload STRING
)
USING DELTA;
-- Create the final OPC UA telemetry table
CREATE TABLE IF NOT EXISTS opcua_telemetry (
DataSetWriterID STRING,
Timestamp TIMESTAMP,
Name STRING,
Value STRING
)
USING DELTA;
-- Create a landing table for raw OPC UA metadata
CREATE TABLE IF NOT EXISTS opcua_metadata_raw (
payload STRING
)
USING DELTA;
-- Create the OPC UA metadata table
CREATE TABLE IF NOT EXISTS opcua_metadata (
DataSetWriterID STRING,
Timestamp TIMESTAMP,
Name STRING,
Type STRING,
DisplayName STRING,
Workcell STRING,
Line STRING,
Area STRING,
Site STRING,
Enterprise STRING,
NamespaceUri STRING,
NodeId STRING
)
USING DELTA;
Use Databricks Structured Streaming to continuously ingest OPC UA PubSub messages from Azure Event Hubs into the raw landing tables.
# Event Hub connection configuration for telemetry
eh_telemetry_conf = {
"eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs
.EventHubsUtils.encrypt(
"<YOUR_EVENTHUB_TELEMETRY_CONNECTION_STRING>"
),
"eventhubs.consumerGroup": "$Default"
}
# Read the telemetry stream from Event Hub
telemetry_stream = (
spark.readStream
.format("eventhubs")
.options(**eh_telemetry_conf)
.load()
.selectExpr("CAST(body AS STRING) AS payload")
)
# Write to the raw telemetry Delta table
(
telemetry_stream.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/mnt/checkpoints/opcua_raw")
.toTable("opcua_raw")  # toTable() starts the streaming query
)
# Event Hub connection configuration for metadata
eh_metadata_conf = {
"eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs
.EventHubsUtils.encrypt(
"<YOUR_EVENTHUB_METADATA_CONNECTION_STRING>"
),
"eventhubs.consumerGroup": "$Default"
}
# Read the metadata stream from Event Hub
metadata_stream = (
spark.readStream
.format("eventhubs")
.options(**eh_metadata_conf)
.load()
.selectExpr("CAST(body AS STRING) AS payload")
)
# Write to the raw metadata Delta table
(
metadata_stream.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/mnt/checkpoints/opcua_metadata_raw")
.toTable("opcua_metadata_raw")  # toTable() starts the streaming query
)
Once raw data is landing in your Delta tables, use PySpark to expand the nested OPC UA PubSub JSON structure into the intermediate and final tables.
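For orientation, here is a plain-Python sketch that mirrors the two Spark steps that follow on one hypothetical JSON network message: it first unbatches the Messages array (as opcua_intermediate does) and then expands each Payload object into one row per field (as opcua_telemetry does). The sample message, including its field names and values, is an illustrative assumption loosely following the OPC UA Part 14 JSON encoding; the sketch is meant for checking parsing assumptions offline, not as part of the streaming pipeline.

```python
import json

# Hypothetical JSON network message (MessageType "ua-data"); values are made up
SAMPLE_NETWORK_MESSAGE = json.dumps({
    "MessageId": "1",
    "MessageType": "ua-data",
    "PublisherId": "publisher-1",
    "Messages": [
        {
            "DataSetWriterId": 42,
            "Timestamp": "2024-01-01T10:00:00Z",
            "Payload": {
                "Temperature": {"Value": 21.5},
                "Status": {"Value": "Running"},
            },
        },
        {
            "DataSetWriterId": 43,
            "Timestamp": "2024-01-01T10:00:01Z",
            "Payload": {"Status": {"Value": "Idle"}},
        },
    ],
})


def unbatch(network_message: str) -> list:
    """Step 1: one row per entry in the Messages array (like opcua_intermediate)."""
    doc = json.loads(network_message)
    return [
        {
            "DataSetWriterID": str(msg.get("DataSetWriterId")),
            "Timestamp": msg.get("Timestamp"),
            # Keep Payload as a raw JSON string, like the Payload STRING column
            "Payload": json.dumps(msg.get("Payload", {})),
        }
        for msg in doc.get("Messages", [])
    ]


def explode_payload(row: dict) -> list:
    """Step 2: one row per field in the Payload object (like opcua_telemetry)."""
    payload = json.loads(row["Payload"])
    return [
        {
            "DataSetWriterID": row["DataSetWriterID"],
            "Timestamp": row["Timestamp"],
            "Name": name,
            # Store every value as a string, like the Value STRING column
            "Value": str(field.get("Value")),
        }
        for name, field in payload.items()
    ]


intermediate_rows = unbatch(SAMPLE_NETWORK_MESSAGE)
telemetry_rows = [t for r in intermediate_rows for t in explode_payload(r)]
for t in telemetry_rows:
    print(t)
```

Keeping Payload as a raw JSON string in the first step matches the intermediate table's Payload STRING column, so the second step can parse it independently of the first.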
This step unbatches the Messages array inside each OPC UA PubSub network message:
from pyspark.sql.functions import from_json, explode, col, to_timestamp
from pyspark.sql.types import (
StructType, StructField, StringType, ArrayType, MapType
)
# Read new rows from the raw table
raw_df = spark.readStream.format("delta").table("opcua_raw")
# Parse the JSON payload
parsed_df = raw_df.withColumn(
"payload_json", from_json(col("payload"), "struct<Messages:array<struct<DataSetWriterId:string,Timestamp:string,Payload:string>>>")
)
# Explode the Messages array
intermediate_df = (
parsed_df
.select(explode(col("payload_json.Messages")).alias("msg"))
.select(
col("msg.DataSetWriterId").alias("DataSetWriterID"),
to_timestamp(col("msg.Timestamp")).alias("Timestamp"),
col("msg.Payload").alias("Payload")
)
)
# Write to the intermediate Delta table
(
intermediate_df.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/mnt/checkpoints/opcua_intermediate")
.toTable("opcua_intermediate")  # toTable() starts the streaming query
)
This step explodes the key-value Payload of each message into individual telemetry rows, one per published field:
from pyspark.sql.functions import from_json, explode, col
from pyspark.sql.types import MapType, StringType, StructType, StructField
# Read from intermediate table
intermediate_stream = spark.readStream.format("delta").table("opcua_intermediate")
# Parse the Payload as a map of field name -> {"Value": ...} and explode its entries
telemetry_df = (
intermediate_stream
.withColumn("payload_map", from_json(
col("Payload"),
MapType(StringType(), StructType([StructField("Value", StringType())]))
))
.select(
col("DataSetWriterID"),
col("Timestamp"),
explode(col("payload_map")).alias("Name", "val_struct")
)
.select(
col("DataSetWriterID"),
col("Timestamp"),
col("Name"),
col("val_struct.Value").alias("Value")
)
)
# Write to the final telemetry Delta table
(
telemetry_df.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/mnt/checkpoints/opcua_telemetry")
.toTable("opcua_telemetry")  # toTable() starts the streaming query
)
OPC UA PubSub metadata messages carry semantic information encoded in the MetaData.Name field, using the pattern:
<prefix>:<Workcell>.<Line>.<Area>.<Site>.<Enterprise>;nsu=<NamespaceUri>;<NodeId>
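One way to sanity-check this pattern before running the Spark job is a plain-Python sketch that matches the whole Name with a single regular expression using named groups. This is a swapped-in technique, not the per-field regexp_extract approach used in the PySpark code, and the example Name string is hypothetical.

```python
import re

# Whole-string regex for:
# <prefix>:<Workcell>.<Line>.<Area>.<Site>.<Enterprise>;nsu=<NamespaceUri>;<NodeId>
NAME_PATTERN = re.compile(
    r"^(?P<prefix>[^:]*):"
    r"(?P<Workcell>[^.]+)\.(?P<Line>[^.]+)\.(?P<Area>[^.]+)\.(?P<Site>[^.]+)\."
    r"(?P<Enterprise>[^;]+);"
    r"nsu=(?P<NamespaceUri>[^;]+);"
    r"(?P<NodeId>.+)$"
)


def parse_metadata_name(name: str) -> dict:
    """Split a metadata Name into its parts; returns {} if it does not match the pattern."""
    match = NAME_PATTERN.match(name)
    return match.groupdict() if match else {}


# Hypothetical example Name (not taken from a real site)
example = "nodeset:cell1.line2.assembly.munich.contoso;nsu=http://example.org/UA/Station/;i=6001"
print(parse_metadata_name(example))
```

Matching the whole string at once makes non-conforming names easy to spot (the function returns an empty dict), whereas regexp_extract returns an empty string for each column whose expression does not match.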
from pyspark.sql.functions import from_json, col, regexp_extract, to_timestamp
# Read from metadata raw table
metadata_raw_stream = spark.readStream.format("delta").table("opcua_metadata_raw")
# Parse the JSON payload
metadata_parsed = metadata_raw_stream.withColumn(
"p", from_json(col("payload"),
"struct<DataSetWriterId:string,Timestamp:string,"
"MetaData:struct<Name:string,"
"Fields:array<struct<Name:string,Description:string>>>>"
)
)
# Extract fields using regex on the Name pattern
metadata_df = (
metadata_parsed
.select(
col("p.DataSetWriterId").alias("DataSetWriterID"),
to_timestamp(col("p.Timestamp")).alias("Timestamp"),
col("p.MetaData.Name").alias("Name"),
col("p.MetaData.Fields")[0]["Description"].alias("Type"),
col("p.MetaData.Fields")[0]["Name"].alias("DisplayName"),
regexp_extract(col("p.MetaData.Name"), r":([^.]+)\.", 1).alias("Workcell"),
regexp_extract(col("p.MetaData.Name"), r":(?:[^.]+)\.([^.]+)\.", 1).alias("Line"),
regexp_extract(col("p.MetaData.Name"), r":(?:[^.]+)\.(?:[^.]+)\.([^.]+)\.", 1).alias("Area"),
regexp_extract(col("p.MetaData.Name"), r":(?:[^.]+)\.(?:[^.]+)\.(?:[^.]+)\.([^.]+)\.", 1).alias("Site"),
regexp_extract(col("p.MetaData.Name"), r":(?:[^.]+)\.(?:[^.]+)\.(?:[^.]+)\.(?:[^.]+)\.([^;]+);", 1).alias("Enterprise"),
regexp_extract(col("p.MetaData.Name"), r";nsu=([^;]+);", 1).alias("NamespaceUri"),
regexp_extract(col("p.MetaData.Name"), r";nsu=[^;]+;(.+)$", 1).alias("NodeId"),
)
)
# Write to the metadata Delta table
(
metadata_df.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/mnt/checkpoints/opcua_metadata")
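.toTable("opcua_metadata")  # toTable() starts the streaming query
)
Because every new metadata message is appended, the opcua_metadata table can hold several rows for the same data point. A last-known-value (LKV) view keeps only the most recent row for each Name and DataSetWriterID. In Azure Data Explorer, this is done with a materialized view using arg_max(). In Databricks, you can achieve the same result with a SQL view or a scheduled MERGE: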
CREATE OR REPLACE VIEW opcua_metadata_lkv AS
SELECT m.*
FROM opcua_metadata m
INNER JOIN (
SELECT Name, DataSetWriterID, MAX(Timestamp) AS MaxTimestamp
FROM opcua_metadata
GROUP BY Name, DataSetWriterID
) latest
ON m.Name = latest.Name
AND m.DataSetWriterID = latest.DataSetWriterID
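AND m.Timestamp = latest.MaxTimestamp;
As an alternative to the SQL view (both use the name opcua_metadata_lkv, so pick one), if you are using Delta Live Tables you can define a live table in your pipeline that recomputes the LKV on each pipeline update: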
import dlt
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
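@dlt.table(comment="Last known value for OPC UA metadata")
def opcua_metadata_lkv():
    # Rank rows per (Name, DataSetWriterID), newest first
    w = Window.partitionBy("Name", "DataSetWriterID").orderBy(col("Timestamp").desc())
    return (
        # opcua_metadata is written by the streaming job outside this pipeline,
        # so read it as a regular table rather than with dlt.read()
        spark.read.table("opcua_metadata")
        .withColumn("rn", row_number().over(w))
        .filter(col("rn") == 1)
        .drop("rn")
    )
With your data flowing into Delta Lake, you can query it using SQL or PySpark. Here is an example query that joins metadata and telemetry, equivalent to the KQL queries you would write in Azure Data Explorer: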
-- Find the status of all assembly stations in Munich in the last hour
SELECT
m.Name,
m.DisplayName,
m.Workcell,
m.Line,
t.Timestamp,
t.Value
FROM opcua_metadata_lkv m
INNER JOIN opcua_telemetry t
ON m.DataSetWriterID = t.DataSetWriterID
WHERE lower(m.Name) LIKE '%assembly%'
AND lower(m.Name) LIKE '%munich%'
AND t.Name = 'Status'
AND t.Timestamp > current_timestamp() - INTERVAL 1 HOUR;
A lesser-known option is to import entire OPC UA Information Models from the UA Cloud Library into your analytics platform. This provides richer semantics than OPC UA PubSub metadata alone can offer, including:
- Full Information Model context: not just the published data points, but the entire model hierarchy
- Complex type definitions and references to other data needed for deeper analysis
- Visibility into all available telemetry from your sites, enabling informed decisions about what to publish to the cloud
To import an Information Model, you first need its unique ID from the UA Cloud Library:
- Register for free: https://uacloudlibrary.opcfoundation.org/Identity/Account/Register
- Browse available Information Models: https://uacloudlibrary.opcfoundation.org/Explorer
- Find the unique ID via the REST API: https://uacloudlibrary.opcfoundation.org/infomodel/namespaces
- For example, the "Robotics" Information Model has the unique ID 4172981173.
In Azure Data Explorer, downloading the model is done with the http_request plugin (evaluate http_request()). In Databricks, you can use a Python notebook with the requests library:
import requests
import base64
import xml.etree.ElementTree as ET
from pyspark.sql import Row
# --- Configuration ---
CLOUD_LIBRARY_USERNAME = "<your-cloud-library-username>"
CLOUD_LIBRARY_PASSWORD = "<your-cloud-library-password>"
INFORMATION_MODEL_ID = "4172981173" # e.g., Robotics
# --- Download the Information Model ---
url = f"https://uacloudlibrary.opcfoundation.org/infomodel/download/{INFORMATION_MODEL_ID}"
credentials = base64.b64encode(
f"{CLOUD_LIBRARY_USERNAME}:{CLOUD_LIBRARY_PASSWORD}".encode()
).decode()
headers = {
"Accept": "text/plain",
"Authorization": f"Basic {credentials}"
}
response = requests.get(url, headers=headers)
response.raise_for_status()
model_data = response.json()
# --- Extract metadata ---
title = model_data.get("title", "")
contributor = model_data.get("contributor", {}).get("name", "")
nodeset_xml = model_data.get("nodeset", {}).get("nodesetXml", "")
# --- Parse the OPC UA Nodeset XML ---
root = ET.fromstring(nodeset_xml)
ns = {"ua": "http://opcfoundation.org/UA/2011/03/UANodeSet.xsd"}
rows = []
for var in root.findall(".//ua:UAVariable", ns):
    node_id = var.get("NodeId", "")
    browse_name = var.get("BrowseName", "")
    data_type = var.get("DataType", "")
    display_name_elem = var.find("ua:DisplayName", ns)
    display_name = display_name_elem.text if display_name_elem is not None and display_name_elem.text else ""
    rows.append(Row(
        Title=title,
        Contributor=contributor,
        NodeId=node_id,
        DisplayName=display_name,
        BrowseName=browse_name,
        DataType=data_type
    ))
# --- Create a DataFrame and save as a Delta table ---
if rows:
    info_model_df = spark.createDataFrame(rows)
    # mode("overwrite") replaces any previously imported model in this table
    info_model_df.write.format("delta").mode("overwrite").saveAsTable("opcua_information_model")
    print(f"Successfully imported {len(rows)} nodes from '{title}' into opcua_information_model table.")
    display(info_model_df.limit(20))
else:
    print("No UAVariable nodes found in the Information Model.")
And voilà! You have just imported the variable nodes of an entire OPC UA Information Model into a Delta Lake table in Azure Databricks, ready to be joined with your telemetry and metadata for richer analytics.
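Because the download needs credentials and network access, it can help to check the XML-parsing step on its own. The following is a minimal sketch, assuming a tiny hand-written NodeSet fragment (its nodes, NodeIds and data types are hypothetical), that applies the same ElementTree logic as the notebook code but returns plain dicts instead of Spark Rows.

```python
import xml.etree.ElementTree as ET

UA_NS = {"ua": "http://opcfoundation.org/UA/2011/03/UANodeSet.xsd"}

# Hand-written NodeSet fragment for testing only (not a real Information Model)
SAMPLE_NODESET = """<?xml version="1.0" encoding="utf-8"?>
<UANodeSet xmlns="http://opcfoundation.org/UA/2011/03/UANodeSet.xsd">
  <UAObject NodeId="ns=1;i=5001" BrowseName="1:Robot">
    <DisplayName>Robot</DisplayName>
  </UAObject>
  <UAVariable NodeId="ns=1;i=6001" BrowseName="1:Speed" DataType="Double">
    <DisplayName>Speed</DisplayName>
  </UAVariable>
  <UAVariable NodeId="ns=1;i=6002" BrowseName="1:Mode" DataType="String"/>
</UANodeSet>"""


def extract_variables(nodeset_xml: str) -> list:
    """Return one dict per UAVariable, mirroring the node columns of opcua_information_model."""
    root = ET.fromstring(nodeset_xml)
    rows = []
    for var in root.findall(".//ua:UAVariable", UA_NS):
        display_elem = var.find("ua:DisplayName", UA_NS)
        rows.append({
            "NodeId": var.get("NodeId", ""),
            # Missing or empty DisplayName elements become ""
            "DisplayName": display_elem.text if display_elem is not None and display_elem.text else "",
            "BrowseName": var.get("BrowseName", ""),
            "DataType": var.get("DataType", ""),
        })
    return rows


for row in extract_variables(SAMPLE_NODESET):
    print(row)
```

Note that only UAVariable nodes are collected: the UAObject in the fragment is skipped, just as it would be in the notebook import.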
Azure Databricks offers a flexible, scalable, and unified analytics platform for OPC UA data. With Delta Lake, Structured Streaming, and the PySpark/SQL ecosystem, you can ingest, process, contextualize, and analyze your industrial data, from the shop floor to the cloud.