-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathdemo-config.yaml
More file actions
198 lines (190 loc) · 9.85 KB
/
demo-config.yaml
File metadata and controls
198 lines (190 loc) · 9.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# Receivers: entry points for telemetry into the collector.
receivers:
  # Webhook event receiver: accepts pushed events over HTTP on port 8088.
  webhookevent:
    endpoint: 0.0.0.0:8088
    read_timeout: "500ms"
    path: "/eventsource/receiver"
    health_path: "/eventreceiver/healthcheck"
    split_logs_at_newline: false
  # Filelog receiver: reads metrics JSON and sends to Loki as logs
  filelog/metrics:
    include:
      - /data/metrics.jsonl
    # start_at: end — only tail new lines; avoids re-ingesting the file on restart.
    start_at: end
    poll_interval: 250ms
    operators:
      # Parse each JSONL line, then stamp a service_name attribute so the
      # transform/metrics_to_logs processor can match these records.
      - type: json_parser
        id: parser-metrics
        output: extract_metric_data
      - type: add
        id: extract_metric_data
        field: attributes.service_name
        value: "signaltometrics"
  # Standard OTLP receiver on the default gRPC/HTTP ports.
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318
# Processors: applied in pipeline order between receivers and exporters.
processors:
  # Default batch processor (default settings; empty config is valid).
  batch:
  # Small, fast batches for the low-volume metrics path.
  batch/metrics:
    timeout: 1s
    send_batch_size: 10
  # Truthbeam: enrichment processor calling the Compass service.
  truthbeam:
    endpoint: "https://compass:8081"
    # Disable compression for small enrichment API requests
    # Compression overhead is unnecessary for ~200 byte payloads
    compression: ""
    tls:
      ca_file: /certs/truthbeam.crt
  # Transform processor: extracts metric data from OTLP JSON and converts to log attributes
  transform/metrics_to_logs:
    error_mode: ignore
    log_statements:
      - context: log
        conditions:
          - attributes["service_name"] == "signaltometrics"
        statements:
          # Parse OTLP JSON: resourceMetrics[0].scopeMetrics[0].metrics[0]
          - set(attributes["_metric_parsed"], true) where IsMap(ParseJSON(body))
          - set(attributes["metric_name"], ParseJSON(body)["resourceMetrics"][0]["scopeMetrics"][0]["metrics"][0]["name"]) where IsMap(ParseJSON(body)) and ParseJSON(body)["resourceMetrics"] != nil
          # Extract metric value (asDouble or asInt)
          - set(attributes["metric_value"], ParseJSON(body)["resourceMetrics"][0]["scopeMetrics"][0]["metrics"][0]["sum"]["dataPoints"][0]["asDouble"]) where ParseJSON(body)["resourceMetrics"] != nil and ParseJSON(body)["resourceMetrics"][0]["scopeMetrics"][0]["metrics"][0]["sum"] != nil and ParseJSON(body)["resourceMetrics"][0]["scopeMetrics"][0]["metrics"][0]["sum"]["dataPoints"][0]["asDouble"] != nil
          - set(attributes["metric_value"], ParseJSON(body)["resourceMetrics"][0]["scopeMetrics"][0]["metrics"][0]["sum"]["dataPoints"][0]["asInt"]) where ParseJSON(body)["resourceMetrics"] != nil and ParseJSON(body)["resourceMetrics"][0]["scopeMetrics"][0]["metrics"][0]["sum"] != nil and ParseJSON(body)["resourceMetrics"][0]["scopeMetrics"][0]["metrics"][0]["sum"]["dataPoints"][0]["asInt"] != nil
          # Extract metric labels from dataPoints and resource attributes
          - merge_maps(attributes, ParseJSON(body)["resourceMetrics"][0]["scopeMetrics"][0]["metrics"][0]["sum"]["dataPoints"][0]["attributes"], "upsert") where ParseJSON(body)["resourceMetrics"] != nil and ParseJSON(body)["resourceMetrics"][0]["scopeMetrics"][0]["metrics"][0]["sum"]["dataPoints"][0]["attributes"] != nil
          - merge_maps(attributes, ParseJSON(body)["resourceMetrics"][0]["resource"]["attributes"], "upsert") where ParseJSON(body)["resourceMetrics"] != nil and ParseJSON(body)["resourceMetrics"][0]["resource"]["attributes"] != nil
          # Replace the raw OTLP JSON body with the flattened attribute map.
          - set(body, attributes) where attributes["_metric_parsed"] == true
  # For logs received via the filelog receiver (or similar sources) instead of OTLP.
  # These are expected to be in OCSF format before entering the pipeline.
  transform/ocsf:
    error_mode: ignore
    log_statements:
      - context: log
        conditions:
          - body != nil
        statements:
          # Backfill missing timestamps so downstream stores get valid times.
          - set(observed_time, Now()) where observed_time_unix_nano == 0
          - set(time, observed_time) where time_unix_nano == 0
          # Extract policy.rule.id from OCSF policy.uid field (procedure ID for Compass mapper lookup)
          - set(attributes["policy.rule.id"], ParseJSON(body)["policy"]["uid"]) where ParseJSON(body)["policy"]["uid"] != nil
          # Extract control ID from policy.data.sources[0].name as a separate attribute (don't overwrite policy.rule.id)
          # Note: policy.rule.id must match procedure.id in plan.yml for Compass mapper to work correctly
          - set(attributes["policy.control.id"], ParseJSON(ParseJSON(body)["policy"]["data"])["sources"][0]["name"]) where ParseJSON(body)["policy"]["data"] != nil and ParseJSON(ParseJSON(body)["policy"]["data"])["sources"] != nil and ParseJSON(ParseJSON(body)["policy"]["data"])["sources"][0]["name"] != nil
          # Extract policy.data.config.include (first element of array) as attribute
          - set(attributes["policy.config.include"], ParseJSON(ParseJSON(body)["policy"]["data"])["sources"][0]["config"]["include"][0]) where ParseJSON(body)["policy"]["data"] != nil and ParseJSON(ParseJSON(body)["policy"]["data"])["sources"] != nil and ParseJSON(ParseJSON(body)["policy"]["data"])["sources"][0]["config"] != nil and ParseJSON(ParseJSON(body)["policy"]["data"])["sources"][0]["config"]["include"] != nil and Len(ParseJSON(ParseJSON(body)["policy"]["data"])["sources"][0]["config"]["include"]) > 0
          # Extract policy.engine.name from OCSF metadata.product.name field
          - set(attributes["policy.engine.name"], ParseJSON(body)["metadata"]["product"]["name"]) where ParseJSON(body)["metadata"]["product"]["name"] != nil
          # Extract policy.evaluation.result from OCSF status field (mapped to Compass enum values)
          - set(attributes["policy.evaluation.result"], "Passed") where ParseJSON(body)["status"] == "success"
          - set(attributes["policy.evaluation.result"], "Failed") where ParseJSON(body)["status"] == "failure"
          - set(attributes["policy.evaluation.result"], "Not Run") where ParseJSON(body)["status"] == "not_run"
          - set(attributes["policy.evaluation.result"], "Needs Review") where ParseJSON(body)["status"] == "needs_review"
          - set(attributes["policy.evaluation.result"], "Not Applicable") where ParseJSON(body)["status"] == "not_applicable"
          - set(attributes["policy.evaluation.result"], "Unknown") where ParseJSON(body)["status"] == "unknown" or ParseJSON(body)["status"] == "error" or ParseJSON(body)["status"] == "timeout"
          # Set default Unknown if status is not recognized
          - set(attributes["policy.evaluation.result"], "Unknown") where ParseJSON(body)["status"] != nil and attributes["policy.evaluation.result"] == nil
          # Extract severity from OCSF evidence
          - set(attributes["severity"], ParseJSON(body)["severity"]) where ParseJSON(body)["severity"] != nil
          # Set policy.rule.id as resource attribute for S3 partitioning
          - set(resource.attributes["policy.rule.id"], attributes["policy.rule.id"]) where attributes["policy.rule.id"] != nil
