Skip to content

Commit 6c601fa

Browse files
committed
Add LogIngesterTaskV2 and TimelineIngesterTaskV2
1 parent c0894d4 commit 6c601fa

7 files changed

Lines changed: 1122 additions & 0 deletions

File tree

Lines changed: 376 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,376 @@
1+
---
2+
name: log-timeline-mapper-v2
3+
description: Guidelines for implementing and testing LogIngesterTaskV2 and LogToTimelineMapperV2 tasks in KHI.
4+
---
5+
6+
# KHI Log Ingestion & Timeline Mapping Guidelines
7+
8+
This guide outlines the patterns, best practices, and testing methodologies for implementing log ingesters and timeline mappers in KHI.
9+
10+
---
11+
12+
## 1. LogIngesterV2 & LogIngesterTaskV2
13+
14+
`LogIngesterV2` is responsible for parsing raw logs and ingesting basic log metadata (summary, timestamp, severity, log type) into the KHI format.
15+
16+
### Interface Definition
17+
18+
```go
19+
type LogIngesterV2 interface {
20+
// RawLogTask returns the task reference that provides the raw logs to ingest.
21+
RawLogTask() taskid.TaskReference[[]*log.Log]
22+
// Dependencies returns additional task dependencies of the ingester.
23+
Dependencies() []taskid.UntypedTaskReference
24+
// ProcessLog is called for each log entry to customize log metadata.
25+
ProcessLog(ctx context.Context, l *log.Log) (*khifilev6.LogChangeSet, error)
26+
}
27+
```
28+
29+
### Key Implementation Guide
30+
31+
> [!IMPORTANT]
32+
> **ChangeSet Metadata Ingestion:** `LogChangeSet` does NOT automatically fill metadata defaults. You MUST explicitly read fields from `CommonFieldSet` (or custom field sets) and set them manually on the `LogChangeSet` in your `ProcessLog` implementation.
33+
>
34+
> **Skipping Logs:** If `ProcessLog` returns `(nil, nil)`, KHI will treat this log as skipped (ignored) without producing any errors.
35+
36+
#### Implementer Example
37+
38+
```go
39+
type MyLogIngester struct {}
40+
41+
func (i *MyLogIngester) RawLogTask() taskid.TaskReference[[]*log.Log] {
42+
return rawLogTaskID.Ref()
43+
}
44+
45+
func (i *MyLogIngester) Dependencies() []taskid.UntypedTaskReference {
46+
return []taskid.UntypedTaskReference{}
47+
}
48+
49+
// ProcessLog parses raw log entry and manually populates the LogChangeSet.
50+
func (i *MyLogIngester) ProcessLog(ctx context.Context, l *log.Log) (*khifilev6.LogChangeSet, error) {
51+
// 1. Create a new change set.
52+
cs, err := khifilev6.NewLogChangeSet(l)
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
// 2. Manually extract fields from CommonFieldSet.
58+
if commonSet, err := log.GetFieldSet(l, &log.CommonFieldSet{}); err == nil {
59+
cs.SetTimestamp(commonSet.Timestamp)
60+
}
61+
62+
// 3. Set severity, summary, etc. manually using pre-registered styles.
63+
cs.SetSeverity(mySeverityStyle)
64+
cs.SetSummary(mySummaryString)
65+
66+
return cs, nil
67+
}
68+
69+
// Explicit interface compliance assertion is mandatory.
70+
var _ inspectiontaskbase.LogIngesterV2 = (*MyLogIngester)(nil)
71+
```
72+
73+
### Registering the LogIngester Task
74+
75+
> [!IMPORTANT]
76+
> **Package Boundaries:**
77+
>
78+
> * **TaskID** definitions (e.g., `LogIngesterTaskIDV2`) MUST be placed in the `contract` package.
79+
> * **Task Implementation** instantiations (e.g., `NewLogIngesterTaskV2`) MUST be placed in the `impl` package.
80+
81+
```go
82+
// Defined in 'contract' package:
83+
var MyLogIngesterTaskID = taskid.NewDefaultImplementationID[[]*log.Log]("my-log-ingester")
84+
85+
// Instantiated in 'impl' package:
86+
task := NewLogIngesterTaskV2(mycontract.MyLogIngesterTaskID, &MyLogIngester{})
87+
// Register task to core runner...
88+
```
89+
90+
---
91+
92+
## 2. LogToTimelineMapperV2
93+
94+
`LogToTimelineMapperV2` maps grouped logs to timeline elements (events or resource revisions). Depending on the complexity, you should choose one of the following three implementation patterns.
95+
96+
### Pattern 1: Multi-Pass with State
97+
98+
Used for complex scenarios where you need to pre-collect information across all logs in a group before applying timeline changes (e.g., matching asynchronous request/response cycles).
99+
100+
* **How to implement:** Implement the full `LogToTimelineMapperV2[T]` interface manually.
101+
102+
```go
103+
type ComplexMapper struct {}
104+
105+
func (m *ComplexMapper) PassCount() int {
106+
return 1 // Run 1 pre-processing pass.
107+
}
108+
109+
func (m *ComplexMapper) PreProcessLogByGroup(ctx context.Context, passIndex int, l *log.Log, prevGroupData MyState) (MyState, error) {
110+
// Pre-collect state from logs.
111+
nextState := analyzeLog(prevGroupData, l)
112+
return nextState, nil
113+
}
114+
115+
func (m *ComplexMapper) ProcessLogByGroup(ctx context.Context, l *log.Log, prevGroupData MyState) (*khifilev6.TimelineChangeSet, MyState, error) {
116+
// 1. Retrieve field data using log.GetFieldSet.
117+
customSet, err := log.GetFieldSet(l, &MyCustomFieldSet{})
118+
if err != nil {
119+
return nil, prevGroupData, err
120+
}
121+
122+
// 2. Retrieve the Builder from context.
123+
builder := khictx.MustGetValue(ctx, inspectioncore_contract.CurrentV6Builder)
124+
125+
// 3. Resolve target path dynamically using the accumulator facade.
126+
targetPath := builder.TimelineAccumulator.GetPath(nil, khifilev6.PathSegment{
127+
Name: "complex-timeline",
128+
Type: mycontract.TimelineTypeComplex, // Timeline styles should be imported from contract package
129+
})
130+
131+
cs := khifilev6.NewTimelineChangeSet(l)
132+
133+
// Add a revision or event conditionally using the pre-collected state and customSet fields.
134+
if prevGroupData.ShouldRegisterRevision(l) {
135+
cs.AddRevision(targetPath, &khifilev6.StagingRevision{
136+
ChangedTime: customSet.Timestamp,
137+
ResourceBody: customSet.Body,
138+
Principal: customSet.Principal,
139+
VerbType: mycontract.VerbCreate,
140+
})
141+
}
142+
143+
return cs, prevGroupData, nil
144+
}
145+
146+
// Explicit interface compliance assertion.
147+
var _ inspectiontaskbase.LogToTimelineMapperV2[MyState] = (*ComplexMapper)(nil)
148+
```
149+
150+
---
151+
152+
### Pattern 2: Single-Pass with State
153+
154+
Used when you need to maintain and propagate state sequentially through the logs in a group, but do not require a pre-processing pass.
155+
156+
* **How to implement:** Embed `SinglePassMapperBase[T]` into your mapper structure. This automatically implements `PassCount() int` (returning 0) and `PreProcessLogByGroup` (returning state as-is).
157+
158+
```go
159+
type StateTrackingMapper struct {
160+
SinglePassMapperBase[MyState] // Embeds single pass helper.
161+
}
162+
163+
func (m *StateTrackingMapper) ProcessLogByGroup(ctx context.Context, l *log.Log, prevGroupData MyState) (*khifilev6.TimelineChangeSet, MyState, error) {
164+
// 1. Retrieve field data using log.GetFieldSet.
165+
customSet, err := log.GetFieldSet(l, &MyCustomFieldSet{})
166+
if err != nil {
167+
return nil, prevGroupData, err
168+
}
169+
170+
// 2. Maintain state.
171+
nextState := updateState(prevGroupData, customSet)
172+
173+
// 3. Retrieve the Builder from context.
174+
builder := khictx.MustGetValue(ctx, inspectioncore_contract.CurrentV6Builder)
175+
176+
// 4. Resolve target path dynamically using the accumulator facade.
177+
targetPath := builder.TimelineAccumulator.GetPath(nil, khifilev6.PathSegment{
178+
Name: "stateful-revision-timeline",
179+
Type: mycontract.TimelineTypeStateful,
180+
})
181+
182+
cs := khifilev6.NewTimelineChangeSet(l)
183+
184+
// Append resource revision history sequentially to the timeline.
185+
cs.AddRevision(targetPath, &khifilev6.StagingRevision{
186+
ChangedTime: customSet.Timestamp,
187+
ResourceBody: customSet.Body,
188+
Principal: customSet.Principal,
189+
VerbType: mycontract.VerbUpdate,
190+
})
191+
192+
return cs, nextState, nil
193+
}
194+
195+
// Explicit interface compliance assertion.
196+
var _ inspectiontaskbase.LogToTimelineMapperV2[MyState] = (*StateTrackingMapper)(nil)
197+
```
198+
199+
---
200+
201+
### Pattern 3: Single-Pass Stateless (Most Common)
202+
203+
Used when timeline mapping for each log is completely independent and does not rely on other logs in the same group.
204+
205+
* **How to implement:** Embed `StatelessMapperBase` into your mapper structure. This binds the state type `T` to `struct{}` and implements the pre-processing methods as no-ops.
206+
207+
```go
208+
type SimpleEventMapper struct {
209+
StatelessMapperBase // Embeds stateless helper.
210+
}
211+
212+
func (m *SimpleEventMapper) ProcessLogByGroup(ctx context.Context, l *log.Log, _ struct{}) (*khifilev6.TimelineChangeSet, struct{}, error) {
213+
// 1. Retrieve the Builder from context.
214+
builder := khictx.MustGetValue(ctx, inspectioncore_contract.CurrentV6Builder)
215+
216+
// 2. Resolve target path dynamically.
217+
targetPath := builder.TimelineAccumulator.GetPath(nil, khifilev6.PathSegment{
218+
Name: "simple-event-timeline",
219+
Type: mycontract.TimelineTypeEvent,
220+
})
221+
222+
cs := khifilev6.NewTimelineChangeSet(l)
223+
224+
// Add a simple timeline event.
225+
cs.AddEvent(targetPath)
226+
227+
return cs, struct{}{}, nil
228+
}
229+
230+
// Explicit interface compliance assertion.
231+
var _ inspectiontaskbase.LogToTimelineMapperV2[struct{}] = (*SimpleEventMapper)(nil)
232+
```
233+
234+
### Registering the TimelineMapper Task
235+
236+
> [!IMPORTANT]
237+
> **Package Boundaries:**
238+
>
239+
> * **TaskID** definitions (e.g., `LogToTimelineMapperTaskIDV2`) MUST be placed in the `contract` package.
240+
> * **Task Implementation** instantiations (e.g., `NewLogToTimelineMapperTaskV2`) MUST be placed in the `impl` package.
241+
242+
```go
243+
// Defined in 'contract' package:
244+
var MyTimelineMapperTaskID = taskid.NewDefaultImplementationID[struct{}]("my-timeline-mapper")
245+
246+
// Instantiated in 'impl' package:
247+
task := NewLogToTimelineMapperTaskV2(mycontract.MyTimelineMapperTaskID, &SimpleEventMapper{})
248+
// Register task to core runner...
249+
```
250+
251+
---
252+
253+
## 3. Testing Guidelines
254+
255+
Tests for V2 tasks must follow the standard table-driven testing pattern. To verify that mappers or ingesters produce correct outcomes across various scenarios, write unit tests using the `testchangeset` fluent assertions.
256+
257+
### Table-Driven Fluent Assertions
258+
259+
KHI provides a dedicated test utility `github.com/GoogleCloudPlatform/khi/pkg/testutil/testchangeset` to perform readable assertions against staged changesets. By incorporating `testchangeset.AssertLog` or `testchangeset.AssertTimeline` into your table-driven loop, you can verify multiple test cases cleanly and expressively.
260+
261+
#### LogIngester Table-Driven Assertion Example
262+
263+
To isolate ingester parsing logic, instantiate logs using `log.NewLogWithFieldSetsForTest` and define chainable assertions inside test cases:
264+
265+
```go
266+
func TestMyLogIngester_ProcessLog(t *testing.T) {
267+
testCases := []struct {
268+
name string
269+
input *log.Log
270+
assert func(t *testing.T, cs *khifilev6.LogChangeSet)
271+
}{
272+
{
273+
name: "successful info log ingestion",
274+
input: log.NewLogWithFieldSetsForTest(
275+
&log.CommonFieldSet{
276+
Timestamp: time.Date(2026, 5, 22, 12, 0, 0, 0, time.UTC),
277+
Severity: "INFO",
278+
},
279+
&log.MainMessageFieldSet{
280+
MainMessage: "server started",
281+
},
282+
),
283+
assert: func(t *testing.T, cs *khifilev6.LogChangeSet) {
284+
testchangeset.AssertLog(t, cs).
285+
HasSummary("server started").
286+
HasSeverity(infoSeverityStyle)
287+
},
288+
},
289+
}
290+
291+
ingester := &MyLogIngester{}
292+
for _, tc := range testCases {
293+
t.Run(tc.name, func(t *testing.T) {
294+
// Obtain context dynamically using t.Context()
295+
ctx := t.Context()
296+
297+
cs, err := ingester.ProcessLog(ctx, tc.input)
298+
if err != nil {
299+
t.Fatalf("ProcessLog() returned unexpected error: %v", err)
300+
}
301+
302+
tc.assert(t, cs)
303+
})
304+
}
305+
}
306+
```
307+
308+
#### TimelineMapper Table-Driven Assertion Example
309+
310+
Mappers translate structured log `FieldSet` data into timeline changes. Isolate mapper tests using `log.NewLogWithFieldSetsForTest` and execute assertions using the fluent `testchangeset` asserter.
311+
312+
> [!IMPORTANT]
313+
> **Shared Builder Reference:** When unit testing mappers that dynamically construct timeline paths via the builder stored in the context, you MUST initialize a single `khifilev6.Builder` and resolve all comparison `TimelinePath` instances using this builder. Crucially, the same builder instance must be injected into the execution context using `khictx.WithValue` to ensure pointer equality during assertions.
314+
315+
```go
316+
func TestMyTimelineMapper_ProcessLogByGroup(t *testing.T) {
317+
// 1. Initialize the Builder first.
318+
builder := khifilev6.NewBuilder()
319+
320+
// 2. Resolve comparative path instances using the Builder's accumulator.
321+
// TimelineTypes must be imported from the contract package.
322+
resourceTimelinePath := builder.TimelineAccumulator.GetPath(nil, khifilev6.PathSegment{
323+
Name: "resource-timeline",
324+
Type: mycontract.TimelineTypeResource,
325+
})
326+
327+
testCases := []struct {
328+
name string
329+
inputLog *log.Log
330+
prevState MyState
331+
assert func(t *testing.T, cs *khifilev6.TimelineChangeSet)
332+
}{
333+
{
334+
name: "create resource revision",
335+
inputLog: log.NewLogWithFieldSetsForTest(&MyCustomFieldSet{
336+
Verb: "create",
337+
}),
338+
prevState: MyState{},
339+
assert: func(t *testing.T, cs *khifilev6.TimelineChangeSet) {
340+
testchangeset.AssertTimeline(t, cs).
341+
HasEvent(resourceTimelinePath).
342+
HasRevision(resourceTimelinePath, &khifilev6.StagingRevision{
343+
VerbType: mycontract.VerbCreate,
344+
})
345+
},
346+
},
347+
{
348+
name: "skip timeline revision on delete verb",
349+
inputLog: log.NewLogWithFieldSetsForTest(&MyCustomFieldSet{
350+
Verb: "delete",
351+
}),
352+
prevState: MyState{},
353+
assert: func(t *testing.T, cs *khifilev6.TimelineChangeSet) {
354+
testchangeset.AssertTimeline(t, cs).
355+
HasNoEvent(resourceTimelinePath).
356+
HasNoRevision(resourceTimelinePath)
357+
},
358+
},
359+
}
360+
361+
mapper := &MySimpleMapper{}
362+
for _, tc := range testCases {
363+
t.Run(tc.name, func(t *testing.T) {
364+
// 3. Set up the context using t.Context() and SAME builder instance.
365+
ctx := khictx.WithValue(t.Context(), inspectioncore_contract.CurrentV6Builder, builder)
366+
367+
cs, _, err := mapper.ProcessLogByGroup(ctx, tc.inputLog, tc.prevState)
368+
if err != nil {
369+
t.Fatalf("ProcessLogByGroup() returned unexpected error: %v", err)
370+
}
371+
372+
tc.assert(t, cs)
373+
})
374+
}
375+
}
376+
```

0 commit comments

Comments
 (0)