Skip to content

Commit f14f135

Browse files
authored
feat: queue items pagination (#1585)
* **New Features** * Added paginated queue items browsing with filtering options (running or queued). * Added log download capability for DAG runs, steps, and sub-DAG runs. * **Improvements** * Queue display now shows summary statistics with item counts and utilization metrics. * Enhanced queue interface with pagination controls and improved data presentation.
1 parent c19a003 commit f14f135

File tree

16 files changed

+1067
-501
lines changed

16 files changed

+1067
-501
lines changed

api/v2/api.gen.go

Lines changed: 473 additions & 287 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/v2/api.yaml

Lines changed: 74 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2688,8 +2688,8 @@ paths:
26882688

26892689
/queues:
26902690
get:
2691-
summary: "List all execution queues with active DAG-runs"
2692-
description: "Retrieves all queues showing both running and queued DAG-runs, organized by queue/process group"
2691+
summary: "List all execution queues with summary statistics"
2692+
description: "Returns queue list with running/queued counts. Use /queues/{name}/items for paginated item details."
26932693
operationId: "listQueues"
26942694
tags:
26952695
- "queues"
@@ -2709,6 +2709,51 @@ paths:
27092709
schema:
27102710
$ref: "#/components/schemas/Error"
27112711

2712+
/queues/{name}/items:
2713+
get:
2714+
summary: "Get paginated items for a specific queue"
2715+
description: "Returns paginated list of running or queued DAG-runs for the specified queue"
2716+
operationId: "listQueueItems"
2717+
tags:
2718+
- "queues"
2719+
parameters:
2720+
- $ref: "#/components/parameters/RemoteNode"
2721+
- $ref: "#/components/parameters/Page"
2722+
- $ref: "#/components/parameters/PerPage"
2723+
- name: name
2724+
in: path
2725+
description: "Queue name"
2726+
required: true
2727+
schema:
2728+
type: string
2729+
- name: type
2730+
in: query
2731+
description: "Item type to fetch"
2732+
required: false
2733+
schema:
2734+
type: string
2735+
enum: ["running", "queued"]
2736+
default: "queued"
2737+
responses:
2738+
"200":
2739+
description: "A successful response"
2740+
content:
2741+
application/json:
2742+
schema:
2743+
$ref: "#/components/schemas/QueueItemsResponse"
2744+
"404":
2745+
description: "Queue not found"
2746+
content:
2747+
application/json:
2748+
schema:
2749+
$ref: "#/components/schemas/Error"
2750+
default:
2751+
description: "Generic error response"
2752+
content:
2753+
application/json:
2754+
schema:
2755+
$ref: "#/components/schemas/Error"
2756+
27122757
/services/resources/history:
27132758
get:
27142759
summary: "Get resource usage history"
@@ -4691,7 +4736,7 @@ components:
46914736

46924737
Queue:
46934738
type: object
4694-
description: "A queue/process group with its active DAG-runs"
4739+
description: "A queue/process group with summary statistics"
46954740
properties:
46964741
name:
46974742
type: string
@@ -4704,21 +4749,40 @@ components:
47044749
type: integer
47054750
description: "Maximum number of concurrent runs allowed. For 'global' queues, this is the configured maxConcurrency. For 'dag-based' queues, this is the DAG's maxActiveRuns (default 1)"
47064751
minimum: 1
4752+
runningCount:
4753+
type: integer
4754+
description: "Number of currently running DAG-runs"
4755+
minimum: 0
4756+
queuedCount:
4757+
type: integer
4758+
description: "Number of queued DAG-runs waiting to execute"
4759+
minimum: 0
47074760
running:
47084761
type: array
4709-
description: "List of currently running DAG-runs"
4710-
items:
4711-
$ref: "#/components/schemas/DAGRunSummary"
4712-
queued:
4713-
type: array
4714-
description: "List of DAG-runs waiting to execute"
4762+
description: "List of currently running DAG-runs (bounded by maxConcurrency)"
47154763
items:
47164764
$ref: "#/components/schemas/DAGRunSummary"
47174765
required:
47184766
- name
47194767
- type
4768+
- runningCount
4769+
- queuedCount
47204770
- running
4721-
- queued
4771+
4772+
QueueItemsResponse:
4773+
type: object
4774+
description: "Paginated queue items response"
4775+
properties:
4776+
items:
4777+
type: array
4778+
description: "List of DAG-run summaries"
4779+
items:
4780+
$ref: "#/components/schemas/DAGRunSummary"
4781+
pagination:
4782+
$ref: "#/components/schemas/Pagination"
4783+
required:
4784+
- items
4785+
- pagination
47224786

47234787
QueuesSummary:
47244788
type: object

internal/cmn/telemetry/collector.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ var (
2525
)
2626

2727
// Collector implements prometheus.Collector interface
28+
var _ prometheus.Collector = (*Collector)(nil)
29+
2830
type Collector struct {
2931
startTime time.Time
3032
version string

internal/cmn/telemetry/collector_test.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ type mockDAGStore struct {
2323
mock.Mock
2424
}
2525

26+
var _ exec.DAGStore = (*mockDAGStore)(nil)
27+
2628
func (m *mockDAGStore) Create(ctx context.Context, fileName string, spec []byte) error {
2729
args := m.Called(ctx, fileName, spec)
2830
return args.Error(0)
@@ -103,6 +105,8 @@ type mockDAGRunStore struct {
103105
mock.Mock
104106
}
105107

108+
var _ exec.DAGRunStore = (*mockDAGRunStore)(nil)
109+
106110
// RemoveDAGRun implements models.DAGRunStore.
107111
func (m *mockDAGRunStore) RemoveDAGRun(_ context.Context, _ exec.DAGRunRef) error {
108112
panic("unimplemented")
@@ -174,12 +178,12 @@ func (m *mockDAGRunStore) RenameDAGRuns(ctx context.Context, oldName, newName st
174178
return args.Error(0)
175179
}
176180

177-
var _ exec.QueueStore = (*mockQueueStore)(nil)
178-
179181
type mockQueueStore struct {
180182
mock.Mock
181183
}
182184

185+
var _ exec.QueueStore = (*mockQueueStore)(nil)
186+
183187
// QueueWatcher implements execution.QueueStore.
184188
func (m *mockQueueStore) QueueWatcher(_ context.Context) exec.QueueWatcher {
185189
panic("unimplemented")
@@ -223,6 +227,11 @@ func (m *mockQueueStore) List(ctx context.Context, name string) ([]exec.QueuedIt
223227
return args.Get(0).([]exec.QueuedItemData), args.Error(1)
224228
}
225229

230+
func (m *mockQueueStore) ListPaginated(ctx context.Context, name string, pg exec.Paginator) (exec.PaginatedResult[exec.QueuedItemData], error) {
231+
args := m.Called(ctx, name, pg)
232+
return args.Get(0).(exec.PaginatedResult[exec.QueuedItemData]), args.Error(1)
233+
}
234+
226235
func (m *mockQueueStore) All(ctx context.Context) ([]exec.QueuedItemData, error) {
227236
args := m.Called(ctx)
228237
if args.Get(0) == nil {
@@ -237,6 +246,8 @@ type mockServiceRegistry struct {
237246
mock.Mock
238247
}
239248

249+
var _ exec.ServiceRegistry = (*mockServiceRegistry)(nil)
250+
240251
func (m *mockServiceRegistry) Register(ctx context.Context, serviceName exec.ServiceName, hostInfo exec.HostInfo) error {
241252
args := m.Called(ctx, serviceName, hostInfo)
242253
return args.Error(0)

internal/core/exec/queue.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ type QueueStore interface {
2424
Len(ctx context.Context, name string) (int, error)
2525
// List returns all items in the queue with the given name
2626
List(ctx context.Context, name string) ([]QueuedItemData, error)
27+
// ListPaginated returns paginated items for a specific queue
28+
ListPaginated(ctx context.Context, name string, pg Paginator) (PaginatedResult[QueuedItemData], error)
2729
// All returns all items in the queue
2830
All(ctx context.Context) ([]QueuedItemData, error)
2931
// ListByDAGName returns all items that has a specific DAG name

internal/llm/providers/anthropic/anthropic.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ func init() {
3232
}
3333

3434
// Provider implements the llm.Provider interface for Anthropic Claude.
35+
var _ llm.Provider = (*Provider)(nil)
36+
3537
type Provider struct {
3638
config llm.Config
3739
httpClient *llm.HTTPClient

internal/llm/providers/gemini/gemini.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ func init() {
2525
}
2626

2727
// Provider implements the llm.Provider interface for Google Gemini.
28+
var _ llm.Provider = (*Provider)(nil)
29+
2830
type Provider struct {
2931
config llm.Config
3032
httpClient *llm.HTTPClient

internal/llm/providers/local/local.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ func init() {
2525
}
2626

2727
// Provider implements the llm.Provider interface for local OpenAI-compatible servers.
28+
var _ llm.Provider = (*Provider)(nil)
29+
2830
type Provider struct {
2931
config llm.Config
3032
httpClient *llm.HTTPClient

internal/llm/providers/openai/openai.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ func init() {
2424
}
2525

2626
// Provider implements the llm.Provider interface for OpenAI.
27+
var _ llm.Provider = (*Provider)(nil)
28+
2729
type Provider struct {
2830
config llm.Config
2931
httpClient *llm.HTTPClient

internal/llm/providers/openrouter/openrouter.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ func init() {
2525
}
2626

2727
// Provider implements the llm.Provider interface for OpenRouter.
28+
var _ llm.Provider = (*Provider)(nil)
29+
2830
type Provider struct {
2931
config llm.Config
3032
httpClient *llm.HTTPClient

0 commit comments

Comments
 (0)