Skip to content

Commit 86bcd9f

Browse files
Arta AsadiArta Asadi
authored andcommitted
fix: give env vars in migrator:
1 parent 3aa4b08 commit 86bcd9f

File tree

6 files changed

+167
-101
lines changed

6 files changed

+167
-101
lines changed

jobs/post-install-job/job/migrations/tasks/migrator.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/opengovern/opensecurity/jobs/post-install-job/db"
1212
"github.com/opengovern/opensecurity/services/tasks/db/models"
1313
"github.com/opengovern/opensecurity/services/tasks/worker"
14+
"github.com/opengovern/opensecurity/services/tasks/worker/consts"
1415
"github.com/xhit/go-str2duration/v2"
1516
"gopkg.in/yaml.v3"
1617
"gorm.io/gorm/clause"
@@ -27,6 +28,16 @@ import (
2728
"go.uber.org/zap"
2829
)
2930

31+
var (
32+
ESAddress = os.Getenv("ELASTICSEARCH_ADDRESS")
33+
ESUsername = os.Getenv("ELASTICSEARCH_USERNAME")
34+
ESPassword = os.Getenv("ELASTICSEARCH_PASSWORD")
35+
ESIsOnAks = os.Getenv("ELASTICSEARCH_ISONAKS")
36+
37+
InventoryBaseURL = os.Getenv("CORE_BASEURL")
38+
NatsURL = os.Getenv("NATS_URL")
39+
)
40+
3041
type Migration struct {
3142
}
3243

@@ -108,6 +119,18 @@ func (m Migration) Run(ctx context.Context, conf config.MigratorConfig, logger *
108119
return err
109120
}
110121

122+
defaultEnvVars := defaultEnvs(&task)
123+
envVarsJsonData, err := json.Marshal(defaultEnvVars)
124+
if err != nil {
125+
return err
126+
}
127+
128+
var envVarsJsonb pgtype.JSONB
129+
err = envVarsJsonb.Set(envVarsJsonData)
130+
if err != nil {
131+
return err
132+
}
133+
111134
timeoutFloat, err := parseToTotalSeconds(task.Timeout)
112135
if err != nil {
113136
return err
@@ -128,6 +151,7 @@ func (m Migration) Run(ctx context.Context, conf config.MigratorConfig, logger *
128151
Timeout: timeoutFloat,
129152
NatsConfig: natsJsonb,
130153
ScaleConfig: scaleJsonb,
154+
EnvVars: envVarsJsonb,
131155
}).Error; err != nil {
132156
return err
133157
}
@@ -327,3 +351,21 @@ func loadCloudqlBinary(itDbm db.Database, logger *zap.Logger, task worker.Task)
327351

328352
return nil
329353
}
354+
355+
func defaultEnvs(taskConfig *worker.Task) map[string]string {
356+
return map[string]string{
357+
consts.NatsURLEnv: NatsURL,
358+
consts.NatsConsumerEnv: taskConfig.NatsConfig.Consumer,
359+
consts.NatsStreamNameEnv: taskConfig.NatsConfig.Stream,
360+
consts.NatsTopicNameEnv: taskConfig.NatsConfig.Topic,
361+
consts.NatsResultTopicNameEnv: taskConfig.NatsConfig.ResultTopic,
362+
consts.ElasticSearchAddressEnv: ESAddress,
363+
consts.ElasticSearchUsernameEnv: ESUsername,
364+
consts.ElasticSearchPasswordEnv: ESPassword,
365+
consts.ElasticSearchIsOnAksNameEnv: ESIsOnAks,
366+
consts.ElasticSearchIsOpenSearch: "false",
367+
consts.ElasticSearchAwsRegionEnv: "",
368+
consts.ElasticSearchAssumeRoleArnEnv: "",
369+
consts.InventoryBaseURL: InventoryBaseURL,
370+
}
371+
}

services/tasks/api/tasks.go

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,48 @@
11
package api
22

3+
import (
4+
"time"
5+
)
6+
37
type TaskListResponse struct {
48
Items []TaskResponse `json:"items"`
59
TotalCount int `json:"total_count"`
610
}
711

