Skip to content

Commit 304218f

Browse files
authored
Merge pull request #386 from drey/fix/standalone-pod-tracking
fix: add standalone pod tracking support
2 parents 4ae22b4 + 119a05f commit 304218f

File tree

1 file changed

+150
-0
lines changed

1 file changed

+150
-0
lines changed

pkg/trackers/dyntracker/dynamic_readiness_tracker.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ func NewDynamicReadinessTracker(
151151
IgnoreLogs: opts.IgnoreLogs,
152152
IgnoreReadinessProbeFailsByContainerName: ignoreReadinessProbeFailsByContainerName,
153153
})
154+
case schema.GroupKind{Group: "", Kind: "Pod"}:
155+
tracker = pod.NewTracker(resourceName, resourceNamespace, staticClient, informerFactory, pod.Options{
156+
IgnoreReadinessProbeFailsByContainerName: ignoreReadinessProbeFailsByContainerName,
157+
IgnoreLogs: opts.IgnoreLogs,
158+
})
154159
default:
155160
resid := resid.NewResourceID(resourceName, resourceGVK, resid.NewResourceIDOptions{
156161
Namespace: resourceNamespace,
@@ -219,6 +224,10 @@ func (t *DynamicReadinessTracker) Track(ctx context.Context) error {
219224
if err := t.trackCanary(ctx, tracker); err != nil {
220225
return fmt.Errorf("track canary: %w", err)
221226
}
227+
case *pod.Tracker:
228+
if err := t.trackPod(ctx, tracker); err != nil {
229+
return fmt.Errorf("track standalone pod: %w", err)
230+
}
222231
case *generic.Tracker:
223232
if err := t.trackGeneric(ctx, tracker); err != nil {
224233
return fmt.Errorf("track generic: %w", err)
@@ -724,6 +733,117 @@ func (t *DynamicReadinessTracker) trackJob(ctx context.Context, tracker *job.Tra
724733
}
725734
}
726735

736+
func (t *DynamicReadinessTracker) trackPod(ctx context.Context, tracker *pod.Tracker) error {
737+
trackCtx, trackCtxCancelFn := watchtools.ContextWithOptionalTimeout(ctx, t.timeout)
738+
defer trackCtxCancelFn()
739+
740+
doneChan := make(chan struct{})
741+
trackErrCh := make(chan error, 1)
742+
743+
go func() {
744+
if err := tracker.Start(trackCtx); err != nil {
745+
trackErrCh <- err
746+
} else {
747+
doneChan <- struct{}{}
748+
}
749+
}()
750+
751+
for {
752+
select {
753+
case err := <-trackErrCh:
754+
if err != nil && !errors.Is(err, commontracker.ErrStopTrack) {
755+
return fmt.Errorf("track resource %q: %w", t.resourceHumanID, err)
756+
}
757+
758+
return nil
759+
case status := <-tracker.Added:
760+
var (
761+
abort bool
762+
abortErr error
763+
)
764+
t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) {
765+
t.handlePodStatus(&status, ts)
766+
t.setRootResourceCreated(ts)
767+
abort, abortErr = t.handleTaskStateStatus(ts)
768+
})
769+
770+
if abort {
771+
return abortErr
772+
}
773+
case status := <-tracker.Succeeded:
774+
var (
775+
abort bool
776+
abortErr error
777+
)
778+
status.IsReady = true
779+
780+
t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) {
781+
t.handlePodStatus(&status, ts)
782+
abort, abortErr = t.handleTaskStateStatus(ts)
783+
})
784+
785+
if abort {
786+
return abortErr
787+
}
788+
case status := <-tracker.Ready:
789+
var (
790+
abort bool
791+
abortErr error
792+
)
793+
status.IsReady = true
794+
795+
t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) {
796+
t.handlePodStatus(&status, ts)
797+
abort, abortErr = t.handleTaskStateStatus(ts)
798+
})
799+
800+
if abort {
801+
return abortErr
802+
}
803+
case report := <-tracker.Failed:
804+
var (
805+
abort bool
806+
abortErr error
807+
)
808+
report.PodStatus.IsFailed = true
809+
810+
t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) {
811+
t.handlePodStatus(&report.PodStatus, ts)
812+
abort, abortErr = t.handleTaskStateStatus(ts)
813+
})
814+
815+
if abort {
816+
return abortErr
817+
}
818+
case status := <-tracker.Status:
819+
var (
820+
abort bool
821+
abortErr error
822+
)
823+
t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) {
824+
t.handlePodStatus(&status, ts)
825+
abort, abortErr = t.handleTaskStateStatus(ts)
826+
})
827+
828+
if abort {
829+
return abortErr
830+
}
831+
case msg := <-tracker.EventMsg:
832+
t.taskState.RTransaction(func(ts *statestore.ReadinessTaskState) {
833+
t.handleEventMessage(msg, ts, time.Now())
834+
})
835+
case chunk := <-tracker.ContainerLogChunk:
836+
t.taskState.RTransaction(func(ts *statestore.ReadinessTaskState) {
837+
t.logStore.RWTransaction(func(ls *logstore.LogStore) {
838+
t.handlePodLogChunk(&pod.PodLogChunk{ContainerLogChunk: chunk, PodName: tracker.ResourceName}, ls, ts)
839+
})
840+
})
841+
case <-doneChan:
842+
return nil
843+
}
844+
}
845+
}
846+
727847
func (t *DynamicReadinessTracker) trackCanary(ctx context.Context, tracker *canary.Tracker) error {
728848
trackCtx, trackCtxCancelFn := watchtools.ContextWithOptionalTimeout(ctx, t.timeout)
729849
defer trackCtxCancelFn()
@@ -952,6 +1072,36 @@ func (t *DynamicReadinessTracker) handlePodsFromJobStatus(status *job.JobStatus,
9521072
}
9531073
}
9541074

1075+
func (t *DynamicReadinessTracker) handlePodStatus(status *pod.PodStatus, taskState *statestore.ReadinessTaskState) {
1076+
taskState.AddResourceState(status.Name, taskState.Namespace(), podGvk)
1077+
1078+
if status.StatusIndicator != nil {
1079+
taskState.ResourceState(status.Name, taskState.Namespace(), podGvk).RWTransaction(func(rs *statestore.ResourceState) {
1080+
setPodStatusAttribute(rs, status.StatusIndicator.Value)
1081+
})
1082+
}
1083+
1084+
if status.IsReady {
1085+
taskState.SetStatus(statestore.ReadinessTaskStatusReady)
1086+
1087+
for _, state := range taskState.ResourceStates() {
1088+
state.RWTransaction(func(rs *statestore.ResourceState) {
1089+
rs.SetStatus(statestore.ResourceStatusReady)
1090+
})
1091+
}
1092+
1093+
return
1094+
}
1095+
1096+
if status.IsFailed {
1097+
taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()).RWTransaction(func(rs *statestore.ResourceState) {
1098+
rs.AddError(errors.New(status.FailedReason), "", time.Now())
1099+
})
1100+
1101+
return
1102+
}
1103+
}
1104+
9551105
func (t *DynamicReadinessTracker) handlePodsFromDeploymentPodAddedReport(report *deployment.PodAddedReport, taskState *statestore.ReadinessTaskState) {
9561106
if !report.ReplicaSetPod.ReplicaSet.IsNew {
9571107
return

0 commit comments

Comments
 (0)