Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/core/inspection/taskbase/loggroup_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ type LogGroup struct {
// LogGroupMap is a map of log groups, where the key is the group identifier.
type LogGroupMap = map[string]*LogGroup

// LogGrouper defines a function that returns a group key for a given log.
type LogGrouper = func(ctx context.Context, log *log.Log) string
// LogGrouperFunc defines a function that returns a group key for a given log.
type LogGrouperFunc = func(ctx context.Context, log *log.Log) string

// NewLogGrouperTask creates a task that groups logs based on a grouper function.
// It processes a list of logs and organizes them into a map of LogGroup,
// where each group contains logs with the same key.
func NewLogGrouperTask(taskId taskid.TaskImplementationID[LogGroupMap], logTask taskid.TaskReference[[]*log.Log], grouper LogGrouper) coretask.Task[LogGroupMap] {
func NewLogGrouperTask(taskId taskid.TaskImplementationID[LogGroupMap], logTask taskid.TaskReference[[]*log.Log], grouper LogGrouperFunc) coretask.Task[LogGroupMap] {
return NewProgressReportableInspectionTask(taskId, []taskid.UntypedTaskReference{
logTask,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/inspection/taskbase/loggroup_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestNwewLogGrouperTask(t *testing.T) {
name string
taskMode inspectioncore_contract.InspectionTaskModeType
logYamls []string
logGrouper LogGrouper
logGrouper LogGrouperFunc
resultLogIDs map[string][]string
}{
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
"go.opentelemetry.io/otel/trace"
)

// NewLogSerializerTask store its given logs to history to prepare the history type to have ChangeSet associated with the log.
// This must be called before HistoryModifier and Logs must be discarded before this task if it shouldn't be included in the result.
func NewLogSerializerTask(taskID taskid.TaskImplementationID[[]*log.Log], input taskid.TaskReference[[]*log.Log]) coretask.Task[[]*log.Log] {
// NewLogIngesterTask returns a task that stores its given logs to history to prepare the history type to have ChangeSet associated with the log.
// This must be called before LogToTimelineMapperTask and Logs must be discarded before this task if it shouldn't be included in the result.
func NewLogIngesterTask(taskID taskid.TaskImplementationID[[]*log.Log], input taskid.TaskReference[[]*log.Log]) coretask.Task[[]*log.Log] {
return NewProgressReportableInspectionTask(taskID, []taskid.UntypedTaskReference{input}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) ([]*log.Log, error) {
if taskMode == inspectioncore_contract.TaskModeDryRun {
return []*log.Log{}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (
"github.com/GoogleCloudPlatform/khi/pkg/testutil/testlog"
)

func TestLogSerializerTask_DryRunMode(t *testing.T) {
func TestLogIngesterTask_DryRunMode(t *testing.T) {
l := testlog.MustLogFromYAML("insertId: foo", &mockCommonLogFieldSetReader{})
ctx := inspectiontest.WithDefaultTestInspectionTaskContext(t.Context())
inputTaskID := taskid.NewDefaultImplementationID[[]*log.Log]("input")
taskID := taskid.NewDefaultImplementationID[[]*log.Log]("test")
task := NewLogSerializerTask(taskID, inputTaskID.Ref())
task := NewLogIngesterTask(taskID, inputTaskID.Ref())

result, _, err := inspectiontest.RunInspectionTask(ctx, task, inspectioncore_contract.TaskModeDryRun, map[string]any{},
tasktest.NewTaskDependencyValuePair(inputTaskID.Ref(), []*log.Log{l}))
Expand All @@ -40,22 +40,22 @@ func TestLogSerializerTask_DryRunMode(t *testing.T) {
}

if len(result) != 0 {
t.Errorf("LogSerializerTask returned a log result for dryrun mode")
t.Errorf("LogIngesterTask returned a log result for dryrun mode")
}

builder := khictx.MustGetValue(ctx, inspectioncore_contract.CurrentHistoryBuilder)
_, err = builder.GetLog(l.ID)
if err == nil {
t.Errorf("LogSerializerTask must not write serialzied log to the builder when it run for dryrun, but it generated a serialized log.")
t.Errorf("LogIngesterTask must not write log to the builder when it run for dryrun, but it wrote a log.")
}
}

func TestLogSerializerTask_RunMode(t *testing.T) {
func TestLogIngesterTask_RunMode(t *testing.T) {
l := testlog.MustLogFromYAML("insertId: foo", &mockCommonLogFieldSetReader{})
ctx := inspectiontest.WithDefaultTestInspectionTaskContext(t.Context())
inputTaskID := taskid.NewDefaultImplementationID[[]*log.Log]("input")
taskID := taskid.NewDefaultImplementationID[[]*log.Log]("test")
task := NewLogSerializerTask(taskID, inputTaskID.Ref())
task := NewLogIngesterTask(taskID, inputTaskID.Ref())

result, _, err := inspectiontest.RunInspectionTask(ctx, task, inspectioncore_contract.TaskModeRun, map[string]any{},
tasktest.NewTaskDependencyValuePair(inputTaskID.Ref(), []*log.Log{l}))
Expand All @@ -64,12 +64,12 @@ func TestLogSerializerTask_RunMode(t *testing.T) {
}

if len(result) != 1 {
t.Errorf("LogSerializerTask didn't return a log result for run mode")
t.Errorf("LogIngesterTask didn't return a log result for run mode")
}

builder := khictx.MustGetValue(ctx, inspectioncore_contract.CurrentHistoryBuilder)
_, err = builder.GetLog(l.ID)
if err != nil {
t.Errorf("LogSerializerTask must write serialzied log to the builder when it run. err=%v", err)
t.Errorf("LogIngesterTask must write log to the builder when it run. err=%v", err)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,28 @@ import (
"go.opentelemetry.io/otel/trace"
)

// HistoryModifer defines the interface for modifying the History with change sets based on log entries.
// Implementations of this interface can be used to customize how log data is transformed into
// structured history.
// To process data generated from processing the last log in the same group, the method ModifyChangeSetFromLog receive and return a variable typed T.
type HistoryModifer[T any] interface {
// LogSerializerTask is one of prerequiste task of HistoryModifier serializes its logs to history data before processing with this modifier.
LogSerializerTask() taskid.TaskReference[[]*log.Log]
// Dependencies are the additional references used in history modifier.
// LogToTimelineMapper defines the interface for mapping logs to timeline elements (events or revisions).
// Implementations of this interface can be used to customize how log data is transformed into timeline elements.
// To process data generated from processing the last log in the same group, the method ProcessLogByGroup receives and returns a variable typed T.
type LogToTimelineMapper[T any] interface {
// LogIngesterTask is one of prerequisite task of LogToTimelineMapper ingesting logs to history data before processing with this mapper.
LogIngesterTask() taskid.TaskReference[[]*log.Log]
// Dependencies are the additional references used in timeline mapper.
Dependencies() []taskid.UntypedTaskReference
// GroupedLogTask returns a reference to the task that provides the grouped logs.
GroupedLogTask() taskid.TaskReference[LogGroupMap]
// ModifyChangeSetFromLog is called for each log entry to modify the corresponding ChangeSet.
// ProcessLogByGroup is called for each log entry to modify the corresponding ChangeSet.
// This method allows for custom logic to be applied during the history building process.
// The prevGroupData is the returned value from the last procesed log in the same group.
ModifyChangeSetFromLog(ctx context.Context, l *log.Log, cs *history.ChangeSet, builder *history.Builder, prevGroupData T) (T, error)
ProcessLogByGroup(ctx context.Context, l *log.Log, cs *history.ChangeSet, builder *history.Builder, prevGroupData T) (T, error)
}

// NewHistoryModifierTask creates a task that modifies the history builder based on grouped logs.
// It processes logs in parallel and applies the logic from the provided HistoryModifer
// NewLogToTimelineMapperTask creates a task that modifies the history builder based on grouped logs.
// It processes logs in parallel and applies the logic from the provided LogToTimelineMapper
// to build a comprehensive history of events.
func NewHistoryModifierTask[T any](tid taskid.TaskImplementationID[struct{}], historyModifier HistoryModifer[T], labels ...coretask.LabelOpt) coretask.Task[struct{}] {
groupedLogTaskID := historyModifier.GroupedLogTask()
dependencies := append([]taskid.UntypedTaskReference{historyModifier.LogSerializerTask(), historyModifier.GroupedLogTask()}, historyModifier.Dependencies()...)
func NewLogToTimelineMapperTask[T any](tid taskid.TaskImplementationID[struct{}], mapper LogToTimelineMapper[T], labels ...coretask.LabelOpt) coretask.Task[struct{}] {
groupedLogTaskID := mapper.GroupedLogTask()
dependencies := append([]taskid.UntypedTaskReference{mapper.LogIngesterTask(), mapper.GroupedLogTask()}, mapper.Dependencies()...)
return NewProgressReportableInspectionTask(tid, dependencies, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, tp *inspectionmetadata.TaskProgressMetadata) (struct{}, error) {
if taskMode == inspectioncore_contract.TaskModeDryRun {
slog.DebugContext(ctx, "Skipping task because this is dry run mode")
Expand Down Expand Up @@ -91,7 +90,7 @@ func NewHistoryModifierTask[T any](tid taskid.TaskImplementationID[struct{}], hi
for _, l := range group.Logs {
cs := history.NewChangeSet(l)
var err error
groupData, err = historyModifier.ModifyChangeSetFromLog(ctx, l, cs, builder, groupData)
groupData, err = mapper.ProcessLogByGroup(ctx, l, cs, builder, groupData)
if err != nil {
var yaml string
yamlBytes, err2 := l.Serialize("", &structured.YAMLNodeSerializer{})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,32 @@ import (
inspectioncore_contract "github.com/GoogleCloudPlatform/khi/pkg/task/inspection/inspectioncore/contract"
)

var mockHistoryModifierPrevTaskID = taskid.NewDefaultImplementationID[LogGroupMap]("mock-history-modifier-prev")
var mockLogToTimelineMapperPrevTaskID = taskid.NewDefaultImplementationID[LogGroupMap]("mock-timeline-mapper-prev")

var mockLogSerializerPrevTaskID = taskid.NewDefaultImplementationID[[]*log.Log]("mock-history-modifier-prev-log-serializer")
var mockLogSerializerPrevTaskID = taskid.NewDefaultImplementationID[[]*log.Log]("mock-timeline-mapper-prev-log-serializer")

type mockHistoryModifierGroupData struct {
type mockLogToTimelineMapperGroupData struct {
CurrentGroupLogCount int
}

type mockHistoryModifier struct {
type mockLogToTimelineMapper struct {
}

// GroupedLogTask implements HistoryModifer.
func (m *mockHistoryModifier) GroupedLogTask() taskid.TaskReference[LogGroupMap] {
return mockHistoryModifierPrevTaskID.Ref()
// GroupedLogTask implements LogToTimelineMapper.
func (m *mockLogToTimelineMapper) GroupedLogTask() taskid.TaskReference[LogGroupMap] {
return mockLogToTimelineMapperPrevTaskID.Ref()
}

func (m *mockHistoryModifier) LogSerializerTask() taskid.TaskReference[[]*log.Log] {
func (m *mockLogToTimelineMapper) LogIngesterTask() taskid.TaskReference[[]*log.Log] {
return mockLogSerializerPrevTaskID.Ref()
}

func (m *mockHistoryModifier) Dependencies() []taskid.UntypedTaskReference {
func (m *mockLogToTimelineMapper) Dependencies() []taskid.UntypedTaskReference {
return []taskid.UntypedTaskReference{}
}

// ModifyChangeSetFromLog implements HistoryModifer.
func (m *mockHistoryModifier) ModifyChangeSetFromLog(ctx context.Context, l *log.Log, cs *history.ChangeSet, builder *history.Builder, prevData mockHistoryModifierGroupData) (mockHistoryModifierGroupData, error) {
// ProcessLogByGroup implements LogToTimelineMapper.
func (m *mockLogToTimelineMapper) ProcessLogByGroup(ctx context.Context, l *log.Log, cs *history.ChangeSet, builder *history.Builder, prevData mockLogToTimelineMapperGroupData) (mockLogToTimelineMapperGroupData, error) {
// encode current group count to severity to use them assert in tasecases to verify the prevData is correctly handled.
switch prevData.CurrentGroupLogCount {
case 0:
Expand All @@ -70,7 +70,7 @@ func (m *mockHistoryModifier) ModifyChangeSetFromLog(ctx context.Context, l *log
}
shouldErr := l.ReadBoolOrDefault("error", false)
if shouldErr {
return mockHistoryModifierGroupData{
return mockLogToTimelineMapperGroupData{
CurrentGroupLogCount: prevData.CurrentGroupLogCount + 1,
}, fmt.Errorf("test error")
}
Expand All @@ -80,12 +80,12 @@ func (m *mockHistoryModifier) ModifyChangeSetFromLog(ctx context.Context, l *log
l.ReadStringOrDefault("namespace", "unknown"),
l.ReadStringOrDefault("name", "unknown"),
))
return mockHistoryModifierGroupData{
return mockLogToTimelineMapperGroupData{
CurrentGroupLogCount: prevData.CurrentGroupLogCount + 1,
}, nil
}

var _ HistoryModifer[mockHistoryModifierGroupData] = (*mockHistoryModifier)(nil)
var _ LogToTimelineMapper[mockLogToTimelineMapperGroupData] = (*mockLogToTimelineMapper)(nil)

type mockCommonLogFieldSetReader struct {
}
Expand Down Expand Up @@ -118,7 +118,7 @@ func mustNewLogFromYAML(t *testing.T, yaml string) *log.Log {
return l
}

func TestHistoryModifierTask(t *testing.T) {
func TestLogToTimelineMapperTask(t *testing.T) {
testCases := []struct {
desc string
taskMode inspectioncore_contract.InspectionTaskModeType
Expand Down Expand Up @@ -268,11 +268,11 @@ func TestHistoryModifierTask(t *testing.T) {

for _, testCase := range testCases {
t.Run(testCase.desc, func(t *testing.T) {
tid := taskid.NewDefaultImplementationID[struct{}]("mock-history-modifier")
tid := taskid.NewDefaultImplementationID[struct{}]("mock-timeline-mapper")

ctx := context.Background()
ctx = inspectiontest.WithDefaultTestInspectionTaskContext(ctx)
task := NewHistoryModifierTask(tid, &mockHistoryModifier{})
task := NewLogToTimelineMapperTask(tid, &mockLogToTimelineMapper{})
builder := khictx.MustGetValue(ctx, inspectioncore_contract.CurrentHistoryBuilder)

for _, group := range testCase.prevLogGroupMap {
Expand All @@ -282,7 +282,7 @@ func TestHistoryModifierTask(t *testing.T) {
}
}

_, _, err := inspectiontest.RunInspectionTask(ctx, task, testCase.taskMode, map[string]any{}, tasktest.NewTaskDependencyValuePair(mockHistoryModifierPrevTaskID.Ref(), testCase.prevLogGroupMap))
_, _, err := inspectiontest.RunInspectionTask(ctx, task, testCase.taskMode, map[string]any{}, tasktest.NewTaskDependencyValuePair(mockLogToTimelineMapperPrevTaskID.Ref(), testCase.prevLogGroupMap))
if (err != nil) != testCase.wantError {
t.Fatalf("RunInspectionTask() error = %v, wantError %v", err, testCase.wantError)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ type ResourceChangeEvent struct {
EventTargetBodyReader *structured.NodeReader
}

// ManifestHistoryModifierTaskSetting is the setting for the manifest history modifier task.
type ManifestHistoryModifierTaskSetting[T any] interface {
// ManifestLogToTimelineMapperTaskSetting is the setting for the manifest timeline mapper task.
type ManifestLogToTimelineMapperTaskSetting[T any] interface {
// TaskID returns the task ID.
TaskID() taskid.TaskImplementationID[struct{}]
// LogSerializerTask returns the task reference for the log serializer task.
LogSerializerTask() taskid.TaskReference[[]*log.Log]
// LogIngesterTask returns the task reference for the log serializer task.
LogIngesterTask() taskid.TaskReference[[]*log.Log]
// GroupedLogTask returns the task reference for the grouped log task.
GroupedLogTask() taskid.TaskReference[ResourceManifestLogGroupMap]
// Dependencies returns the dependencies of the task.
Expand All @@ -104,8 +104,8 @@ type ManifestHistoryModifierTaskSetting[T any] interface {
Process(ctx context.Context, passIndex int, event ResourceChangeEvent, cs *history.ChangeSet, builder *history.Builder, prevGroupData T) (T, error)
}

// NewManifestHistoryModifier creates a new manifest history modifier task.
// ManifestHistoryModifier is a task that generates a timeline of resource changes based on the processed manifests.
// NewManifestLogToTimelineMapper creates a new timeline mapper task but from resource logs.
// ManifestLogToTimelineMapper is a task that generates a timeline of resource changes based on the processed manifests.
// It is designed to handle the relationship between two resources (Source and Target) and generate revisions for the Target resource based on the changes in the Source resource.
// For example, it can be used to generate a timeline of Pod status changes based on the Pod resource itself (Source=None, Target=Pod), or to generate a timeline of binding subresource but deleted when its parent Pod is deleted (Source=Pod, Target=Source pod's binding).
// The setting has ResourcePairs method that returns the resource pairs to know these pairs of target and source.
Expand All @@ -116,8 +116,8 @@ type ManifestHistoryModifierTaskSetting[T any] interface {
// Type Parameter T:
// The type parameter T represents the state that is passed between Process calls for the same resource pair.
// This allows the implementation to track the history of the resource and detect changes.
func NewManifestHistoryModifier[T any](setting ManifestHistoryModifierTaskSetting[T]) coretask.Task[struct{}] {
dependencies := append([]taskid.UntypedTaskReference{setting.LogSerializerTask(), setting.GroupedLogTask()}, setting.Dependencies()...)
func NewManifestLogToTimelineMapper[T any](setting ManifestLogToTimelineMapperTaskSetting[T]) coretask.Task[struct{}] {
dependencies := append([]taskid.UntypedTaskReference{setting.LogIngesterTask(), setting.GroupedLogTask()}, setting.Dependencies()...)
return inspectiontaskbase.NewProgressReportableInspectionTask(setting.TaskID(), dependencies, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, tp *inspectionmetadata.TaskProgressMetadata) (struct{}, error) {
if taskMode == inspectioncore_contract.TaskModeDryRun {
slog.DebugContext(ctx, "Skipping task because this is dry run mode")
Expand Down
Loading