Skip to content

Commit e0626b9

Browse files
zmoogcursoragentKavindu-Dodan
authored
feat: generate Azure activity logs compatible with azurelogs translator (#121)
* Generate valid activity logs * fix: remove unused vars and funcs left over from activity log refactor Removes azureActions, azureProtocols, azureDirections vars and randomAzureAction, randomAzureStorageAccount, randomBlobPath, randomHTTPStatusCode funcs that are no longer referenced after the generateAzureProperties rewrite. Co-authored-by: Cursor <cursoragent@cursor.com> * Fix imports * Flush batch when full * fix review suggestionis & improve Send logic Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co> --------- Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co> Co-authored-by: Cursor <cursoragent@cursor.com> Co-authored-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>
1 parent 1abbff3 commit e0626b9

3 files changed

Lines changed: 380 additions & 80 deletions

File tree

exporters/internal/eventhub.go

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package internal
22

33
import (
4+
"bytes"
45
"context"
6+
"errors"
57
"fmt"
68
"os"
79

@@ -73,24 +75,77 @@ func NewEventHubExporter(ctx context.Context, c *conf.Config) (*EventHubExporter
7375
}, nil
7476
}
7577

78+
// Send delivers data to Event Hubs. The generator accumulates multiple
79+
// {"records": [...]} objects separated by newlines (NDJSON). Each line is
80+
// sent as its own EventData so that no individual message exceeds the 1 MB
81+
// Event Hubs limit. Lines are packed into batches; when a batch is full the
82+
// SDK returns ErrEventDataTooLarge, at which point the batch is flushed and a
83+
// new one is started for the remaining lines.
7684
func (e *EventHubExporter) Send(data *[]byte) error {
77-
batch, err := e.producer.NewEventDataBatch(context.Background(), nil)
85+
ctx := context.Background()
86+
lines := bytes.Split(bytes.TrimRight(*data, "\n"), []byte("\n"))
87+
88+
batch, err := e.producer.NewEventDataBatch(ctx, nil)
7889
if err != nil {
7990
return fmt.Errorf("failed to create event batch: %w", err)
8091
}
8192

82-
event := &azeventhubs.EventData{
83-
Body: *data,
93+
for _, line := range lines {
94+
if len(line) == 0 {
95+
continue
96+
}
97+
98+
batch, err = e.addEventToBatch(ctx, batch, line)
99+
if err != nil {
100+
return err
101+
}
84102
}
85103

86-
err = batch.AddEventData(event, nil)
87-
if err != nil {
88-
return fmt.Errorf("failed to add event to batch: %w", err)
104+
return e.flushBatch(ctx, batch)
105+
}
106+
107+
// addEventToBatch adds a line to the batch, flushing and creating a new batch if needed.
108+
func (e *EventHubExporter) addEventToBatch(ctx context.Context, batch *azeventhubs.EventDataBatch, line []byte) (*azeventhubs.EventDataBatch, error) {
109+
err := batch.AddEventData(&azeventhubs.EventData{Body: line}, nil)
110+
if err == nil {
111+
return batch, nil
112+
}
113+
114+
if !errors.Is(err, azeventhubs.ErrEventDataTooLarge) {
115+
return nil, fmt.Errorf("failed to add event to batch: %w", err)
116+
}
117+
118+
// Single record exceeds limit — cannot proceed
119+
if batch.NumEvents() == 0 {
120+
return nil, fmt.Errorf("single record (%d bytes) exceeds Event Hubs message size limit", len(line))
121+
}
122+
123+
// Flush current batch and create a new one
124+
if err = e.producer.SendEventDataBatch(ctx, batch, nil); err != nil {
125+
return nil, fmt.Errorf("failed to send event batch to %s: %w", e.cfg.EventHubName, err)
89126
}
90127

91-
err = e.producer.SendEventDataBatch(context.Background(), batch, nil)
128+
newBatch, err := e.producer.NewEventDataBatch(ctx, nil)
92129
if err != nil {
93-
return fmt.Errorf("failed to send event to event hub %s: %w", e.cfg.EventHubName, err)
130+
return nil, fmt.Errorf("failed to create event batch: %w", err)
131+
}
132+
133+
// Retry the line in the fresh batch
134+
if err = newBatch.AddEventData(&azeventhubs.EventData{Body: line}, nil); err != nil {
135+
return nil, fmt.Errorf("failed to add event to fresh batch: %w", err)
136+
}
137+
138+
return newBatch, nil
139+
}
140+
141+
// flushBatch sends any remaining events in the batch.
142+
func (e *EventHubExporter) flushBatch(ctx context.Context, batch *azeventhubs.EventDataBatch) error {
143+
if batch.NumEvents() == 0 {
144+
return nil
145+
}
146+
147+
if err := e.producer.SendEventDataBatch(ctx, batch, nil); err != nil {
148+
return fmt.Errorf("failed to send event batch to %s: %w", e.cfg.EventHubName, err)
94149
}
95150

96151
return nil

generators/internal/azure_resource_log.go

Lines changed: 178 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package internal
22

33
import (
44
"encoding/json"
5+
"fmt"
6+
"math/rand"
57
"os"
68
"strconv"
79
"time"
@@ -105,10 +107,22 @@ type azureIdentity struct {
105107
}
106108

107109
// azureAuthorization contains authorization details for the Azure operation.
110+
// The Evidence field follows the translator's schema
111+
// (https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/activity-log-schema).
108112
type azureAuthorization struct {
109-
Scope string `json:"scope,omitempty"`
110-
Action string `json:"action,omitempty"`
111-
Role string `json:"role,omitempty"`
113+
Scope string `json:"scope,omitempty"`
114+
Action string `json:"action,omitempty"`
115+
Evidence *azureEvidence `json:"evidence,omitempty"`
116+
}
117+
118+
// azureEvidence holds role-assignment details inside an authorization block.
119+
type azureEvidence struct {
120+
Role string `json:"role,omitempty"`
121+
RoleAssignmentScope string `json:"roleAssignmentScope,omitempty"`
122+
RoleAssignmentID string `json:"roleAssignmentId,omitempty"`
123+
RoleDefinitionID string `json:"roleDefinitionId,omitempty"`
124+
PrincipalID string `json:"principalId,omitempty"`
125+
PrincipalType string `json:"principalType,omitempty"`
112126
}
113127

114128
func buildAzureResourceLog() azureResourceLog {
@@ -135,20 +149,40 @@ func buildAzureResourceLog() azureResourceLog {
135149
log.ResultDescription = randomAzureErrorDescription()
136150
}
137151

138-
// Add identity for certain operation types
139-
if shouldHaveIdentity(operationName) {
152+
// Administrative and Policy logs carry a full authorization+claims identity;
153+
// other categories use only a minimal claims block.
154+
if shouldHaveFullIdentity(category) {
155+
subscriptionID := randomAzureGUID()
156+
157+
now := time.Now().Unix()
158+
140159
log.Identity = &azureIdentity{
141160
Authorization: &azureAuthorization{
142161
Scope: randomAzureResourceID(),
143-
Action: randomAzureAction(),
144-
Role: randomAzureRole(),
162+
Action: operationName,
163+
Evidence: &azureEvidence{
164+
Role: randomAzureRole(),
165+
RoleAssignmentScope: "/subscriptions/" + subscriptionID,
166+
RoleAssignmentID: randomAZaz09String(32),
167+
RoleDefinitionID: randomAZaz09String(32),
168+
PrincipalID: randomAZaz09String(32),
169+
PrincipalType: "User",
170+
},
145171
},
146172
Claims: map[string]string{
147-
"aud": "https://management.azure.com/",
148-
"iss": "https://sts.windows.net/" + randomAzureGUID() + "/",
149-
"iat": "1234567890",
150-
"nbf": "1234567890",
151-
"exp": "1234567890",
173+
"aud": "https://management.core.windows.net/",
174+
"iss": "https://sts.windows.net/" + randomAzureGUID() + "/",
175+
"iat": strconv.FormatInt(now-int64(rand.Intn(300)), 10), // issued up to 5 min ago
176+
"nbf": strconv.FormatInt(now, 10), // not valid before now
177+
"exp": strconv.FormatInt(now+int64(3600), 10), // expires in 1 hour
178+
"name": randomAzureUserName(),
179+
"ipaddr": randomIP(),
180+
},
181+
}
182+
} else {
183+
log.Identity = &azureIdentity{
184+
Claims: map[string]string{
185+
"http://schemas.xmlsoap.org/ws/2005/05/identity/claims/spn": randomAzureServicePrincipal(category),
152186
},
153187
}
154188
}
@@ -159,44 +193,151 @@ func buildAzureResourceLog() azureResourceLog {
159193
return log
160194
}
161195

162-
func shouldHaveIdentity(operationName string) bool {
163-
// Add identity for write operations
164-
return operationName == "Microsoft.Storage/storageAccounts/write" ||
165-
operationName == "Microsoft.Compute/virtualMachines/write" ||
166-
operationName == "Microsoft.Network/networkSecurityGroups/securityRules/write" ||
167-
operationName == "Microsoft.KeyVault/vaults/secrets/write"
196+
// shouldHaveFullIdentity returns true for categories that carry a full
197+
// identity object (authorization + claims) in real Azure activity logs.
198+
func shouldHaveFullIdentity(category string) bool {
199+
return category == "Administrative" || category == "Policy"
168200
}
169201

202+
// generateAzureProperties returns a properties map whose keys match the
203+
// schemas expected by the opentelemetry-collector-contrib azurelogs translator.
204+
// See: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/translator/azurelogs
170205
func generateAzureProperties(category, operationName string) map[string]interface{} {
171206
props := make(map[string]interface{})
172207

173208
switch category {
174209
case "Administrative":
210+
// administrativeLogProperties: entity, message, hierarchy (+ eventCategory)
211+
resourceID := randomAzureResourceID()
212+
tenantID := randomAzureGUID()
213+
subscriptionID := randomAzureGUID()
175214
props["eventCategory"] = "Administrative"
176-
props["eventDataId"] = randomAzureGUID()
177-
props["operationId"] = randomAzureGUID()
178-
props["httpRequest"] = map[string]interface{}{
179-
"clientRequestId": randomAzureGUID(),
180-
"clientIpAddress": randomIP(),
181-
"method": randomHTTPMethod(),
182-
}
215+
props["entity"] = resourceID
216+
props["message"] = operationName
217+
props["hierarchy"] = tenantID + "/" + subscriptionID
218+
183219
case "Security":
184-
props["securityEventType"] = randomAzureSecurityEventType()
185-
props["protocol"] = randomAzureProtocol()
186-
props["direction"] = randomAzureDirection()
220+
// securityLogProperties: accountLogonId, commandLine, domainName,
221+
// parentProcess, parentProcessid, processId, processName,
222+
// userName, UserSID, ActionTaken, Severity
223+
processName := fmt.Sprintf("c:\\windows\\system32\\%s.exe", randomAZaz09String(6))
224+
props["eventCategory"] = "Security"
225+
props["accountLogonId"] = fmt.Sprintf("0x%s", randomAZaz09String(4))
226+
props["commandLine"] = processName
227+
props["domainName"] = randomAZaz09String(6)
228+
props["parentProcess"] = "explorer.exe"
229+
props["parentProcess id"] = fmt.Sprintf("%d", rand.Intn(9000)+1000)
230+
props["processId"] = fmt.Sprintf("%d", rand.Intn(9000)+1000)
231+
props["processName"] = processName
232+
props["userName"] = fmt.Sprintf("user%s", randomAZaz09String(4))
233+
props["UserSID"] = fmt.Sprintf("S-1-5-21-%d-%d-%d", rand.Intn(9999999), rand.Intn(9999999), rand.Intn(9999999))
234+
props["ActionTaken"] = randomAzureSecurityActionTaken()
235+
props["Severity"] = randomAzureSecuritySeverity()
236+
187237
case "ServiceHealth":
188-
props["eventType"] = "ServiceIssue"
189-
props["trackingId"] = randomAzureGUID()
238+
// serviceHealthLogProperties: title, service, region, communication,
239+
// communicationId, incidentType, trackingId, impactStartTime,
240+
// impactMitigationTime, impactedServices
241+
service := randomAzureServiceName()
242+
region := randomAzureRegion()
243+
trackingID := randomAZaz09String(8)
244+
impactStart := time.Now().UTC().Add(-time.Duration(rand.Intn(72)) * time.Hour)
245+
props["title"] = fmt.Sprintf("Service issue with %s in %s", service, region)
246+
props["service"] = service
247+
props["region"] = region
248+
props["communication"] = fmt.Sprintf("We are aware of an issue with %s in %s and are actively investigating.", service, region)
249+
props["communicationId"] = randomAZaz09String(12)
190250
props["incidentType"] = randomAzureIncidentType()
251+
props["trackingId"] = trackingID
252+
props["impactStartTime"] = impactStart.Format(time.RFC3339Nano)
253+
props["impactMitigationTime"] = impactStart.Add(time.Duration(rand.Intn(24)+1) * time.Hour).Format(time.RFC3339Nano)
254+
props["impactedServices"] = fmt.Sprintf(
255+
`[{"ImpactedRegions":[{"RegionName":"%s"}],"ServiceName":"%s"}]`,
256+
region, service,
257+
)
258+
props["stage"] = "Active"
259+
props["isHIR"] = false
260+
props["IsSynthetic"] = "False"
261+
191262
case "ResourceHealth":
192-
props["currentHealthStatus"] = randomAzureHealthStatus()
193-
props["previousHealthStatus"] = randomAzureHealthStatus()
263+
// resourceHealthLogProperties: title, details, currentHealthStatus,
264+
// previousHealthStatus, type, cause
265+
currentStatus := randomAzureHealthStatus()
266+
previousStatus := randomAzureHealthStatus()
267+
props["title"] = currentStatus
268+
props["details"] = fmt.Sprintf("Resource transitioned from %s to %s", previousStatus, currentStatus)
269+
props["currentHealthStatus"] = currentStatus
270+
props["previousHealthStatus"] = previousStatus
271+
props["type"] = randomAzureHealthType()
194272
props["cause"] = randomAzureHealthCause()
195-
case "StorageRead", "StorageWrite", "StorageDelete":
196-
props["requestUrl"] = "https://" + randomAzureStorageAccount() + ".blob.core.windows.net/" + randomBlobPath()
197-
props["userAgentHeader"] = randomUserAgent()
198-
props["statusCode"] = randomHTTPStatusCode()
199-
props["serverLatencyMs"] = randomDurationMs()
273+
274+
case "Alert":
275+
// alertLogProperties: RuleUri, RuleName, RuleDescription, Threshold,
276+
// WindowSizeInMinutes, Aggregation, Operator, MetricName, MetricUnit
277+
ruleName := fmt.Sprintf("alert-%s", randomAZaz09String(6))
278+
resourceID := randomAzureResourceID()
279+
props["RuleUri"] = resourceID + "/providers/microsoft.insights/alertrules/" + ruleName
280+
props["RuleName"] = ruleName
281+
props["RuleDescription"] = fmt.Sprintf("Alert rule for %s", randomAzureAlertMetricName())
282+
props["Threshold"] = fmt.Sprintf("%d", rand.Intn(99000)+1000)
283+
props["WindowSizeInMinutes"] = fmt.Sprintf("%d", []int{5, 10, 15, 30, 60}[rand.Intn(5)])
284+
props["Aggregation"] = randomAzureAlertAggregation()
285+
props["Operator"] = randomAzureAlertOperator()
286+
props["MetricName"] = randomAzureAlertMetricName()
287+
props["MetricUnit"] = "Count"
288+
289+
case "Recommendation":
290+
// recommendationLogProperties: recommendationSchemaVersion,
291+
// recommendationCategory, recommendationImpact, recommendationName,
292+
// recommendationResourceLink, recommendationType
293+
recommendationType := randomAzureRecommendationType()
294+
resourceID := randomAzureResourceID()
295+
props["recommendationSchemaVersion"] = "1.0"
296+
props["recommendationCategory"] = randomAzureRecommendationCategory()
297+
props["recommendationImpact"] = randomAzureRecommendationImpact()
298+
props["recommendationName"] = randomAzureRecommendationName()
299+
props["recommendationResourceLink"] = fmt.Sprintf(
300+
"https://portal.azure.com/#blade/Microsoft_Azure_Expert/RecommendationListBlade/recommendationTypeId/%s/resourceId/%s",
301+
recommendationType, resourceID,
302+
)
303+
props["recommendationType"] = recommendationType
304+
305+
case "Policy":
306+
// policyLogProperties: isComplianceCheck, resourceLocation, ancestors,
307+
// policies (JSON string), eventCategory, entity, message, hierarchy
308+
subscriptionID := randomAzureGUID()
309+
resourceID := randomAzureResourceID()
310+
policyDefID := randomAzurePolicyDefinitionID()
311+
policyJSON := fmt.Sprintf(
312+
`[{"policyDefinitionId":"%s","policyDefinitionEffect":"AuditIfNotExists","policyAssignmentScope":"/subscriptions/%s"}]`,
313+
policyDefID, subscriptionID,
314+
)
315+
props["isComplianceCheck"] = "False"
316+
props["resourceLocation"] = randomAzureRegion()
317+
props["ancestors"] = subscriptionID
318+
props["policies"] = policyJSON
319+
props["eventCategory"] = "Policy"
320+
props["entity"] = resourceID
321+
props["message"] = operationName
322+
props["hierarchy"] = ""
323+
324+
case "Autoscale":
325+
// autoscaleLogProperties: Description, ResourceName, OldInstancesCount,
326+
// NewInstancesCount, LastScaleActionTime
327+
resourceName := randomAzureResourceID()
328+
oldCount := rand.Intn(8) + 2
329+
newCount := oldCount + []int{-1, 1}[rand.Intn(2)]
330+
if newCount < 1 {
331+
newCount = 1
332+
}
333+
props["Description"] = fmt.Sprintf(
334+
"The autoscale engine attempting to scale resource '%s' from %d instances count to %d instances count.",
335+
resourceName, oldCount, newCount,
336+
)
337+
props["ResourceName"] = resourceName
338+
props["OldInstancesCount"] = fmt.Sprintf("%d", oldCount)
339+
props["NewInstancesCount"] = fmt.Sprintf("%d", newCount)
340+
props["LastScaleActionTime"] = time.Now().UTC().Format(time.RFC1123)
200341
}
201342

202343
return props

0 commit comments

Comments
 (0)