Skip to content

Commit cd6b1b7

Browse files
committed
add task result files
1 parent d003921 commit cd6b1b7

File tree

18 files changed

+829
-37
lines changed

18 files changed

+829
-37
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
-- +goose Up
2+
-- +goose StatementBegin
3+
4+
CREATE TABLE IF NOT EXISTS public."task_results"
5+
(
6+
"run_id" INTEGER NOT NULL,
7+
"task_id" INTEGER NOT NULL,
8+
"result_type" TEXT NOT NULL,
9+
"result_index" INTEGER NOT NULL,
10+
"name" TEXT NOT NULL,
11+
"size" INTEGER NOT NULL,
12+
"data" BYTEA NOT NULL,
13+
CONSTRAINT "task_results_pkey" PRIMARY KEY ("run_id", "task_id", "result_type", "result_index")
14+
);
15+
16+
CREATE INDEX IF NOT EXISTS "task_results_name_idx" ON public."task_results" ("run_id", "task_id", "name");
17+
18+
-- +goose StatementEnd
19+
-- +goose Down
20+
-- +goose StatementBegin
21+
SELECT 'NOT SUPPORTED';
22+
-- +goose StatementEnd
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
-- +goose Up
2+
-- +goose StatementBegin
3+
4+
CREATE TABLE IF NOT EXISTS "task_results"
5+
(
6+
"run_id" INTEGER NOT NULL,
7+
"task_id" INTEGER NOT NULL,
8+
"result_type" TEXT NOT NULL,
9+
"result_index" INTEGER NOT NULL,
10+
"name" TEXT NOT NULL,
11+
"size" INTEGER NOT NULL,
12+
"data" BLOB NOT NULL,
13+
CONSTRAINT "task_results_pkey" PRIMARY KEY ("run_id", "task_id", "result_type", "result_index")
14+
);
15+
16+
CREATE INDEX IF NOT EXISTS "task_results_name_idx" ON "task_results" ("run_id", "task_id", "name");
17+
18+
-- +goose StatementEnd
19+
-- +goose Down
20+
-- +goose StatementBegin
21+
SELECT 'NOT SUPPORTED';
22+
-- +goose StatementEnd

pkg/coordinator/db/task_results.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package db
2+
3+
import "github.com/jmoiron/sqlx"
4+
5+
type TaskResult struct {
6+
RunID int `db:"run_id"`
7+
TaskID int `db:"task_id"`
8+
Type string `db:"result_type"`
9+
Index int `db:"result_index"`
10+
Name string `db:"name"`
11+
Size int `db:"size"`
12+
Data []byte `db:"data"`
13+
}
14+
15+
type TaskResultHeader struct {
16+
TaskID int `db:"task_id"`
17+
Type string `db:"result_type"`
18+
Index int `db:"result_index"`
19+
Name string `db:"name"`
20+
Size int `db:"size"`
21+
}
22+
23+
func (db *Database) UpsertTaskResult(tx *sqlx.Tx, result *TaskResult) error {
24+
_, err := tx.Exec(db.EngineQuery(map[EngineType]string{
25+
EnginePgsql: `
26+
INSERT INTO task_results (run_id, task_id, result_type, result_index, name, size, data)
27+
VALUES ($1, $2, $3, $4, $5, $6, $7)
28+
ON CONFLICT (run_id, task_id, result_type, result_index) DO UPDATE SET
29+
name = excluded.name,
30+
size = excluded.size,
31+
data = excluded.data
32+
`,
33+
EngineSqlite: `
34+
INSERT OR REPLACE INTO task_results (run_id, task_id, result_type, result_index, name, size, data)
35+
VALUES ($1, $2, $3, $4, $5, $6, $7)`,
36+
}),
37+
result.RunID, result.TaskID, result.Type, result.Index, result.Name, result.Size, result.Data)
38+
39+
return err
40+
}
41+
42+
func (db *Database) GetTaskResultByIndex(runID, taskID int, resultType string, index int) (*TaskResult, error) {
43+
var result TaskResult
44+
err := db.reader.Get(&result, `
45+
SELECT * FROM task_results
46+
WHERE run_id = $1 AND task_id = $2 AND result_type = $3 AND result_index = $4`,
47+
runID, taskID, resultType, index)
48+
49+
if err != nil {
50+
return nil, err
51+
}
52+
53+
return &result, nil
54+
}
55+
56+
func (db *Database) GetTaskResultByName(runID, taskID int, resultType, name string) (*TaskResult, error) {
57+
var result TaskResult
58+
err := db.reader.Get(&result, `
59+
SELECT * FROM task_results
60+
WHERE run_id = $1 AND task_id = $2 AND result_type = $3 AND name = $4`,
61+
runID, taskID, resultType, name)
62+
63+
if err != nil {
64+
return nil, err
65+
}
66+
67+
return &result, nil
68+
}
69+
70+
func (db *Database) GetTaskResults(runID, taskID int, summaryType string) ([]TaskResult, error) {
71+
var results []TaskResult
72+
err := db.reader.Select(&results, `
73+
SELECT * FROM task_results
74+
WHERE run_id = $1 AND task_id = $2 AND result_type = $3`,
75+
runID, taskID, summaryType)
76+
77+
return results, err
78+
}
79+
80+
func (db *Database) GetAllTaskResultHeaders(runID int) ([]TaskResultHeader, error) {
81+
var headers []TaskResultHeader
82+
err := db.reader.Select(&headers, `
83+
SELECT task_id, result_type, result_index, name, size FROM task_results
84+
WHERE run_id = $1`,
85+
runID)
86+
87+
return headers, err
88+
}