# Connectors: act as an exporter in one pipeline and a receiver in another.
connectors:
  # SignalToMetrics: converts enriched logs to metrics (exporter from logs, receiver to metrics)
  signaltometrics:
    logs:
      # Control evaluation count metric
      - name: compliance.control.evaluations
        description: "Total number of control evaluations"
        unit: "1"
        sum:
          value: "1" # increment by 1 for each log record
      # Control evaluation result metric
      - name: compliance.control.evaluation.result
        description: "Count of control evaluations by result (Passed/Failed/etc)"
        unit: "1"
        sum:
          value: "1"
      # Control compliance status metric
      - name: compliance.control.status
        description: "Count of control evaluations by compliance status"
        unit: "1"
        sum:
          value: "1"
      # Policy config include metric - associates config.include with control ID
      - name: compliance.control.config.include
        description: "Count of control evaluations by policy config include value"
        unit: "1"
        sum:
          value: "1"
      # Control evaluations by policy engine
      - name: compliance.control.by.engine
        description: "Count of control evaluations by policy engine"
        unit: "1"
        sum:
          value: "1"
      # Control evaluations by control category
      - name: compliance.control.by.category
        description: "Count of control evaluations by control category"
        unit: "1"
        sum:
          value: "1"
      # Control evaluations by enrichment status
      - name: compliance.control.by.enrichment.status
        description: "Count of control evaluations by enrichment status"
        unit: "1"
        sum:
          value: "1"
      # Control evaluations by severity
      - name: compliance.control.by.severity
        description: "Count of control evaluations by severity level"
        unit: "1"
        sum:
          value: "1"
