
Commit 8c257be

Added streaming reading in MS Sentinel connector
1 parent 268dba1 commit 8c257be

8 files changed: +951, -186 lines changed

README.md

Lines changed: 57 additions & 1 deletion
@@ -159,7 +159,9 @@ Supported write options:

### Reading from Microsoft Sentinel / Azure Monitor

-The data source supports batch reading logs from Azure Monitor / Log Analytics workspaces using KQL (Kusto Query Language) queries. If schema isn't specified with `.schema`, it will be inferred automatically.
+The data source supports both batch and streaming reads from Azure Monitor / Log Analytics workspaces using KQL (Kusto Query Language) queries. If schema isn't specified with `.schema`, it will be inferred automatically.
+
+#### Batch Read

Batch read usage:

@@ -233,6 +235,60 @@ query = "SecurityAlert | where TimeGenerated > ago(7d) | project TimeGenerated,
query = "MyCustomTable_CL | where TimeGenerated > ago(1h)"
```

+#### Streaming Read
+
+The data source supports streaming reads from Azure Monitor / Log Analytics. The streaming reader uses time-based offsets to track progress and splits time ranges into partitions for parallel processing.
+
+Streaming read usage:
+
+```python
+from cyber_connectors import *
+spark.dataSource.register(AzureMonitorDataSource)
+
+# Stream from a specific timestamp
+stream_options = {
+    "workspace_id": "your-workspace-id",
+    "query": "AzureActivity | project TimeGenerated, OperationName, ResourceGroup",
+    "start_time": "2024-01-01T00:00:00Z", # Start streaming from this timestamp
+    "tenant_id": tenant_id,
+    "client_id": client_id,
+    "client_secret": client_secret,
+    "checkpointLocation": "/tmp/azure-monitor-checkpoint/",
+    "partition_duration": "3600", # Optional: partition size in seconds (default 1 hour)
+}
+
+# Read stream
+stream_df = spark.readStream.format("azure-monitor") \
+    .options(**stream_options) \
+    .load()
+
+# Write to console or another sink
+query = stream_df.writeStream \
+    .format("console") \
+    .trigger(availableNow=True) \
+    .option("checkpointLocation", "/tmp/azure-monitor-checkpoint/") \
+    .start()
+
+query.awaitTermination()
+```
+
+Supported streaming read options:
+
+- `workspace_id` (string, required) - Log Analytics workspace ID
+- `query` (string, required) - KQL query to execute (should not include time filters - these are added automatically)
+- `start_time` (string, optional, default: "latest") - Start time in ISO 8601 format (e.g., "2024-01-01T00:00:00Z"). Use "latest" to start from current time
+- `partition_duration` (int, optional, default: 3600) - Duration in seconds for each partition (controls parallelism)
+- `tenant_id` (string, required) - Azure Tenant ID
+- `client_id` (string, required) - Application ID (client ID) of Azure Service Principal
+- `client_secret` (string, required) - Client Secret of Azure Service Principal
+- `checkpointLocation` (string, required) - Directory path for Spark streaming checkpoints
+
+**Important notes for streaming:**
+- The reader automatically tracks the timestamp of the last processed data in checkpoints
+- Time ranges are split into partitions based on `partition_duration` for parallel processing
+- The query should NOT include time filters (e.g., `where TimeGenerated > ago(1d)`) - the reader adds these automatically based on offsets
+- Use `start_time: "latest"` to begin streaming from the current time (useful for monitoring real-time data)
+
## Simple REST API

Right now only implements writing to arbitrary REST API - both batch & streaming. Registered data source name is `rest`.
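
To make the streaming notes above concrete, the sketch below shows roughly what the reader does for each microbatch: the offset range is cut into `partition_duration`-sized slices, and the filter-free KQL query is executed once per slice with an explicit timespan instead of an in-query `TimeGenerated` filter. This is an illustration of the described behavior, not the connector's internal code (the diff below only shows a call to a `_execute_logs_query` helper); it assumes the `azure-identity` and `azure-monitor-query` packages and uses placeholder credentials.

```python
# Illustrative only: per-microbatch time slicing as described in the streaming notes above.
# Placeholder credentials; assumes azure-identity and azure-monitor-query are installed.
from datetime import datetime, timedelta, timezone

from azure.identity import ClientSecretCredential
from azure.monitor.query import LogsQueryClient

credential = ClientSecretCredential(
    tenant_id="your-tenant-id", client_id="your-client-id", client_secret="your-client-secret"
)
client = LogsQueryClient(credential)

workspace_id = "your-workspace-id"
query = "AzureActivity | project TimeGenerated, OperationName, ResourceGroup"  # no time filter

# One microbatch covers the range between two offsets (last committed offset -> "now").
batch_start = datetime(2024, 1, 1, tzinfo=timezone.utc)
batch_end = datetime.now(timezone.utc)
partition_duration = timedelta(seconds=3600)

# Cut the range into partition-sized slices and query each slice by timespan;
# in the connector, each slice becomes one partition that Spark can read in parallel.
slice_start = batch_start
while slice_start < batch_end:
    slice_end = min(slice_start + partition_duration, batch_end)
    response = client.query_workspace(workspace_id, query, timespan=(slice_start, slice_end))
    for table in response.tables:
        print(f"{slice_start} -> {slice_end}: {len(table.rows)} rows")
    slice_start = slice_end
```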

cyber_connectors/MsSentinel.py

Lines changed: 236 additions & 7 deletions
@@ -1,11 +1,11 @@
from dataclasses import dataclass
-from datetime import datetime, date
+from datetime import date, datetime

from azure.monitor.ingestion import LogsIngestionClient
-from pyspark.errors.exceptions.base import PySparkNotImplementedError
from pyspark.sql.datasource import (
    DataSource,
    DataSourceReader,
+    DataSourceStreamReader,
    DataSourceStreamWriter,
    DataSourceWriter,
    InputPartition,
@@ -183,13 +183,13 @@ def _convert_value_to_schema_type(value, spark_type):
    elif isinstance(spark_type, (IntegerType, LongType)):
        if isinstance(value, bool):
            # Don't convert bool to int (bool is subclass of int in Python)
-            raise ValueError(f"Cannot convert boolean to integer")
+            raise ValueError("Cannot convert boolean to integer")
        return int(value)

    # Float types
    elif isinstance(spark_type, (FloatType, DoubleType)):
        if isinstance(value, bool):
-            raise ValueError(f"Cannot convert boolean to float")
+            raise ValueError("Cannot convert boolean to float")
        return float(value)

    # Timestamp type
@@ -264,13 +264,11 @@ def schema(self):
            StructType: The schema of the data

        """
-
        infer_schema = self.options.get("inferSchema", "true").lower() == "true"
        if infer_schema:
            return self._infer_read_schema()
        else:
            raise Exception("Must provide schema if inferSchema is false")
-

    def _infer_read_schema(self):
        """Infer schema by executing a sample query with limit 1.
@@ -284,13 +282,13 @@ def _infer_read_schema(self):
        """
        from pyspark.sql.types import (
            BooleanType,
+            DateType,
            DoubleType,
            LongType,
            StringType,
            StructField,
            StructType,
            TimestampType,
-            DateType,
        )

        # Get read options
@@ -381,6 +379,9 @@ def _infer_read_schema(self):
    def reader(self, schema: StructType):
        return AzureMonitorBatchReader(self.options, schema)

+    def streamReader(self, schema: StructType):
+        return AzureMonitorStreamReader(self.options, schema)
+
    def streamWriter(self, schema: StructType, overwrite: bool):
        return AzureMonitorStreamWriter(self.options)

@@ -521,6 +522,234 @@ def read(self, partition: TimeRangePartition):
                yield Row(**row_dict)


+class AzureMonitorOffset:
+    """Represents the offset for Azure Monitor streaming.
+
+    The offset tracks the timestamp of the last processed data to enable incremental streaming.
+    """
+
+    def __init__(self, timestamp: str):
+        """Initialize offset with ISO 8601 timestamp.
+
+        Args:
+            timestamp: ISO 8601 formatted timestamp string (e.g., "2024-01-01T00:00:00Z")
+
+        """
+        self.timestamp = timestamp
+
+    def json(self):
+        """Serialize offset to JSON string.
+
+        Returns:
+            JSON string representation of the offset
+
+        """
+        import json
+
+        return json.dumps({"timestamp": self.timestamp})
+
+    @staticmethod
+    def from_json(json_str: str):
+        """Deserialize offset from JSON string.
+
+        Args:
+            json_str: JSON string containing offset data
+
+        Returns:
+            AzureMonitorOffset instance
+
+        """
+        import json
+
+        data = json.loads(json_str)
+        return AzureMonitorOffset(data["timestamp"])
+
+
+class AzureMonitorStreamReader(DataSourceStreamReader):
+    """Stream reader for Azure Monitor / Log Analytics workspaces.
+
+    Implements incremental streaming by tracking time-based offsets and splitting
+    time ranges into partitions for parallel processing.
+    """
+
+    def __init__(self, options, schema: StructType):
+        """Initialize the stream reader with options and schema.
+
+        Args:
+            options: Dictionary of options containing workspace_id, query, start_time, credentials
+            schema: StructType schema (provided by DataSource.schema())
+
+        """
+        # Extract and validate required options
+        self.workspace_id = options.get("workspace_id")
+        self.query = options.get("query")
+        self.tenant_id = options.get("tenant_id")
+        self.client_id = options.get("client_id")
+        self.client_secret = options.get("client_secret")
+
+        # Stream-specific options
+        start_time = options.get("start_time", "latest")
+        # Support 'latest' as alias for current timestamp
+        if start_time == "latest":
+            from datetime import datetime, timezone
+
+            self.start_time = datetime.now(timezone.utc).isoformat()
+        else:
+            # Validate that start_time is a valid ISO 8601 timestamp
+            from datetime import datetime
+
+            try:
+                datetime.fromisoformat(start_time.replace("Z", "+00:00"))
+                self.start_time = start_time
+            except (ValueError, AttributeError) as e:
+                raise ValueError(
+                    f"Invalid start_time format: {start_time}. Expected ISO 8601 format (e.g., '2024-01-01T00:00:00Z')"
+                ) from e
+
+        # Partition duration in seconds (default 1 hour)
+        self.partition_duration = int(options.get("partition_duration", "3600"))
+
+        # Validate required options
+        assert self.workspace_id is not None, "workspace_id is required"
+        assert self.query is not None, "query is required"
+        assert self.tenant_id is not None, "tenant_id is required"
+        assert self.client_id is not None, "client_id is required"
+        assert self.client_secret is not None, "client_secret is required"
+
+        self._schema = schema
+
+    def initialOffset(self):
+        """Return the initial offset (start time).
+
+        Returns:
+            JSON string representation of AzureMonitorOffset with the configured start time
+
+        """
+        return AzureMonitorOffset(self.start_time).json()
+
+    def latestOffset(self):
+        """Return the latest offset (current time).
+
+        Returns:
+            JSON string representation of AzureMonitorOffset with the current UTC timestamp
+
+        """
+        from datetime import datetime, timezone
+
+        current_time = datetime.now(timezone.utc).isoformat()
+        return AzureMonitorOffset(current_time).json()
+
+    def partitions(self, start, end):
+        """Create partitions for the time range between start and end offsets.
+
+        Splits the time range into fixed-duration partitions based on partition_duration.
+
+        Args:
+            start: JSON string representing AzureMonitorOffset for the start of the range
+            end: JSON string representing AzureMonitorOffset for the end of the range
+
+        Returns:
+            List of TimeRangePartition objects
+
+        """
+        from datetime import datetime, timedelta
+
+        # Deserialize JSON strings to offset objects
+        start_offset = AzureMonitorOffset.from_json(start)
+        end_offset = AzureMonitorOffset.from_json(end)
+
+        # Parse timestamps
+        start_time = datetime.fromisoformat(start_offset.timestamp.replace("Z", "+00:00"))
+        end_time = datetime.fromisoformat(end_offset.timestamp.replace("Z", "+00:00"))
+
+        # Calculate total duration
+        total_duration = (end_time - start_time).total_seconds()
+
+        # If total duration is less than partition_duration, create a single partition
+        if total_duration <= self.partition_duration:
+            return [TimeRangePartition(start_time, end_time)]
+
+        # Split into fixed-duration partitions
+        partitions = []
+        current_start = start_time
+        partition_delta = timedelta(seconds=self.partition_duration)
+
+        while current_start < end_time:
+            current_end = min(current_start + partition_delta, end_time)
+            partitions.append(TimeRangePartition(current_start, current_end))
+            # Next partition starts 1 microsecond after current partition ends to avoid overlap
+            current_start = current_end + timedelta(microseconds=1)
+
+        return partitions
+
+    def read(self, partition: TimeRangePartition):
+        """Read data for the given partition time range.
+
+        Args:
+            partition: TimeRangePartition containing start_time and end_time
+
+        Yields:
+            Row objects from the query results
+
+        """
+        # Import inside method for partition-level execution
+        from pyspark.sql import Row
+
+        # Use partition's time range
+        timespan_value = (partition.start_time, partition.end_time)
+
+        # Execute query using module-level function
+        response = _execute_logs_query(
+            workspace_id=self.workspace_id,
+            query=self.query,
+            timespan=timespan_value,
+            tenant_id=self.tenant_id,
+            client_id=self.client_id,
+            client_secret=self.client_secret,
+        )
+
+        # Create a mapping of column names to their expected types from schema
+        schema_field_map = {field.name: field.dataType for field in self._schema.fields}
+
+        # Process all tables in response (reuse same logic as batch reader)
+        for table in response.tables:
+            # Convert Azure Monitor rows to Spark Rows
+            for row_idx, row_data in enumerate(table.rows):
+                row_dict = {}
+
+                # First, process columns from the query results
+                for i, col in enumerate(table.columns):
+                    # Handle both string columns (real API) and objects with .name attribute (test mocks)
+                    column_name = str(col) if isinstance(col, str) else str(col.name)
+                    raw_value = row_data[i]
+
+                    # If column is in schema, convert to expected type
+                    if column_name in schema_field_map:
+                        expected_type = schema_field_map[column_name]
+                        try:
+                            converted_value = _convert_value_to_schema_type(raw_value, expected_type)
+                            row_dict[column_name] = converted_value
+                        except ValueError as e:
+                            raise ValueError(f"Row {row_idx}, column '{column_name}': {e}")
+
+                # Second, add NULL values for schema columns that are not in query results
+                for schema_column_name in schema_field_map.keys():
+                    if schema_column_name not in row_dict:
+                        row_dict[schema_column_name] = None
+
+                yield Row(**row_dict)
+
+    def commit(self, end):
+        """Called when a batch is successfully processed.
+
+        Args:
+            end: AzureMonitorOffset representing the end of the committed batch
+
+        """
+        # Nothing special needed - Spark handles checkpointing
+        pass
+
+
# https://learn.microsoft.com/en-us/python/api/overview/azure/monitor-ingestion-readme?view=azure-python
class AzureMonitorWriter:
    def __init__(self, options):
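
As a quick sanity check of the offset and partitioning logic added above, the new classes can be exercised directly without a Spark session and without touching Azure, since `partitions()` only parses offsets and slices the time range. A minimal sketch, assuming the classes are importable from `cyber_connectors.MsSentinel` and that dummy credential strings are acceptable for construction:

```python
# Sketch: exercising the new stream reader's offset/partition logic directly.
# Assumes the import path cyber_connectors.MsSentinel; no Azure calls are made,
# because partitions() only deserializes offsets and splits the time range.
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

from cyber_connectors.MsSentinel import AzureMonitorOffset, AzureMonitorStreamReader

options = {
    "workspace_id": "dummy-workspace",
    "query": "AzureActivity | project TimeGenerated, OperationName",
    "start_time": "2024-01-01T00:00:00Z",
    "partition_duration": "3600",  # 1-hour partitions
    "tenant_id": "dummy-tenant",
    "client_id": "dummy-client",
    "client_secret": "dummy-secret",
}
schema = StructType(
    [
        StructField("TimeGenerated", TimestampType()),
        StructField("OperationName", StringType()),
    ]
)

reader = AzureMonitorStreamReader(options, schema)

# initialOffset() serializes the configured start_time as a JSON offset.
start = reader.initialOffset()  # '{"timestamp": "2024-01-01T00:00:00Z"}'
end = AzureMonitorOffset("2024-01-01T02:30:00Z").json()

# A 2.5-hour range with 1-hour partitions yields three TimeRangePartition objects;
# each next partition starts 1 microsecond after the previous one ends.
for p in reader.partitions(start, end):
    print(p.start_time, "->", p.end_time)
```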