812
type TaskResponse struct {
9-
ID string `json:"id"`
10-
Name string `json:"name"`
11-
Description string `json:"description"`
12-
ImageUrl string `json:"image_url"`
13-
Timeout float64 `json:"timeout"`
13+
ID string `json:"id"`
14+
Name string `json:"name"`
15+
Description string `json:"description"`
16+
ImageUrl string `json:"image_url"`
17+
SchedulesNumber int `json:"schedules_number"`
18+
}
19+
20+
type TaskDetailsResponse struct {
21+
ID string `json:"id"`
22+
Name string `json:"name"`
23+
Description string `json:"description"`
24+
ImageUrl string `json:"image_url"`
25+
RunSchedules []RunScheduleObject `json:"run_schedules"`
26+
Credentials []string `json:"credentials"`
27+
EnvVars []string `json:"env_vars"`
28+
ScaleConfig ScaleConfig `json:"scale_config"`
29+
}
30+
31+
type ScaleConfig struct {
32+
Stream string `json:"stream" yaml:"stream"`
33+
Consumer string `json:"consumer" yaml:"consumer"`
34+
LagThreshold string `json:"lag_threshold" yaml:"lag_threshold"`
35+
MinReplica int32 `json:"min_replica" yaml:"min_replica"`
36+
MaxReplica int32 `json:"max_replica" yaml:"max_replica"`
37+
38+
PollingInterval int32 `json:"polling_interval" yaml:"polling_interval"`
39+
CooldownPeriod int32 `json:"cooldown_period" yaml:"cooldown_period"`
40+
}
41+
42+
type RunScheduleObject struct {
43+
LastRun *time.Time `json:"last_run"`
44+
Params map[string]any `json:"params"`
45+
Frequency float64 `json:"frequency"`
1446
}
1547