pkg/coordinator/scheduler/scheduler.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ func (ts *TaskScheduler) GetServices() types.TaskServices {
4343
return ts.services
4444
}
4545

46+
func (ts *TaskScheduler) GetTestRunID() uint64 {
47+
return ts.testRunID
48+
}
49+
4650
func (ts *TaskScheduler) AddRootTask(options *types.TaskOptions) (types.TaskIndex, error) {
4751
task, err := ts.newTaskState(options, nil, nil, false)
4852
if err != nil {
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package runshell
2+
3+
import (
4+
"fmt"
5+
"os"
6+
)
7+
8+
type resultFile struct {
9+
filePath string
10+
}
11+
12+
func newResultFile(filePath string) (*resultFile, error) {
13+
// Touch the file
14+
if err := os.WriteFile(filePath, []byte{}, 0o600); err != nil {
15+
return nil, fmt.Errorf("failed to create result file: %w", err)
16+
}
17+
18+
return &resultFile{
19+
filePath: filePath,
20+
}, nil
21+
}
22+
23+
func (f *resultFile) FilePath() string {
24+
return f.filePath
25+
}
26+
27+
func (f *resultFile) Cleanup() ([]byte, error) {
28+
// Read file contents
29+
data, err := os.ReadFile(f.filePath)
30+
if err != nil && !os.IsNotExist(err) {
31+
return nil, fmt.Errorf("failed to read result file: %w", err)
32+
}
33+
34+
// Remove temp file
35+
if err := os.Remove(f.filePath); err != nil {
36+
return nil, fmt.Errorf("failed to remove temp file: %w", err)
37+
}
38+
39+
return data, nil
40+
}

pkg/coordinator/tasks/run_shell/task.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,16 @@ import (
66
"encoding/json"
77
"fmt"
88
"io"
9+
"os"
910
"os/exec"
11+
"path/filepath"
1012
"regexp"
1113
"syscall"
1214
"time"
1315

16+
"github.com/ethpandaops/assertoor/pkg/coordinator/db"
1417
"github.com/ethpandaops/assertoor/pkg/coordinator/types"
18+
"github.com/jmoiron/sqlx"
1519
"github.com/sirupsen/logrus"
1620
)
1721

@@ -78,6 +82,19 @@ func (t *Task) Execute(ctx context.Context) error {
7882
cmdLogger := t.logger.WithField("shell", t.config.Shell)
7983
cmdLogger.Info("running command")
8084

85+
// create temp dir for task
86+
taskDir, err := os.MkdirTemp(os.TempDir(), fmt.Sprintf("assertoor_%v_%v_", t.ctx.Scheduler.GetTestRunID(), t.ctx.Index))
87+
if err != nil {
88+
cmdLogger.Errorf("failed creating task dir: %v", err)
89+
return err
90+
}
91+
92+
defer func() {
93+
if err2 := os.RemoveAll(taskDir); err2 != nil {
94+
cmdLogger.Errorf("failed cleaning up task dir: %v", err2)
95+
}
96+
}()
97+
8198
//nolint:gosec // ignore
8299
command := exec.CommandContext(ctx, t.config.Shell, t.config.ShellArgs...)
83100

@@ -124,6 +141,28 @@ func (t *Task) Execute(ctx context.Context) error {
124141
command.Env = append(command.Env, fmt.Sprintf("%v=%v", envName, string(varJSON)))
125142
}
126143

144+
// create summaries file
145+
summaryFile, err := newResultFile(filepath.Join(taskDir, "summary"))
146+
if err != nil {
147+
cmdLogger.Errorf("failed creating summary file: %v", err)
148+
return err
149+
}
150+
151+
command.Env = append(command.Env, fmt.Sprintf("ASSERTOOR_SUMMARY=%v", summaryFile.FilePath()))
152+
153+
// create folder for result files
154+
resultDir := filepath.Join(taskDir, "results")
155+
if err = os.MkdirAll(resultDir, 0o700); err != nil {
156+
cmdLogger.Errorf("failed creating result dir: %v", err)
157+
return err
158+
}
159+
160+
command.Env = append(command.Env, fmt.Sprintf("ASSERTOOR_RESULT_DIR=%v", resultDir))
161+
162+
defer func() {
163+
t.storeTaskResults(summaryFile, resultDir)
164+
}()
165+
127166
// start shell
128167
err = command.Start()
129168
if err != nil {
@@ -305,3 +344,70 @@ func (t *Task) parseOutputVars(line string) bool {
305344

306345
return false
307346
}
347+
348+
func (t *Task) storeTaskResults(summaryFile *resultFile, resultDir string) {
349+
// store files to db
350+
database := t.ctx.Scheduler.GetServices().Database()
351+
if err2 := database.RunTransaction(func(tx *sqlx.Tx) error {
352+
// store summary file
353+
data, err3 := summaryFile.Cleanup()
354+
if err3 != nil {
355+
t.logger.Errorf("failed cleaning up summary file: %v", err3)
356+
} else if err3 = database.UpsertTaskResult(tx, &db.TaskResult{
357+
RunID: int(t.ctx.Scheduler.GetTestRunID()),
358+
TaskID: int(t.ctx.Index),
359+
Type: "summary",
360+
Index: 0,
361+
Name: "",
362+
Size: len(data),
363+
Data: data,
364+
}); err3 != nil {
365+
t.logger.Errorf("failed storing summary file to db: %v", err3)
366+
}
367+
368+
// store result files
369+
fileIdx := 0
370+
371+
var storeResultFilesFn func(path string, prefix string)
372+
storeResultFilesFn = func(path string, prefix string) {
373+
if prefix != "" {
374+
prefix += "/"
375+
}
376+
377+
files, err3 := os.ReadDir(path)
378+
if err3 != nil {
379+
t.logger.Errorf("failed reading result dir: %v", err3)
380+
} else {
381+
for _, file := range files {
382+
if file.IsDir() {
383+
storeResultFilesFn(filepath.Join(path, file.Name()), fmt.Sprintf("%v%v", prefix, file.Name()))
384+
continue
385+
}
386+
387+
data, err3 := os.ReadFile(filepath.Join(path, file.Name()))
388+
if err3 != nil {
389+
t.logger.Errorf("failed reading result file: %v", err3)
390+
} else if err3 = database.UpsertTaskResult(tx, &db.TaskResult{
391+
RunID: int(t.ctx.Scheduler.GetTestRunID()),
392+
TaskID: int(t.ctx.Index),
393+
Type: "result",
394+
Index: fileIdx,
395+
Name: file.Name(),
396+
Size: len(data),
397+
Data: data,
398+
}); err3 != nil {
399+
t.logger.Errorf("failed storing result file to db: %v", err3)
400+
}
401+
402+
fileIdx++
403+
}
404+
}
405+
}
406+
407+
storeResultFilesFn(resultDir, "")
408+
409+
return nil
410+
}); err2 != nil {
411+
t.logger.Errorf("failed storing task results to db: %v", err2)
412+
}
413+
}

pkg/coordinator/types/scheduler.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
type TaskSchedulerRunner interface {
1414
TaskScheduler
1515
GetServices() TaskServices
16+
GetTestRunID() uint64
1617
ParseTaskOptions(rawtask helper.IRawMessage) (*TaskOptions, error)
1718
ExecuteTask(ctx context.Context, taskIndex TaskIndex, taskWatchFn func(ctx context.Context, cancelFn context.CancelFunc, taskIndex TaskIndex)) error
1819
WatchTaskPass(ctx context.Context, cancelFn context.CancelFunc, taskIndex TaskIndex)

0 commit comments

Comments
 (0)