# Exporters: destinations for processed telemetry.
exporters:
  # Debug exporter: logs every record to the collector's own output (dev/demo aid).
  debug:
    verbosity: detailed
  # Loki OTLP ingestion endpoint (plain HTTP inside the compose network).
  otlphttp/logs:
    endpoint: "http://loki:3100/otlp"
    tls:
      insecure: true
  # File exporter: writes metrics as JSON for filelog receiver
  file/metrics:
    path: /data/metrics.jsonl
    format: json
  # S3 evidence store; connection settings come from the environment.
  awss3/logs:
    s3uploader:
      region: ${AWS_REGION}
      s3_bucket: ${S3_BUCKETNAME}
      s3_prefix: ${S3_OBJ_DIR}
      endpoint: ${S3_ENDPOINT}
      # Path-style addressing + no SSL: presumably targeting a local
      # S3-compatible store (e.g. MinIO) — confirm before production use.
      s3_force_path_style: true
      disable_ssl: true
      unique_key_func_name: uuidv7
      s3_partition_format: ""
      file_prefix: "evidence_"
    # Route the policy.rule.id resource attribute into the object key prefix.
    resource_attrs_to_s3:
      s3_prefix: "policy.rule.id"
# Service: wires receivers, processors, connectors, and exporters into pipelines.
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [debug]
    # Metrics pipeline: SignalToMetrics -> file -> filelog -> Loki
    metrics/control_health:
      receivers: [signaltometrics]
      processors: [batch/metrics]
      exporters: [debug, file/metrics]
    # Logs pipeline: reads metrics from file and exports to Loki
    logs/metrics_from_file:
      receivers: [filelog/metrics]
      processors: [batch/metrics, transform/metrics_to_logs]
      exporters: [debug, otlphttp/logs]
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [debug]
    # Main analysis pipeline: ingest OCSF logs, normalize, enrich via truthbeam,
    # then fan out to Loki, S3 evidence storage, and the signaltometrics connector.
    logs/analysis_pipeline:
      receivers: [webhookevent, otlp]
      # NOTE(review): OTel guidance generally places batch last in the chain;
      # here it runs before transform/ocsf and truthbeam — confirm intentional.
      processors: [batch, transform/ocsf, truthbeam]
      exporters: [debug, otlphttp/logs, awss3/logs, signaltometrics]