1648
type RunTaskRequest struct {

services/tasks/db/models/task.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type Task struct {
2727
Timeout float64
2828
NatsConfig pgtype.JSONB
2929
ScaleConfig pgtype.JSONB
30+
EnvVars pgtype.JSONB
3031
}
3132

3233
type TaskBinary struct {

services/tasks/http.go

Lines changed: 69 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package tasks
33
import (
44
"crypto/rsa"
55
"encoding/json"
6+
"fmt"
7+
"github.com/jackc/pgtype"
68
api2 "github.com/opengovern/og-util/pkg/api"
79
"github.com/opengovern/og-util/pkg/httpserver"
810
"github.com/opengovern/og-util/pkg/vault"
@@ -98,12 +100,17 @@ func (r *httpRoutes) ListTasks(ctx echo.Context) error {
98100
}
99101
var taskResponses []api.TaskResponse
100102
for _, task := range items {
103+
runSchedules, err := r.db.GetTaskRunSchedules(task.ID)
104+
if err != nil {
105+
r.logger.Error("failed to get task run schedules", zap.Error(err))
106+
return ctx.JSON(http.StatusInternalServerError, "failed to get task run schedules")
107+
}
101108
taskResponses = append(taskResponses, api.TaskResponse{
102-
ID: task.ID,
103-
Name: task.Name,
104-
Description: task.Description,
105-
ImageUrl: task.ImageUrl,
106-
Timeout: task.Timeout,
109+
ID: task.ID,
110+
Name: task.Name,
111+
Description: task.Description,
112+
ImageUrl: task.ImageUrl,
113+
SchedulesNumber: len(runSchedules),
107114
})
108115
}
109116

@@ -129,18 +136,68 @@ func (r *httpRoutes) GetTask(ctx echo.Context) error {
129136
r.logger.Error("failed to get task results", zap.Error(err))
130137
return ctx.JSON(http.StatusInternalServerError, "failed to get task results")
131138
}
132-
var taskResponse api.TaskResponse
133-
taskResponse = api.TaskResponse{
134-
ID: task.ID,
135-
Name: task.Name,
136-
Description: task.Description,
137-
ImageUrl: task.ImageUrl,
138-
Timeout: task.Timeout,
139+
runSchedules, err := r.db.GetTaskRunSchedules(task.ID)
140+
if err != nil {
141+
r.logger.Error("failed to get task run schedules", zap.Error(err))
142+
return ctx.JSON(http.StatusInternalServerError, "failed to get task run schedules")
143+
}
144+
145+
var runSchedulesObjects []api.RunScheduleObject
146+
for _, runSchedule := range runSchedules {
147+
params, err := JSONBToMap(runSchedule.Params)
148+
if err != nil {
149+
r.logger.Error("failed to get task run params", zap.Error(err))
150+
return ctx.JSON(http.StatusInternalServerError, "failed to get task run params")
151+
}
152+
runSchedulesObjects = append(runSchedulesObjects, api.RunScheduleObject{
153+
LastRun: runSchedule.LastRun,
154+
Params: params,
155+
Frequency: runSchedule.Frequency,
156+
})
157+
}
158+
159+
var credentials []string
160+
configSecrets, err := r.db.GetTaskConfigSecret(task.ID)
161+
if err != nil {
162+
r.logger.Error("failed to get task config secret", zap.Error(err))
163+
return ctx.JSON(http.StatusInternalServerError, "failed to get task config secret")
164+
}
165+
if configSecrets != nil {
166+
mapData, err := r.vault.Decrypt(ctx.Request().Context(), configSecrets.Secret)
167+
if err != nil {
168+
r.logger.Error("failed to decrypt secret", zap.Error(err))
169+
return echo.NewHTTPError(http.StatusInternalServerError, "failed to decrypt config")
170+
}
171+
for k := range mapData {
172+
credentials = append(credentials, k)
173+
}
174+
}
175+
176+
taskResponse := api.TaskDetailsResponse{
177+
ID: task.ID,
178+
Name: task.Name,
179+
Description: task.Description,
180+
ImageUrl: task.ImageUrl,
181+
RunSchedules: runSchedulesObjects,
182+
Credentials: credentials,
139183
}
140184

141185
return ctx.JSON(http.StatusOK, taskResponse)
142186
}
143187

188+
func JSONBToMap(jsonb pgtype.JSONB) (map[string]any, error) {
189+
if jsonb.Status != pgtype.Present {
190+
return nil, fmt.Errorf("JSONB data is not present")
191+
}
192+
193+
var result map[string]any
194+
if err := json.Unmarshal(jsonb.Bytes, &result); err != nil {
195+
return nil, fmt.Errorf("failed to unmarshal JSONB: %w", err)
196+
}
197+
198+
return result, nil
199+
}
200+
144201
// RunTask godoc
145202
//
146203
// @Summary Run a new task

services/tasks/scheduler/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (s *MainScheduler) Start(ctx context.Context) error {
6363
if !ok {
6464
return fmt.Errorf("current namespace lookup failed")
6565
}
66-
err = worker.CreateWorker(ctx, s.cfg, s.kubeClient, &task, currentNamespace)
66+
err = worker.CreateWorker(ctx, s.kubeClient, &task, currentNamespace)
6767
if err != nil {
6868
return err
6969
}

services/tasks/worker/worker.go

Lines changed: 17 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -5,37 +5,35 @@ import (
55
"github.com/aws/aws-sdk-go-v2/aws"
66
"github.com/jackc/pgtype"
77
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
8-
"github.com/opengovern/opensecurity/services/tasks/config"
98
"github.com/opengovern/opensecurity/services/tasks/db/models"
10-
"github.com/opengovern/opensecurity/services/tasks/worker/consts"
119
"golang.org/x/net/context"
1210
appsv1 "k8s.io/api/apps/v1"
1311
corev1 "k8s.io/api/core/v1"
1412
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1513
"os"
1614
"sigs.k8s.io/controller-runtime/pkg/client"
17-
"strconv"
1815
)
1916

20-
var (
21-
ESAddress = os.Getenv("ELASTICSEARCH_ADDRESS")
22-
ESUsername = os.Getenv("ELASTICSEARCH_USERNAME")
23-
ESPassword = os.Getenv("ELASTICSEARCH_PASSWORD")
24-
ESIsOnAks = os.Getenv("ELASTICSEARCH_ISONAKS")
25-
26-
InventoryBaseURL = os.Getenv("CORE_BASEURL")
27-
)
28-
29-
func CreateWorker(ctx context.Context, cfg config.Config, kubeClient client.Client, taskConfig *models.Task, namespace string) error {
17+
func CreateWorker(ctx context.Context, kubeClient client.Client, taskConfig *models.Task, namespace string) error {
3018
soNatsUrl, _ := os.LookupEnv("SCALED_OBJECT_NATS_URL")
3119

32-
env, err := defaultEnvs(cfg, taskConfig)
33-
if err != nil {
34-
return err
20+
var envVars map[string]string
21+
if taskConfig.EnvVars.Status == pgtype.Present {
22+
if err := json.Unmarshal(taskConfig.EnvVars.Bytes, &envVars); err != nil {
23+
return err
24+
}
25+
}
26+
27+
var env []corev1.EnvVar
28+
for k, v := range envVars {
29+
env = append(env, corev1.EnvVar{
30+
Name: k,
31+
Value: v,
32+
})
3533
}
3634

3735
var deployment appsv1.Deployment
38-
err = kubeClient.Get(ctx, client.ObjectKey{
36+
err := kubeClient.Get(ctx, client.ObjectKey{
3937
Namespace: namespace,
4038
Name: taskConfig.ID,
4139
}, &deployment)
@@ -77,15 +75,15 @@ func CreateWorker(ctx context.Context, cfg config.Config, kubeClient client.Clie
7775
},
7876
},
7977
}
80-
err := kubeClient.Create(ctx, &deployment)
78+
err = kubeClient.Create(ctx, &deployment)
8179
if err != nil {
8280
return err
8381
}
8482
}
8583

8684
var scaleConfig ScaleConfig
8785
if taskConfig.ScaleConfig.Status == pgtype.Present {
88-
if err := json.Unmarshal(taskConfig.ScaleConfig.Bytes, &scaleConfig); err != nil {
86+
if err = json.Unmarshal(taskConfig.ScaleConfig.Bytes, &scaleConfig); err != nil {
8987
return err
9088
}
9189
}
@@ -139,67 +137,3 @@ func CreateWorker(ctx context.Context, cfg config.Config, kubeClient client.Clie
139137

140138
return nil
141139
}
142-
143-
func defaultEnvs(cfg config.Config, taskConfig *models.Task) ([]corev1.EnvVar, error) {
144-
var natsConfig NatsConfig
145-
if taskConfig.NatsConfig.Status == pgtype.Present {
146-
if err := json.Unmarshal(taskConfig.NatsConfig.Bytes, &natsConfig); err != nil {
147-
return nil, err
148-
}
149-
}
150-
151-
return []corev1.EnvVar{
152-
{
153-
Name: consts.NatsURLEnv,
154-
Value: cfg.NATS.URL,
155-
},
156-
{
157-
Name: consts.NatsConsumerEnv,
158-
Value: natsConfig.Consumer,
159-
},
160-
{
161-
Name: consts.NatsStreamNameEnv,
162-
Value: natsConfig.Stream,
163-
},
164-
{
165-
Name: consts.NatsTopicNameEnv,
166-
Value: natsConfig.Topic,
167-
},
168-
{
169-
Name: consts.NatsResultTopicNameEnv,
170-
Value: natsConfig.ResultTopic,
171-
},
172-
{
173-
Name: consts.ElasticSearchAddressEnv,
174-
Value: ESAddress,
175-
},
176-
{
177-
Name: consts.ElasticSearchUsernameEnv,
178-
Value: ESUsername,
179-
},
180-
{
181-
Name: consts.ElasticSearchPasswordEnv,
182-
Value: ESPassword,
183-
},
184-
{
185-
Name: consts.ElasticSearchIsOnAksNameEnv,
186-
Value: ESIsOnAks,
187-
},
188-
{
189-
Name: consts.ElasticSearchIsOpenSearch,
190-
Value: strconv.FormatBool(cfg.ElasticSearch.IsOpenSearch),
191-
},
192-
{
193-
Name: consts.ElasticSearchAwsRegionEnv,
194-
Value: "",
195-
},
196-
{
197-
Name: consts.ElasticSearchAssumeRoleArnEnv,
198-
Value: "",
199-
},
200-
{
201-
Name: consts.InventoryBaseURL,
202-
Value: InventoryBaseURL,
203-
},
204-
}, nil
205-
}

0 commit comments

Comments
 (0)