Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions internal/cmd/dequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ func runDequeue(ctx *Context, args []string) error {
return dequeueDAGRun(ctx, queueName, dagRun, false)
}

// dequeueFirst dequeues the first dag-run from the given queue.
// dequeueFirst dequeues the first DAG run from the named queue and processes that run as aborted.
//
// It returns an error if queues are disabled, if removing an item from the queue fails,
// if the queue is empty, if retrieving the dequeued item's DAG-run data fails, or if
// processing the dequeued DAG run fails.
func dequeueFirst(ctx *Context, queueName string) error {
// Check if queues are enabled
if !ctx.Config.Queues.Enabled {
Expand All @@ -59,7 +63,11 @@ func dequeueFirst(ctx *Context, queueName string) error {
return fmt.Errorf("no dag-run found in queue %s", queueName)
}

return dequeueDAGRun(ctx, queueName, item.Data(), true)
data, err := item.Data()
if err != nil {
return fmt.Errorf("failed to get dag-run data: %w", err)
}
return dequeueDAGRun(ctx, queueName, *data, true)
}

// dequeueDAGRun dequeues a dag-run from the queue.
Expand Down
2 changes: 1 addition & 1 deletion internal/core/execution/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,5 @@ type QueuedItemData interface {
// ID returns the ID of the queued item
ID() string
// Data returns the data of the queued item
Data() DAGRunRef
Data() (*DAGRunRef, error)
}
3 changes: 2 additions & 1 deletion internal/integration/distributed_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ steps:
require.Len(t, queueItems, 1, "DAG should be enqueued once")

if len(queueItems) > 0 {
data := queueItems[0].Data()
data, err := queueItems[0].Data()
require.NoError(t, err, "Should be able to get queue item data")
t.Logf("DAG enqueued: dag=%s runId=%s", data.Name, data.ID)
}

Expand Down
11 changes: 7 additions & 4 deletions internal/integration/distributed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ steps:
require.Len(t, queueItems, 1, "DAG should be enqueued once")

if len(queueItems) > 0 {
data := queueItems[0].Data()
data, err := queueItems[0].Data()
require.NoError(t, err, "Should be able to get queue item data")
t.Logf("DAG enqueued: dag=%s runId=%s", data.Name, data.ID)
}

Expand Down Expand Up @@ -156,9 +157,10 @@ steps:
var dagRunID string
var dagRun execution.DAGRunRef
if len(queueItems) > 0 {
data := queueItems[0].Data()
data, err := queueItems[0].Data()
require.NoError(t, err, "Should be able to get queue item data")
dagRunID = data.ID
dagRun = data
dagRun = *data
t.Logf("DAG enqueued: dag=%s runId=%s", data.Name, data.ID)
}

Expand All @@ -183,7 +185,8 @@ steps:
require.Len(t, queueItems, 1, "Retry should be enqueued once")

if len(queueItems) > 0 {
data := queueItems[0].Data()
data, err := queueItems[0].Data()
require.NoError(t, err, "Should be able to get queue item data")
require.Equal(t, dagRunID, data.ID, "Should have same DAG run ID")
t.Logf("Retry enqueued: dag=%s runId=%s", data.Name, data.ID)
}
Expand Down
93 changes: 80 additions & 13 deletions internal/persistence/filequeue/job.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,101 @@
package filequeue

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"

"github.com/dagu-org/dagu/internal/core/execution"
)

var _ execution.QueuedItemData = (*Job)(nil)
var _ execution.QueuedItemData = (*QueuedFile)(nil)

