Commit 92acca8

Support batch reading from Log Analytics (#107)
* Support batch reading from Log Analytics. Resolves #12
* fix poetry.lock
1 parent c6c0e51 commit 92acca8

8 files changed: +1762 / -22 lines


README.md

Lines changed: 104 additions & 12 deletions
@@ -5,12 +5,16 @@
Based on [PySpark DataSource API](https://spark.apache.org/docs/preview/api/python/user_guide/sql/python_data_source.html) available with Spark 4 & [DBR 15.3+](https://docs.databricks.com/en/pyspark/datasources.html).

- [Custom data sources/sinks for Cybersecurity-related work](#custom-data-sourcessinks-for-cybersecurity-related-work)
- [Available data sources](#available-data-sources)
- [Splunk data source](#splunk-data-source)
- [Microsoft Sentinel / Azure Monitor](#microsoft-sentinel--azure-monitor)
- [Authentication Requirements](#authentication-requirements)
- [Writing to Microsoft Sentinel / Azure Monitor](#writing-to-microsoft-sentinel--azure-monitor)
- [Reading from Microsoft Sentinel / Azure Monitor](#reading-from-microsoft-sentinel--azure-monitor)
- [Simple REST API](#simple-rest-api)
- [Building](#building)
- [References](#references)

# Available data sources
@@ -77,13 +81,26 @@ Supported options:

## Microsoft Sentinel / Azure Monitor

This data source supports both reading from and writing to [Microsoft Sentinel](https://learn.microsoft.com/en-us/azure/sentinel/overview/) / [Azure Monitor Log Analytics](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/log-analytics-overview). Registered data source names are `ms-sentinel` and `azure-monitor`.

### Authentication Requirements

This connector uses Azure Service Principal Client ID/Secret for authentication; one way to pass the credentials is sketched after the permission list below.

The service principal needs the following permissions:

- For reading: **Log Analytics Reader** role on the Log Analytics workspace
- For writing: **Monitoring Metrics Publisher** role on the DCE and DCR
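
A minimal sketch of supplying these credentials on Databricks without hard-coding them, assuming a hypothetical secret scope named `sentinel` (scope and key names are placeholders):

```python
# Hypothetical secret scope/keys; dbutils is available only in Databricks notebooks.
tenant_id = dbutils.secrets.get(scope="sentinel", key="tenant-id")
client_id = dbutils.secrets.get(scope="sentinel", key="client-id")
client_secret = dbutils.secrets.get(scope="sentinel", key="client-secret")

# The same three values feed both the read and the write options shown below.
auth_options = {
    "tenant_id": tenant_id,
    "client_id": client_id,
    "client_secret": client_secret,
}
```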

### Writing to Microsoft Sentinel / Azure Monitor

The integration uses the [Logs Ingestion API of Azure Monitor](https://learn.microsoft.com/en-us/azure/sentinel/create-custom-connector#connect-with-the-log-ingestion-api) for writing data.

To push data you need to create a Data Collection Endpoint (DCE), a Data Collection Rule (DCR), and a custom table in the Log Analytics workspace. See the [documentation](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview) for a description of this process. The structure of the data in the DataFrame should match the structure of the defined custom table.
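
A minimal sketch of a DataFrame whose schema mirrors a custom table; the column names used here (`TimeGenerated`, `SourceIp`, `EventMessage`) are assumptions, so align them with your own DCR stream and table definition:

```python
from datetime import datetime, timezone
from pyspark.sql.types import StructField, StructType, StringType, TimestampType

# Hypothetical columns; they must match the custom table defined in Log Analytics.
events_schema = StructType([
    StructField("TimeGenerated", TimestampType(), False),
    StructField("SourceIp", StringType(), True),
    StructField("EventMessage", StringType(), True),
])

df = spark.createDataFrame(
    [(datetime.now(timezone.utc), "10.0.0.1", "test event")],
    schema=events_schema,
)
```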

You need to grant the correct permissions (`Monitoring Metrics Publisher`) to the service principal on the DCE and DCR.

Batch write usage:

```python
from cyber_connectors import *
@@ -105,7 +122,7 @@ df.write.format("ms-sentinel") \
    .save()
```

Streaming write usage:

```python
from cyber_connectors import *
@@ -130,7 +147,7 @@ stream = sdf.writeStream.format("ms-sentinel") \
    .options(**sentinel_stream_options).start()
```

Supported write options:

- `dce` (string, required) - URL of the Data Collection Endpoint.
- `dcr_id` (string, required) - ID of Data Collection Rule.
@@ -140,6 +157,81 @@ Supported options:
- `client_secret` (string, required) - Client Secret of Azure Service Principal.
- `batch_size` (int, optional, default: 50) - the size of the buffer to collect payload before sending to MS Sentinel.

### Reading from Microsoft Sentinel / Azure Monitor

The data source supports batch reading of logs from Azure Monitor / Log Analytics workspaces using KQL (Kusto Query Language) queries. If a schema isn't specified with `.schema()`, it is inferred automatically.

Batch read usage:

```python
from cyber_connectors import *
spark.dataSource.register(AzureMonitorDataSource)

# Option 1: Using timespan (ISO 8601 duration)
read_options = {
    "workspace_id": "your-workspace-id",
    "query": "AzureActivity | where TimeGenerated > ago(1d) | take 100",
    "timespan": "P1D",  # ISO 8601 duration: 1 day
    "tenant_id": tenant_id,
    "client_id": client_id,
    "client_secret": client_secret,
}

# Option 2: Using start_time and end_time (ISO 8601 timestamps)
read_options = {
    "workspace_id": "your-workspace-id",
    "query": "AzureActivity | take 100",
    "start_time": "2024-01-01T00:00:00Z",
    "end_time": "2024-01-02T00:00:00Z",
    "tenant_id": tenant_id,
    "client_id": client_id,
    "client_secret": client_secret,
}

# Option 3: Using only start_time (end_time defaults to current time)
read_options = {
    "workspace_id": "your-workspace-id",
    "query": "AzureActivity | take 100",
    "start_time": "2024-01-01T00:00:00Z",  # Query from start_time to now
    "tenant_id": tenant_id,
    "client_id": client_id,
    "client_secret": client_secret,
}

df = spark.read.format("azure-monitor") \
    .options(**read_options) \
    .load()

df.show()
```
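
A minimal sketch of supplying an explicit schema with `.schema()` instead of relying on inference; the column list is an assumption and has to match what the KQL query actually returns:

```python
# Assumes read_options as defined above, with a query that projects exactly these columns.
explicit_schema = "TimeGenerated TIMESTAMP, OperationName STRING, ResourceGroup STRING"

df = (
    spark.read.format("azure-monitor")
    .schema(explicit_schema)
    .options(**read_options)
    .load()
)
```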

Supported read options:

- `workspace_id` (string, required) - Log Analytics workspace ID
- `query` (string, required) - KQL query to execute
- **Time range options (choose one approach):**
  - `timespan` (string) - Time range in ISO 8601 duration format (e.g., "P1D" = 1 day, "PT1H" = 1 hour, "P7D" = 7 days)
  - `start_time` (string) - Start time in ISO 8601 format (e.g., "2024-01-01T00:00:00Z"). If provided without `end_time`, queries from `start_time` to the current time
  - `end_time` (string, optional) - End time in ISO 8601 format. Only valid when `start_time` is specified
  - **Note**: `timespan` and `start_time`/`end_time` are mutually exclusive - choose one approach
- `tenant_id` (string, required) - Azure Tenant ID
- `client_id` (string, required) - Application ID (client ID) of Azure Service Principal
- `client_secret` (string, required) - Client Secret of Azure Service Principal
- `num_partitions` (int, optional, default: 1) - Number of partitions for reading data (see the sketch below)
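
A minimal sketch using the `num_partitions` option from the list above to split a week-long read across several Spark partitions; how the connector divides the work between partitions is up to its implementation:

```python
# Same authentication values as in the earlier examples.
read_options_week = {
    "workspace_id": "your-workspace-id",
    "query": "AzureActivity",
    "timespan": "P7D",       # ISO 8601 duration: 7 days
    "num_partitions": "4",   # passed to Spark as a string, like the other options
    "tenant_id": tenant_id,
    "client_id": client_id,
    "client_secret": client_secret,
}

df = spark.read.format("azure-monitor").options(**read_options_week).load()
```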

**KQL Query Examples:**

```python
# Get recent Azure Activity logs
query = "AzureActivity | where TimeGenerated > ago(24h) | project TimeGenerated, OperationName, ResourceGroup"

# Get security alerts
query = "SecurityAlert | where TimeGenerated > ago(7d) | project TimeGenerated, AlertName, Severity"

# Custom table query
query = "MyCustomTable_CL | where TimeGenerated > ago(1h)"
```

## Simple REST API

Right now only implements writing to an arbitrary REST API - both batch & streaming. Registered data source name is `rest`.
