|
| 1 | +# pip install streamdaq |
| 2 | + |
| 3 | +import pathway as pw |
| 4 | +from streamdaq import DaQMeasures as dqm |
| 5 | +from streamdaq import CompactData, Windows, StreamDaQ |
| 6 | + |
# Column names that make up one compact record
TIMESTAMP_COLUMN = "timestamp"  # column later handed to Stream DaQ as the time column
FIELDS_COLUMN = "fields"  # column holding the list of field names
VALUES_COLUMN = "values"  # column holding the list of readings

# Field names packed into every compact record (simulating IoT sensor measurements)
FIELDS = ["temperature", "humidity", "pressure"]
| 12 | + |
| 13 | + |
| 14 | +# We first need to define a data source sending compact data. |
| 15 | +# If you already have one, skip this part! |
class CompactDataSource(pw.io.python.ConnectorSubject):
    """
    Simulated IoT sensor network that emits records in compact format.

    A compact record packs every field name into one list and every reading
    into a second list:
        {
            "timestamp": 1,
            "fields": ["temperature", "humidity", "pressure"],
            "values": [3, 4, 5]
        }

    The equivalent traditional (native) record has one column per field:
        {"timestamp": 1, "temperature": 3, "humidity": 4, "pressure": 5}
    """

    def run(self):
        """Emit a short, deterministic sequence of compact records.

        Record number ``t`` (t = 0, 1, ...) uses ``t`` as its timestamp and
        carries the next ``len(FIELDS)`` consecutive integers as its values,
        so the readings keep counting up across records.
        """
        row_count = 5  # how many compact data rows to send in this simulation
        width = len(FIELDS)
        for tick in range(row_count):
            first_value = tick * width
            readings = list(range(first_value, first_value + width))
            # To make it more spicy, add a missing reading every five by
            # uncommenting the next line:
            # readings = [v if v % 5 > 0 else None for v in readings]
            self.next(
                **{
                    TIMESTAMP_COLUMN: tick,
                    FIELDS_COLUMN: FIELDS,
                    VALUES_COLUMN: readings,
                }
            )
| 46 | + |
| 47 | + |
# Column types of the compact records produced by the source
schema_dict = {
    TIMESTAMP_COLUMN: int,
    FIELDS_COLUMN: list[str],
    # Readings may be None, so that missing values can be represented
    VALUES_COLUMN: list[int | None],
}
schema = pw.schema_from_dict(schema_dict)

# Wire the simulated IoT sensor network into a stream with that schema
sensor_network = CompactDataSource()
compact_data_stream = pw.io.python.read(sensor_network, schema=schema)

# Show the raw compact records before any monitoring is attached
print("The initial data source sends compact data, like this:")
pw.debug.compute_and_print(compact_data_stream)
| 64 | + |
# If you already have a compact data source, your job starts here!

# Step 1: Configure Stream DaQ for compact data monitoring
# Stream DaQ performs the compact-to-native transformation itself, so none of
# the usual manual preprocessing is needed:
#   - unpacking compact rows into individual field records
#   - handling missing values and data type conversions
#   - managing temporal alignment across different fields
monitor = StreamDaQ()

# Sliding window: duration 3, hop 1, origin 0 (expressed in the units of the time column)
sliding_window = Windows.sliding(duration=3, hop=1, origin=0)

# Just describe how your compact data is structured; Stream DaQ takes care of
# all the rest! This tells it which column holds the field names, which one
# holds the values, and what type the values have.
compact_layout = (
    CompactData()
    .with_fields_column(FIELDS_COLUMN)
    .with_values_column(VALUES_COLUMN)
    .with_values_dtype(int)
)

daq = monitor.configure(
    window=sliding_window,
    source=compact_data_stream,
    time_column=TIMESTAMP_COLUMN,
    compact_data=compact_layout,
)
| 84 | + |
# Step 2: Define data quality measures for IoT sensor monitoring
# Individual fields (temperature, humidity, pressure) can be referenced directly
# even though they arrive in compact format - Stream DaQ handles the unpacking!
(
    daq.add(dqm.count("pressure"), name="readings")
    .add(
        # Total missing readings per window, summed over all three fields
        dqm.missing_count("temperature")
        + dqm.missing_count("pressure")
        + dqm.missing_count("humidity"),
        assess="<2",  # we can tolerate at most one missing reading per window
        name="missing_readings",
    )
    # Detect a stuck humidity sensor
    .add(dqm.is_frozen("humidity"), name="frozen_humidity_sensor")
)

# Complete list of Data Quality Measures (dqm):
# https://github.com/Bilpapster/stream-DaQ/blob/main/streamdaq/DaQMeasures.py


# Step 3: Kick off monitoring and let Stream DaQ do the work while you focus on what matters
daq.watch_out()
| 102 | + |
| 103 | +# IoT Compact Data Monitoring Benefits: |
| 104 | +# |
| 105 | +# 1. Bandwidth Efficiency: |
| 106 | +# - Compact format reduces network traffic by ~60% compared to individual field transmissions |
| 107 | +# - Critical for battery-powered sensors with limited connectivity |
| 108 | +# |
| 109 | +# 2. Automatic Transformation: |
| 110 | +# - Stream DaQ internally converts compact data to native format for quality analysis |
| 111 | +# - No manual preprocessing required - just specify the compact data structure |
| 112 | +# - Handles missing values, data types, and temporal alignment automatically |
| 113 | +# |
| 114 | +# 3. Real-World IoT Scenarios: |
| 115 | +# - Environmental monitoring stations (temperature, humidity, pressure) |
| 116 | +# - Industrial sensor networks (vibration, temperature, speed) |
| 117 | +# - Smart building systems (occupancy, air quality, energy usage) |
| 118 | +# - Vehicle telemetry (GPS, speed, fuel consumption, engine metrics) |
| 119 | +# |
| 120 | +# 4. Quality Monitoring Without Complexity: |
| 121 | +# - Apply the same quality measures as native data streams |
| 122 | +# - Detect sensor failures, missing readings, and data anomalies |
| 123 | +# - Monitor trends and patterns across multiple sensor types simultaneously |
| 124 | +# |
| 125 | +# Stream DaQ's compact data handling eliminates the typical IoT data preprocessing |
| 126 | +# pipeline, allowing you to focus on defining meaningful quality measures rather |
| 127 | +# than data transformation logic. This is especially valuable in resource-constrained |
| 128 | +# environments where development time and computational efficiency are critical! |
| 129 | +# |
| 130 | +# 📚 Learn More: |
| 131 | +# - Comprehensive compact data documentation: docs/examples/advanced-examples.rst |
| 132 | +# - Conceptual background: docs/concepts/compact-vs-native-data.rst |
0 commit comments