type Job struct {
id string
ItemData
type QueuedFile struct {
id string
file string

cache *ItemData
lock sync.Mutex
}

func NewJob(data ItemData) *Job {
base := filepath.Base(data.FileName)
// NewQueuedFile creates a QueuedFile for the given queue file path.
// The returned QueuedFile's id is derived from the base filename (the filename without its extension)
// and its file field is set to the provided path. It does not read or validate the file contents.
func NewQueuedFile(file string) *QueuedFile {
base := filepath.Base(file)
ext := filepath.Ext(base)
name := base[:len(base)-len(ext)]
return &Job{
id: name,
ItemData: data,
return &QueuedFile{
file: file,
id: name,
}
}

// ID implements execution.QueuedItemData.
func (j *QueuedFile) ID() string {
return j.id
}

// Data implements execution.QueuedItemData.
func (j *QueuedFile) Data() (*execution.DAGRunRef, error) {
itemData, err := j.loadData()
if err != nil {
return nil, fmt.Errorf("failed to load job data: %w", err)
}
return &itemData.DAGRun, nil
}

func (j *QueuedFile) loadData() (*ItemData, error) {
j.lock.Lock()
defer j.lock.Unlock()

if j.cache != nil {
return j.cache, nil
}

var itemData ItemData

fileData, err := os.ReadFile(j.file) // nolint: gosec
if err != nil {
return nil, fmt.Errorf("failed to read queue file %s: %w", j.file, err)
}

if err := json.Unmarshal(fileData, &itemData); err != nil {
return nil, fmt.Errorf("failed to unmarshal queue file %s: %w", j.file, err)
}

j.cache = &itemData

return &itemData, nil
}

// ExtractJob loads and returns the underlying QueuedItemData.
func (j *QueuedFile) ExtractJob() (*Job, error) {
data, err := j.loadData()
if err != nil {
return nil, fmt.Errorf("failed to load job data: %w", err)
}

return &Job{
id: j.id,
ItemData: *data,
}, nil
}

var _ execution.QueuedItemData = (*Job)(nil)

// Job implements execution.QueuedItemData for a job stored in a file.
type Job struct {
id string
ItemData
}

// ID implements models.QueuedJob.
func (j *Job) ID() string {
return j.id
}

// Data implements models.QueuedItem.
func (j *Job) Data() execution.DAGRunRef {
return j.DAGRun
func (j *Job) Data() (*execution.DAGRunRef, error) {
return &j.DAGRun, nil
}
119 changes: 110 additions & 9 deletions internal/persistence/filequeue/job_test.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,130 @@
package filequeue

import (
"encoding/json"
"os"
"path/filepath"
"testing"

"github.com/dagu-org/dagu/internal/core/execution"
"github.com/stretchr/testify/require"
)

func TestJob(t *testing.T) {
func TestQueuedFile(t *testing.T) {
t.Parallel()

// Create a new job
job := NewJob(ItemData{
FileName: "/tmp/test-file.json",
// Create a temporary directory for the test
tmpDir := t.TempDir()

// Create the test file with proper JSON content
testFilePath := filepath.Join(tmpDir, "test-file.json")
itemData := ItemData{
FileName: "test-file.json",
DAGRun: execution.DAGRunRef{
Name: "test-name",
ID: "test-dag",
},
}
fileContent, err := json.Marshal(itemData)
require.NoError(t, err, "expected no error when marshaling item data")
err = os.WriteFile(testFilePath, fileContent, 0644)
require.NoError(t, err, "expected no error when writing test file")

// Create a new QueuedFile
queuedFile := NewQueuedFile(testFilePath)

// Check if the ID is correct
require.Equal(t, "test-file", queuedFile.ID(), "expected job ID to be 'test-file'")

// Check if the data is correct (lazy loading from file)
jobData, err := queuedFile.Data()
require.NoError(t, err, "expected no error when getting job data")
require.Equal(t, "test-name", jobData.Name, "expected job name to be 'test-name'")
require.Equal(t, "test-dag", jobData.ID, "expected job ID to be 'test-dag'")
}

func TestQueuedFile_DataError(t *testing.T) {
t.Parallel()

// Create a QueuedFile with a non-existent file
queuedFile := NewQueuedFile("/nonexistent/path/test-file.json")

// Check if Data() returns an error for non-existent file
_, err := queuedFile.Data()
require.Error(t, err, "expected error when reading non-existent file")
}

func TestQueuedFile_ExtractJob(t *testing.T) {
t.Parallel()

// Create a temporary directory for the test
tmpDir := t.TempDir()

// Create the test file with proper JSON content
testFilePath := filepath.Join(tmpDir, "test-file.json")
itemData := ItemData{
FileName: "test-file.json",
DAGRun: execution.DAGRunRef{
Name: "test-name",
ID: "test-dag",
},
})
}
fileContent, err := json.Marshal(itemData)
require.NoError(t, err, "expected no error when marshaling item data")
err = os.WriteFile(testFilePath, fileContent, 0644)
require.NoError(t, err, "expected no error when writing test file")

// Check if the job ID is correct
require.Equal(t, "test-file", job.ID(), "expected job ID to be 'test-file'")
// Create a QueuedFile and extract the Job
queuedFile := NewQueuedFile(testFilePath)
job, err := queuedFile.ExtractJob()
require.NoError(t, err, "expected no error when extracting job")

// Check if the job data is correct
jobData := job.Data()
// Delete the file to simulate what happens after Pop()
err = os.Remove(testFilePath)
require.NoError(t, err, "expected no error when removing test file")

// The extracted Job should still have the cached data
jobData, err := job.Data()
require.NoError(t, err, "expected no error when getting job data after file deletion")
require.Equal(t, "test-name", jobData.Name, "expected job name to be 'test-name'")
require.Equal(t, "test-dag", jobData.ID, "expected job ID to be 'test-dag'")
}

func TestQueuedFile_Caching(t *testing.T) {
t.Parallel()

// Create a temporary directory for the test
tmpDir := t.TempDir()

// Create the test file with proper JSON content
testFilePath := filepath.Join(tmpDir, "test-file.json")
itemData := ItemData{
FileName: "test-file.json",
DAGRun: execution.DAGRunRef{
Name: "test-name",
ID: "test-dag",
},
}
fileContent, err := json.Marshal(itemData)
require.NoError(t, err, "expected no error when marshaling item data")
err = os.WriteFile(testFilePath, fileContent, 0644)
require.NoError(t, err, "expected no error when writing test file")

// Create a QueuedFile
queuedFile := NewQueuedFile(testFilePath)

// First call to Data() should load from file
jobData1, err := queuedFile.Data()
require.NoError(t, err, "expected no error when getting job data")
require.Equal(t, "test-name", jobData1.Name)

// Delete the file
err = os.Remove(testFilePath)
require.NoError(t, err, "expected no error when removing test file")

// Second call to Data() should use cached data
jobData2, err := queuedFile.Data()
require.NoError(t, err, "expected no error when getting cached job data")
require.Equal(t, "test-name", jobData2.Name)
require.Equal(t, "test-dag", jobData2.ID)
}
4 changes: 1 addition & 3 deletions internal/persistence/filequeue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,7 @@ func (q *DualQueue) DequeueByDAGRunID(ctx context.Context, dagRun execution.DAGR
logger.Error(ctx, "Failed to pop dag-run from queue file", tag.Error(err))
return nil, fmt.Errorf("failed to pop dag-run %s: %w", dagRun.ID, err)
}
for _, item := range popped {
items = append(items, item)
}
items = append(items, popped...)
}

// Remove directory if it's empty
Expand Down
Loading
Loading