Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
760 changes: 473 additions & 287 deletions api/v2/api.gen.go

Large diffs are not rendered by default.

84 changes: 74 additions & 10 deletions api/v2/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2688,8 +2688,8 @@ paths:

/queues:
get:
summary: "List all execution queues with active DAG-runs"
description: "Retrieves all queues showing both running and queued DAG-runs, organized by queue/process group"
summary: "List all execution queues with summary statistics"
description: "Returns queue list with running/queued counts. Use /queues/{name}/items for paginated item details."
operationId: "listQueues"
tags:
- "queues"
Expand All @@ -2709,6 +2709,51 @@ paths:
schema:
$ref: "#/components/schemas/Error"

/queues/{name}/items:
get:
summary: "Get paginated items for a specific queue"
description: "Returns paginated list of running or queued DAG-runs for the specified queue"
operationId: "listQueueItems"
tags:
- "queues"
parameters:
- $ref: "#/components/parameters/RemoteNode"
- $ref: "#/components/parameters/Page"
- $ref: "#/components/parameters/PerPage"
- name: name
in: path
description: "Queue name"
required: true
schema:
type: string
- name: type
in: query
description: "Item type to fetch"
required: false
schema:
type: string
enum: ["running", "queued"]
default: "queued"
responses:
"200":
description: "A successful response"
content:
application/json:
schema:
$ref: "#/components/schemas/QueueItemsResponse"
"404":
description: "Queue not found"
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
default:
description: "Generic error response"
content:
application/json:
schema:
$ref: "#/components/schemas/Error"

/services/resources/history:
get:
summary: "Get resource usage history"
Expand Down Expand Up @@ -4691,7 +4736,7 @@ components:

Queue:
type: object
description: "A queue/process group with its active DAG-runs"
description: "A queue/process group with summary statistics"
properties:
name:
type: string
Expand All @@ -4704,21 +4749,40 @@ components:
type: integer
description: "Maximum number of concurrent runs allowed. For 'global' queues, this is the configured maxConcurrency. For 'dag-based' queues, this is the DAG's maxActiveRuns (default 1)"
minimum: 1
runningCount:
type: integer
description: "Number of currently running DAG-runs"
minimum: 0
queuedCount:
type: integer
description: "Number of queued DAG-runs waiting to execute"
minimum: 0
running:
type: array
description: "List of currently running DAG-runs"
items:
$ref: "#/components/schemas/DAGRunSummary"
queued:
type: array
description: "List of DAG-runs waiting to execute"
description: "List of currently running DAG-runs (bounded by maxConcurrency)"
items:
$ref: "#/components/schemas/DAGRunSummary"
required:
- name
- type
- runningCount
- queuedCount
- running
- queued

QueueItemsResponse:
type: object
description: "Paginated queue items response"
properties:
items:
type: array
description: "List of DAG-run summaries"
items:
$ref: "#/components/schemas/DAGRunSummary"
pagination:
$ref: "#/components/schemas/Pagination"
required:
- items
- pagination

QueuesSummary:
type: object
Expand Down
2 changes: 2 additions & 0 deletions internal/cmn/telemetry/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ var (
)

// Collector implements prometheus.Collector interface
var _ prometheus.Collector = (*Collector)(nil)

