Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions manifests/03-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ rules:
- get
- list
- watch
- apiGroups:
- config.openshift.io
resources:
- clusteroperators/status
verbs:
- update
- apiGroups:
- policy
resources:
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/gather_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (g *GatherJob) GatherAndUpload(kubeConfig, protoKubeConfig *rest.Config) er
}

// upload data
insightsRequestID, statusCode, err := uploader.Upload(ctx, lastArchive)
insightsRequestID, statusCode, err := uploader.Upload(ctx, lastArchive, configClient.ConfigV1())
dataUploadedCon := status.DataUploadedCondition(
metav1.ConditionTrue,
status.SucceededReason,
Expand Down
37 changes: 36 additions & 1 deletion pkg/insights/insightsuploader/insightsuploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"archive/tar"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -12,10 +13,13 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

configv1 "github.com/openshift/client-go/config/clientset/versioned/typed/config/v1"
"github.com/openshift/insights-operator/pkg/authorizer"
"github.com/openshift/insights-operator/pkg/config/configobserver"
"github.com/openshift/insights-operator/pkg/controller/status"
"github.com/openshift/insights-operator/pkg/controllerstatus"
"github.com/openshift/insights-operator/pkg/insights/insightsclient"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type Authorizer interface {
Expand Down Expand Up @@ -201,7 +205,7 @@ func (c *Controller) ArchiveUploaded() <-chan struct{} {

// Upload is an alternative simple upload method used only in TechPreview clusters.
// Returns Insights request ID and error=nil in case of successful data upload.
func (c *Controller) Upload(ctx context.Context, s *insightsclient.Source) (string, int, error) {
func (c *Controller) Upload(ctx context.Context, s *insightsclient.Source, configClient configv1.ConfigV1Interface) (string, int, error) {
defer s.Contents.Close()
start := time.Now()
s.ID = start.Format(time.RFC3339)
Expand All @@ -223,7 +227,13 @@ func (c *Controller) Upload(ctx context.Context, s *insightsclient.Source) (stri
if err != nil {
return "", statusCode, err
}

klog.Infof("Uploaded report successfully in %s", time.Since(start))

if err := updateClusterOperatorLastReportTime(ctx, configClient); err != nil {
klog.Errorf("Failed to update ClusterOperator lastReportTime: %v", err)
}

return requestID, statusCode, nil
}

Expand All @@ -245,3 +255,28 @@ func reportToLogs(source io.Reader) error {
}
return nil
}

// Update the ClusterOperator's lastReportTime extension field
func updateClusterOperatorLastReportTime(ctx context.Context, client configv1.ConfigV1Interface) error {
insightsCo, err := client.ClusterOperators().Get(ctx, "insights", metav1.GetOptions{})
if err != nil {
return err
}

reported := status.Reported{
LastReportTime: metav1.Time{Time: time.Now().UTC()},
}

data, err := json.Marshal(reported)
if err != nil {
return fmt.Errorf("unable to marshal status extension: %v", err)
}
insightsCo.Status.Extension.Raw = data

if _, err := client.ClusterOperators().UpdateStatus(ctx, insightsCo, metav1.UpdateOptions{}); err != nil {
return err
}

klog.Infof("Successfully updated LastReportTime to %s", reported.LastReportTime)
return nil
}
160 changes: 160 additions & 0 deletions pkg/insights/insightsuploader/insightsuploader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package insightsuploader

import (
"context"
"encoding/json"
"testing"
"time"

configv1 "github.com/openshift/api/config/v1"
configfake "github.com/openshift/client-go/config/clientset/versioned/fake"
"github.com/openshift/insights-operator/pkg/controller/status"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)

func Test_updateClusterOperatorLastReportTime(t *testing.T) {
tests := []struct {
name string
clusterOperator *configv1.ClusterOperator
expErr bool
expTimeSet bool
expErrContains string
}{
{
name: "Successfully updates lastReportTime on existing ClusterOperator",
clusterOperator: &configv1.ClusterOperator{
ObjectMeta: metav1.ObjectMeta{
Name: "insights",
},
Status: configv1.ClusterOperatorStatus{
Extension: runtime.RawExtension{
Raw: []byte(`{}`),
},
},
},
expErr: false,
expTimeSet: true,
},
{
name: "Updates lastReportTime when extension has existing data",
clusterOperator: &configv1.ClusterOperator{
ObjectMeta: metav1.ObjectMeta{
Name: "insights",
},
Status: configv1.ClusterOperatorStatus{
Extension: runtime.RawExtension{
Raw: []byte(`{"lastReportTime":"2024-01-01T00:00:00Z"}`),
},
},
},
expErr: false,
expTimeSet: true,
},
{
name: "Returns error when ClusterOperator doesn't exist",
clusterOperator: nil,
expErr: true,
expTimeSet: false,
expErrContains: "not found",
},
{
name: "Handles invalid JSON in extension by overwriting",
clusterOperator: &configv1.ClusterOperator{
ObjectMeta: metav1.ObjectMeta{
Name: "insights",
},
Status: configv1.ClusterOperatorStatus{
Extension: runtime.RawExtension{
Raw: []byte(`{invalid json}`),
},
},
},
expErr: false,
expTimeSet: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var operators []runtime.Object
if tt.clusterOperator != nil {
operators = append(operators, tt.clusterOperator)
}
fakeClient := configfake.NewSimpleClientset(operators...)

timeBefore := time.Now().UTC().Truncate(time.Second)

ctx := context.Background()
err := updateClusterOperatorLastReportTime(ctx, fakeClient.ConfigV1())

timeAfter := time.Now().UTC().Truncate(time.Second).Add(time.Second)

if tt.expErr {
assert.Error(t, err)
if tt.expErrContains != "" {
assert.Contains(t, err.Error(), tt.expErrContains)
}
return
}

assert.NoError(t, err)

if tt.expTimeSet {
reportTime := getLastReportTime(t, ctx, fakeClient).UTC()

assert.True(t, !reportTime.Before(timeBefore),
"LastReportTime (%v) should be at or after timeBefore (%v)", reportTime, timeBefore)
assert.True(t, !reportTime.After(timeAfter),
"LastReportTime (%v) should be at or before timeAfter (%v)", reportTime, timeAfter)
}
})
}
}

func Test_updateClusterOperatorLastReportTime_TimestampProgression(t *testing.T) {
clusterOperator := createBasicClusterOperator()

fakeClient := configfake.NewSimpleClientset(clusterOperator)
ctx := context.Background()

err := updateClusterOperatorLastReportTime(ctx, fakeClient.ConfigV1())
assert.NoError(t, err)

firstTime := getLastReportTime(t, ctx, fakeClient)

time.Sleep(10 * time.Millisecond)

err = updateClusterOperatorLastReportTime(ctx, fakeClient.ConfigV1())
assert.NoError(t, err)

secondTime := getLastReportTime(t, ctx, fakeClient)

assert.True(t, !secondTime.Before(firstTime),
"Second timestamp (%v) should be at or after first timestamp (%v)", secondTime, firstTime)
}

// Create a basic ClusterOperator for testing
func createBasicClusterOperator() *configv1.ClusterOperator {
return &configv1.ClusterOperator{
ObjectMeta: metav1.ObjectMeta{
Name: "insights",
},
Status: configv1.ClusterOperatorStatus{},
}
}

// Retrieve and unmarshal the LastReportTime
func getLastReportTime(t *testing.T, ctx context.Context, client *configfake.Clientset) time.Time {
updatedCo, err := client.ConfigV1().ClusterOperators().Get(ctx, "insights", metav1.GetOptions{})
assert.NoError(t, err)
assert.NotNil(t, updatedCo.Status.Extension.Raw)

var reported status.Reported
err = json.Unmarshal(updatedCo.Status.Extension.Raw, &reported)
assert.NoError(t, err)
assert.False(t, reported.LastReportTime.IsZero())

return reported.LastReportTime.Time
}