type Collector struct {
startTime time.Time
version string
Expand Down
15 changes: 13 additions & 2 deletions internal/cmn/telemetry/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type mockDAGStore struct {
mock.Mock
}

var _ exec.DAGStore = (*mockDAGStore)(nil)

func (m *mockDAGStore) Create(ctx context.Context, fileName string, spec []byte) error {
args := m.Called(ctx, fileName, spec)
return args.Error(0)
Expand Down Expand Up @@ -103,6 +105,8 @@ type mockDAGRunStore struct {
mock.Mock
}

var _ exec.DAGRunStore = (*mockDAGRunStore)(nil)

// RemoveDAGRun implements models.DAGRunStore.
func (m *mockDAGRunStore) RemoveDAGRun(_ context.Context, _ exec.DAGRunRef) error {
panic("unimplemented")
Expand Down Expand Up @@ -174,12 +178,12 @@ func (m *mockDAGRunStore) RenameDAGRuns(ctx context.Context, oldName, newName st
return args.Error(0)
}

var _ exec.QueueStore = (*mockQueueStore)(nil)

type mockQueueStore struct {
mock.Mock
}

var _ exec.QueueStore = (*mockQueueStore)(nil)

// QueueWatcher implements execution.QueueStore.
func (m *mockQueueStore) QueueWatcher(_ context.Context) exec.QueueWatcher {
panic("unimplemented")
Expand Down Expand Up @@ -223,6 +227,11 @@ func (m *mockQueueStore) List(ctx context.Context, name string) ([]exec.QueuedIt
return args.Get(0).([]exec.QueuedItemData), args.Error(1)
}

func (m *mockQueueStore) ListPaginated(ctx context.Context, name string, pg exec.Paginator) (exec.PaginatedResult[exec.QueuedItemData], error) {
args := m.Called(ctx, name, pg)
return args.Get(0).(exec.PaginatedResult[exec.QueuedItemData]), args.Error(1)
}

func (m *mockQueueStore) All(ctx context.Context) ([]exec.QueuedItemData, error) {
args := m.Called(ctx)
if args.Get(0) == nil {
Expand All @@ -237,6 +246,8 @@ type mockServiceRegistry struct {
mock.Mock
}

var _ exec.ServiceRegistry = (*mockServiceRegistry)(nil)

func (m *mockServiceRegistry) Register(ctx context.Context, serviceName exec.ServiceName, hostInfo exec.HostInfo) error {
args := m.Called(ctx, serviceName, hostInfo)
return args.Error(0)
Expand Down
2 changes: 2 additions & 0 deletions internal/core/exec/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type QueueStore interface {
Len(ctx context.Context, name string) (int, error)
// List returns all items in the queue with the given name
List(ctx context.Context, name string) ([]QueuedItemData, error)
// ListPaginated returns paginated items for a specific queue
ListPaginated(ctx context.Context, name string, pg Paginator) (PaginatedResult[QueuedItemData], error)
// All returns all items in the queue
All(ctx context.Context) ([]QueuedItemData, error)
// ListByDAGName returns all items that has a specific DAG name
Expand Down
2 changes: 2 additions & 0 deletions internal/llm/providers/anthropic/anthropic.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func init() {
}

// Provider implements the llm.Provider interface for Anthropic Claude.
var _ llm.Provider = (*Provider)(nil)

type Provider struct {
config llm.Config
httpClient *llm.HTTPClient
Expand Down
2 changes: 2 additions & 0 deletions internal/llm/providers/gemini/gemini.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func init() {
}

// Provider implements the llm.Provider interface for Google Gemini.
var _ llm.Provider = (*Provider)(nil)

type Provider struct {
config llm.Config
httpClient *llm.HTTPClient
Expand Down
2 changes: 2 additions & 0 deletions internal/llm/providers/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func init() {
}

// Provider implements the llm.Provider interface for local OpenAI-compatible servers.
var _ llm.Provider = (*Provider)(nil)

type Provider struct {
config llm.Config
httpClient *llm.HTTPClient
Expand Down
2 changes: 2 additions & 0 deletions internal/llm/providers/openai/openai.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func init() {
}

// Provider implements the llm.Provider interface for OpenAI.
var _ llm.Provider = (*Provider)(nil)

type Provider struct {
config llm.Config
httpClient *llm.HTTPClient
Expand Down
2 changes: 2 additions & 0 deletions internal/llm/providers/openrouter/openrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func init() {
}

// Provider implements the llm.Provider interface for OpenRouter.
var _ llm.Provider = (*Provider)(nil)

type Provider struct {
config llm.Config
httpClient *llm.HTTPClient
Expand Down
54 changes: 54 additions & 0 deletions internal/persis/filequeue/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,60 @@ func (s *Store) List(ctx context.Context, name string) ([]exec.QueuedItemData, e
return items, nil
}

// ListPaginated returns paginated items for a specific queue.
// This implementation paginates at the file-path level BEFORE creating any
// QueuedFile objects, ensuring O(1) memory for the paginated items regardless
// of total queue size.
func (s *Store) ListPaginated(ctx context.Context, name string, pg exec.Paginator) (exec.PaginatedResult[exec.QueuedItemData], error) {
ctx = logger.WithValues(ctx, tag.Queue(name))
s.mu.Lock()
defer s.mu.Unlock()

limit := pg.Limit()
offset := pg.Offset()

// Build queue directory path
queueDir := filepath.Join(s.baseDir, name)
if _, err := os.Stat(queueDir); os.IsNotExist(err) {
return exec.NewPaginatedResult([]exec.QueuedItemData{}, 0, pg), nil
}

// Collect file paths ONLY (no parsing, no object creation)
// High priority first, then low - maintains proper queue ordering
patterns := []string{
filepath.Join(queueDir, "item_high_*.json"),
filepath.Join(queueDir, "item_low_*.json"),
}

var allFiles []string
for _, pattern := range patterns {
files, err := filepath.Glob(pattern)
if err != nil {
logger.Error(ctx, "Failed to glob queue files", tag.Error(err))
return exec.PaginatedResult[exec.QueuedItemData]{}, fmt.Errorf("failed to list queue files: %w", err)
}
// Lexicographic sort = chronological (timestamp encoded in filename)
sort.Strings(files)
allFiles = append(allFiles, files...)
}

total := len(allFiles)

// Apply pagination to file paths (efficient string slicing)
startIndex := min(offset, total)
endIndex := min(offset+limit, total)
paginatedFiles := allFiles[startIndex:endIndex]

// Create QueuedFile objects only for the paginated portion.
// QueuedFile is lazy-loaded - JSON is not read until Data() is called.
items := make([]exec.QueuedItemData, 0, len(paginatedFiles))
for _, file := range paginatedFiles {
items = append(items, NewQueuedFile(file))
}

return exec.NewPaginatedResult(items, total, pg), nil
}

func (s *Store) ListByDAGName(ctx context.Context, name, dagName string) ([]exec.QueuedItemData, error) {
items, err := s.List(ctx, name)
if err != nil {
Expand Down
Loading
Loading