-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathclient.go
More file actions
803 lines (663 loc) · 28.2 KB
/
client.go
File metadata and controls
803 lines (663 loc) · 28.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prefect
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
prefectiov1 "github.com/PrefectHQ/prefect-operator/api/v1"
"github.com/PrefectHQ/prefect-operator/internal/portforward"
"github.com/go-logr/logr"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// isSuccessStatusCode reports whether statusCode falls in the HTTP 2xx
// (success) range, i.e. 200 through 299 inclusive.
func isSuccessStatusCode(statusCode int) bool {
	// Integer division maps exactly the values 200..299 onto 2; negative
	// values truncate toward zero and therefore never match.
	return statusCode/100 == 2
}
// PrefectClient defines the operations the operator performs against the
// Prefect REST API. Client in this file is the HTTP-backed implementation.
type PrefectClient interface {
	// CreateOrUpdateDeployment creates a new deployment or updates an existing one.
	CreateOrUpdateDeployment(ctx context.Context, deployment *DeploymentSpec) (*Deployment, error)
	// GetDeployment retrieves a deployment by ID.
	GetDeployment(ctx context.Context, id string) (*Deployment, error)
	// GetDeploymentByName retrieves a deployment by name and flow ID.
	GetDeploymentByName(ctx context.Context, name, flowID string) (*Deployment, error)
	// UpdateDeployment updates an existing deployment identified by ID.
	UpdateDeployment(ctx context.Context, id string, deployment *DeploymentSpec) (*Deployment, error)
	// DeleteDeployment deletes a deployment by ID.
	DeleteDeployment(ctx context.Context, id string) error

	// CreateOrGetFlow creates a new flow or returns an existing one with the same name.
	CreateOrGetFlow(ctx context.Context, flow *FlowSpec) (*Flow, error)
	// GetFlowByName retrieves a flow by name.
	GetFlowByName(ctx context.Context, name string) (*Flow, error)

	// CreateWorkPool creates a new work pool.
	CreateWorkPool(ctx context.Context, workPool *WorkPoolSpec) (*WorkPool, error)
	// GetWorkPool retrieves a work pool by name.
	GetWorkPool(ctx context.Context, name string) (*WorkPool, error)
	// UpdateWorkPool updates the existing work pool with the given name.
	UpdateWorkPool(ctx context.Context, name string, workPool *WorkPoolSpec) error
	// DeleteWorkPool deletes the work pool with the given name. The parameter
	// is called name (not id) because Client.DeleteWorkPool places it in the
	// "/work_pools/<name>" path, the same path GetWorkPool and UpdateWorkPool use.
	DeleteWorkPool(ctx context.Context, name string) error

	// GetWorkerMetadata retrieves aggregate metadata for all worker types,
	// keyed by worker type.
	GetWorkerMetadata(ctx context.Context) (map[string]WorkerMetadata, error)
}
// HTTPClient represents an HTTP client interface for testing.
// Its single method has the same signature as (*http.Client).Do, so an
// *http.Client satisfies it.
// NOTE(review): the Client struct in this file stores a concrete *http.Client
// rather than this interface - confirm where this interface is consumed.
type HTTPClient interface {
// Do sends req and returns the response or an error.
Do(req *http.Request) (*http.Response, error)
}
// Client implements the PrefectClient interface by issuing HTTP requests
// against a Prefect API rooted at BaseURL.
type Client struct {
// BaseURL is the API root that endpoint paths such as "/deployments/" are appended to.
BaseURL string
// APIKey is sent as "Authorization: Bearer <APIKey>" on requests.
APIKey string
// HTTPClient performs the requests; NewClient sets it to a client with a 30-second timeout.
HTTPClient *http.Client
// log receives verbosity-1 trace messages and body-close errors.
log logr.Logger
// PortForwardClient is used to port-forward to the Prefect server when running outside the cluster
PortForwardClient portforward.PortForwarder
}
// NewClient builds a Client for the Prefect API rooted at baseURL.
//
// apiKey is stored for use as a bearer token, log receives the client's
// diagnostic output, and the embedded HTTP client is given a 30-second
// timeout. PortForwardClient is left unset.
func NewClient(baseURL, apiKey string, log logr.Logger) *Client {
	httpClient := &http.Client{Timeout: 30 * time.Second}

	c := new(Client)
	c.BaseURL = baseURL
	c.APIKey = apiKey
	c.HTTPClient = httpClient
	c.log = log
	return c
}
// NewClientFromServerReference creates a Client for the Prefect server
// described by serverRef.
//
// The namespace used is serverRef.Namespace, or fallbackNamespace when the
// reference does not specify one. This applies both to the API URL and to the
// port-forward target, so the two paths always agree.
//
// When this process is not running inside a cluster but the referenced server
// is, a port-forward from localhost:14200 to the server's port 4200 is started
// and the client targets "http://localhost:14200/api". The call blocks until
// the forwarder signals readiness or fails. Otherwise the client targets
// serverRef.GetAPIURL(namespace) directly.
//
// It returns an error if serverRef is nil or if the port-forward fails before
// becoming ready.
func NewClientFromServerReference(serverRef *prefectiov1.PrefectServerReference, apiKey string, fallbackNamespace string, log logr.Logger) (*Client, error) {
	const (
		localPort  = 14200 // local end of the port-forward
		remotePort = 4200  // Prefect server port inside the cluster
	)

	if serverRef == nil {
		return nil, fmt.Errorf("server reference must not be nil")
	}

	// Resolve the namespace once so every code path honours the fallback.
	namespace := serverRef.Namespace
	if namespace == "" {
		namespace = fallbackNamespace
	}

	// Build the client up front; its BaseURL is filled in once we know how the
	// server will be reached.
	client := NewClient("", apiKey, log)

	needsPortForwarding := !client.isRunningInCluster() && serverRef.IsInCluster()
	if !needsPortForwarding {
		client.BaseURL = serverRef.GetAPIURL(namespace)
		log.V(1).Info("Using in-cluster URL", "url", client.BaseURL)
		return client, nil
	}

	client.BaseURL = fmt.Sprintf("http://localhost:%d/api", localPort)
	log.V(1).Info("Using localhost for port-forwarding", "url", client.BaseURL)

	client.PortForwardClient = portforward.NewKubectlPortForwarder(namespace, serverRef.Name, localPort, remotePort)

	// NOTE(review): stopCh is never closed or retained, so nothing in this
	// function can stop the forwarder once it is ready - confirm that a
	// process-lifetime port-forward is intended.
	stopCh := make(chan struct{}, 1)
	readyCh := make(chan struct{}, 1)
	// Buffered so the goroutine can deliver its result and exit even after
	// this function has returned.
	errCh := make(chan error, 1)

	go func() {
		errCh <- client.PortForwardClient.ForwardPorts(stopCh, readyCh)
	}()

	select {
	case err := <-errCh:
		return nil, err
	case <-readyCh:
		log.V(1).Info("Port-forwarding is ready")
	}

	return client, nil
}
// NewClientFromK8s resolves the API key for serverRef through the Kubernetes
// client and then builds a Prefect Client from the reference.
//
// namespace is passed both to the API-key lookup and, as the fallback
// namespace, to NewClientFromServerReference. A failed key lookup is returned
// wrapped; errors from client construction are returned as-is.
func NewClientFromK8s(ctx context.Context, serverRef *prefectiov1.PrefectServerReference, k8sClient client.Client, namespace string, log logr.Logger) (*Client, error) {
	apiKey, keyErr := serverRef.GetAPIKey(ctx, k8sClient, namespace)
	if keyErr == nil {
		// Delegate to the reference-based factory with namespace as fallback.
		return NewClientFromServerReference(serverRef, apiKey, namespace, log)
	}
	return nil, fmt.Errorf("failed to get API key: %w", keyErr)
}
// DeploymentSpec is the JSON request body sent when creating or updating a
// deployment. Name and FlowID are always serialized; every other field is
// omitted from the payload when nil or empty.
type DeploymentSpec struct {
	Name                    string               `json:"name"`
	FlowID                  string               `json:"flow_id"`
	Description             *string              `json:"description,omitempty"`
	Version                 *string              `json:"version,omitempty"`
	Tags                    []string             `json:"tags,omitempty"`
	Parameters              map[string]any       `json:"parameters,omitempty"`
	JobVariables            map[string]any       `json:"job_variables,omitempty"`
	WorkQueueName           *string              `json:"work_queue_name,omitempty"`
	WorkPoolName            *string              `json:"work_pool_name,omitempty"`
	Paused                  *bool                `json:"paused,omitempty"`
	Schedules               []DeploymentSchedule `json:"schedules,omitempty"`
	ConcurrencyLimit        *int                 `json:"concurrency_limit,omitempty"`
	GlobalConcurrencyLimits []string             `json:"global_concurrency_limits,omitempty"`
	Entrypoint              *string              `json:"entrypoint,omitempty"`
	Path                    *string              `json:"path,omitempty"`
	PullSteps               []map[string]any     `json:"pull_steps,omitempty"`
	ParameterOpenAPISchema  map[string]any       `json:"parameter_openapi_schema,omitempty"`
	EnforceParameterSchema  *bool                `json:"enforce_parameter_schema,omitempty"`
}
// Deployment is a deployment as decoded from Prefect API responses. In
// addition to the fields of DeploymentSpec it carries the ID, Created/Updated
// timestamps and Status present in those responses.
type Deployment struct {
	ID                      string               `json:"id"`
	Created                 time.Time            `json:"created"`
	Updated                 time.Time            `json:"updated"`
	Name                    string               `json:"name"`
	Version                 *string              `json:"version"`
	Description             *string              `json:"description"`
	FlowID                  string               `json:"flow_id"`
	Paused                  bool                 `json:"paused"`
	Tags                    []string             `json:"tags"`
	Parameters              map[string]any       `json:"parameters"`
	JobVariables            map[string]any       `json:"job_variables"`
	WorkQueueName           *string              `json:"work_queue_name"`
	WorkPoolName            *string              `json:"work_pool_name"`
	Status                  string               `json:"status"`
	Schedules               []DeploymentSchedule `json:"schedules"`
	ConcurrencyLimit        *int                 `json:"concurrency_limit"`
	GlobalConcurrencyLimits []string             `json:"global_concurrency_limits"`
	Entrypoint              *string              `json:"entrypoint"`
	Path                    *string              `json:"path"`
	PullSteps               []map[string]any     `json:"pull_steps"`
	ParameterOpenAPISchema  map[string]any       `json:"parameter_openapi_schema"`
	EnforceParameterSchema  bool                 `json:"enforce_parameter_schema"`
}
// Schedule represents a Prefect deployment schedule.
// Supports interval, cron, and rrule schedule types.
// Exactly one of Interval, Cron, or RRule should be specified.
// All fields are optional in the JSON encoding: nil pointers and an empty ID
// are omitted from the payload.
type Schedule struct {
// ID identifies the schedule; omitted from JSON when empty.
ID string `json:"id,omitempty"`
// === INTERVAL SCHEDULE FIELDS ===
// Maps to: IntervalSchedule schema in Prefect API
Interval *float64 `json:"interval,omitempty"` // seconds (required for interval)
AnchorDate *time.Time `json:"anchor_date,omitempty"` // anchor date for interval schedules
// === CRON SCHEDULE FIELDS ===
// Maps to: CronSchedule schema in Prefect API
Cron *string `json:"cron,omitempty"` // cron expression (required for cron)
DayOr *bool `json:"day_or,omitempty"` // day/day_of_week connection logic
// === RRULE SCHEDULE FIELDS ===
// Maps to: RRuleSchedule schema in Prefect API
RRule *string `json:"rrule,omitempty"` // RFC 5545 RRULE string (required for rrule)
// === COMMON FIELDS ===
// Shared across all schedule types
Timezone *string `json:"timezone,omitempty"`
// NOTE(review): Active and MaxScheduledRuns also exist on DeploymentSchedule,
// which wraps this type - confirm which level the API reads them from.
Active *bool `json:"active,omitempty"`
MaxScheduledRuns *int `json:"max_scheduled_runs,omitempty"`
}
// DeploymentSchedule is one entry in a deployment's list of schedules.
// It corresponds to the DeploymentScheduleCreate schema of the Prefect API,
// which wraps the schedule definition together with per-schedule settings.
// Every field except Schedule is omitted from JSON when unset.
type DeploymentSchedule struct {
	// Slug is a unique identifier for the schedule.
	Slug *string `json:"slug,omitempty"`

	// Schedule holds the actual schedule configuration (interval, cron, or rrule).
	Schedule Schedule `json:"schedule"`

	// Active indicates whether the schedule is active.
	Active *bool `json:"active,omitempty"`

	// MaxScheduledRuns limits the number of scheduled runs.
	MaxScheduledRuns *int `json:"max_scheduled_runs,omitempty"`

	// MaxActiveRuns limits the number of active runs.
	MaxActiveRuns *int `json:"max_active_runs,omitempty"`

	// Catchup indicates whether the worker should catch up on late runs.
	Catchup *bool `json:"catchup,omitempty"`

	// Parameters are schedule-specific parameters.
	Parameters map[string]any `json:"parameters,omitempty"`
}
// FlowSpec represents the request payload for creating flows.
// Name is always serialized; Tags and Labels are omitted from the JSON
// payload when empty.
type FlowSpec struct {
// Name is the flow name; CreateOrGetFlow also uses it to look up an existing flow.
Name string `json:"name"`
Tags []string `json:"tags,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
}
// Flow represents a Prefect flow as decoded from API responses.
type Flow struct {
// ID is the flow identifier taken from the response's "id" field.
ID string `json:"id"`
// Created and Updated are decoded from the response's "created" and "updated" fields.
Created time.Time `json:"created"`
Updated time.Time `json:"updated"`
Name string `json:"name"`
Tags []string `json:"tags"`
Labels map[string]string `json:"labels"`
}
// WorkPoolSpec is the JSON request body used when creating or updating a work
// pool. Every field is omitted from the payload when empty or nil, so the
// same type serves for full creation and partial updates.
type WorkPoolSpec struct {
	Name             string         `json:"name,omitempty"`
	Description      *string        `json:"description,omitempty"`
	Type             string         `json:"type,omitempty"`
	BaseJobTemplate  map[string]any `json:"base_job_template,omitempty"`
	IsPaused         *bool          `json:"is_paused,omitempty"`
	ConcurrencyLimit *int           `json:"concurrency_limit,omitempty"`
	// StorageConfiguration map[string]interface{} `json:"storage_configuration,omitempty"`
}
// WorkPool is a work pool as decoded from Prefect API responses.
type WorkPool struct {
	ID               string         `json:"id"`
	Created          time.Time      `json:"created"`
	Updated          time.Time      `json:"updated"`
	Name             string         `json:"name"`
	Type             string         `json:"type"`
	Description      *string        `json:"description"`
	BaseJobTemplate  map[string]any `json:"base_job_template"`
	IsPaused         bool           `json:"is_paused"`
	ConcurrencyLimit *int           `json:"concurrency_limit"`
	Status           string         `json:"status"`
	DefaultQueueID   *string        `json:"default_queue_id"`
	// StorageConfiguration map[string]interface{} `json:"storage_configuration,omitempty"`
}
// CreateOrUpdateDeployment POSTs deployment as JSON to "<BaseURL>/deployments/"
// and decodes the response body into a Deployment.
//
// It returns an error if deployment is nil, if the request cannot be built or
// sent, if the response body cannot be read, if the status is outside the 2xx
// range (the response body is included in the error), or if the body is not
// valid JSON for a Deployment.
func (c *Client) CreateOrUpdateDeployment(ctx context.Context, deployment *DeploymentSpec) (*Deployment, error) {
	// Guard before the first dereference (deployment.Name below) so a nil spec
	// yields an error instead of a panic.
	if deployment == nil {
		return nil, fmt.Errorf("deployment spec must not be nil")
	}

	url := fmt.Sprintf("%s/deployments/", c.BaseURL)
	c.log.V(1).Info("Creating or updating deployment", "url", url, "deployment", deployment.Name)

	jsonData, err := json.Marshal(deployment)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal deployment: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(jsonData))
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	// Only send credentials when a key is configured; an empty key would
	// otherwise produce a malformed "Bearer " header.
	if c.APIKey != "" {
		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey))
	}

	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to make request: %w", err)
	}
	defer func() {
		// The body has been fully read by the time this runs; a close error
		// carries no actionable information here.
		_ = resp.Body.Close()
	}()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read response body: %w", err)
	}
	if !isSuccessStatusCode(resp.StatusCode) {
		return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body))
	}

	var result Deployment
	if err := json.Unmarshal(body, &result); err != nil {
		return nil, fmt.Errorf("failed to unmarshal response: %w", err)
	}

	c.log.V(1).Info("Deployment created or updated successfully", "deploymentId", result.ID)
	return &result, nil
}
// GetDeployment fetches the deployment with the given ID from
// "<BaseURL>/deployments/<id>" and decodes it.
//
// Any non-2xx status, including 404, is returned as an error that includes the
// response body. Errors building or sending the request, reading the body, or
// decoding the JSON are returned wrapped.
func (c *Client) GetDeployment(ctx context.Context, id string) (*Deployment, error) {
	url := fmt.Sprintf("%s/deployments/%s", c.BaseURL, id)
	c.log.V(1).Info("Getting deployment", "url", url, "deploymentId", id)

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}
	// Only send credentials when a key is configured; an empty key would
	// otherwise produce a malformed "Bearer " header.
	if c.APIKey != "" {
		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey))
	}

	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to make request: %w", err)
	}
	defer func() {
		// The body has been fully read by the time this runs; a close error
		// carries no actionable information here.
		_ = resp.Body.Close()
	}()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read response body: %w", err)
	}
	if !isSuccessStatusCode(resp.StatusCode) {
		return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body))
	}

	var result Deployment
	if err := json.Unmarshal(body, &result); err != nil {
		return nil, fmt.Errorf("failed to unmarshal response: %w", err)
	}

	c.log.V(1).Info("Deployment retrieved successfully", "deploymentId", result.ID)
	return &result, nil
}
// GetDeploymentByName is meant to look a deployment up by its name and flow
// ID. It is currently a placeholder: it performs no request, ignores all of
// its arguments and always returns a "not yet implemented" error.
func (c *Client) GetDeploymentByName(ctx context.Context, name, flowID string) (*Deployment, error) {
	// TODO: implement using the Prefect deployments filter API.
	var notImplemented error = fmt.Errorf("GetDeploymentByName not yet implemented - use GetDeployment with ID")
	return nil, notImplemented
}
// UpdateDeployment PATCHes "<BaseURL>/deployments/<id>" with deployment as the
// JSON body and returns the deployment's resulting state.
//
// If the server answers with a JSON body it is decoded and returned. If the
// update succeeds without a body (for example 204 No Content), the deployment
// is re-fetched with GetDeployment so the caller still receives a populated
// result instead of a JSON decoding error.
//
// It returns an error if the request cannot be built or sent, if the body
// cannot be read, if the status is outside the 2xx range (the response body is
// included), if the body is not valid JSON, or if the follow-up fetch fails.
func (c *Client) UpdateDeployment(ctx context.Context, id string, deployment *DeploymentSpec) (*Deployment, error) {
	url := fmt.Sprintf("%s/deployments/%s", c.BaseURL, id)
	c.log.V(1).Info("Updating deployment", "url", url, "deploymentId", id)

	jsonData, err := json.Marshal(deployment)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal deployment updates: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPatch, url, bytes.NewReader(jsonData))
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	if c.APIKey != "" {
		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey))
	}

	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to make request: %w", err)
	}
	defer func() {
		if err := resp.Body.Close(); err != nil {
			c.log.Error(err, "failed to close response body")
		}
	}()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read response body: %w", err)
	}
	if !isSuccessStatusCode(resp.StatusCode) {
		return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body))
	}

	// A successful update may carry no body at all; unmarshalling an empty
	// payload would fail with "unexpected end of JSON input" even though the
	// update itself went through.
	if resp.StatusCode == http.StatusNoContent || len(bytes.TrimSpace(body)) == 0 {
		c.log.V(1).Info("Deployment updated successfully", "deploymentId", id)
		updated, err := c.GetDeployment(ctx, id)
		if err != nil {
			return nil, fmt.Errorf("deployment updated but failed to fetch its new state: %w", err)
		}
		return updated, nil
	}

	var result Deployment
	if err := json.Unmarshal(body, &result); err != nil {
		return nil, fmt.Errorf("failed to unmarshal response: %w", err)
	}

	c.log.V(1).Info("Deployment updated successfully", "deploymentId", id)
	return &result, nil
}
// DeleteDeployment issues a DELETE to "<BaseURL>/deployments/<id>".
//
// It returns nil when the server answers with a 2xx status. Any other status
// is returned as an error that includes the response body; errors building or
// sending the request or reading the body are returned wrapped.
func (c *Client) DeleteDeployment(ctx context.Context, id string) error {
	url := fmt.Sprintf("%s/deployments/%s", c.BaseURL, id)
	c.log.V(1).Info("Deleting deployment", "url", url, "deploymentId", id)

	req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil)
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}
	// Only send credentials when a key is configured; an empty key would
	// otherwise produce a malformed "Bearer " header.
	if c.APIKey != "" {
		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey))
	}

	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return fmt.Errorf("failed to make request: %w", err)
	}
	defer func() {
		// The body has been fully read by the time this runs; a close error
		// carries no actionable information here.
		_ = resp.Body.Close()
	}()

	// The body is read even on success so it can be reported on failure.
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("failed to read response body: %w", err)
	}
	if !isSuccessStatusCode(resp.StatusCode) {
		return fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body))
	}

	c.log.V(1).Info("Deployment deleted successfully", "deploymentId", id)
	return nil
}
// CreateOrGetFlow returns the flow named flow.Name, creating it first if the
// lookup finds nothing.
//
// The lookup goes through GetFlowByName; when that reports no flow, the spec
// is POSTed to "<BaseURL>/flows/" and the response decoded. Lookup failures,
// request failures, non-2xx statuses (with the response body) and JSON
// decoding failures are returned as errors.
func (c *Client) CreateOrGetFlow(ctx context.Context, flow *FlowSpec) (*Flow, error) {
	// Prefer an already-registered flow with the same name.
	found, lookupErr := c.GetFlowByName(ctx, flow.Name)
	if lookupErr != nil {
		return nil, fmt.Errorf("failed to check for existing flow: %w", lookupErr)
	}
	if found != nil {
		c.log.V(1).Info("Flow already exists, returning existing flow", "flowName", flow.Name, "flowId", found.ID)
		return found, nil
	}

	// Nothing found: register a new flow.
	endpoint := c.BaseURL + "/flows/"
	c.log.V(1).Info("Creating new flow", "url", endpoint, "flowName", flow.Name)

	payload, marshalErr := json.Marshal(flow)
	if marshalErr != nil {
		return nil, fmt.Errorf("failed to marshal flow spec: %w", marshalErr)
	}

	request, reqErr := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewBuffer(payload))
	if reqErr != nil {
		return nil, fmt.Errorf("failed to create request: %w", reqErr)
	}
	request.Header.Set("Content-Type", "application/json")
	if key := c.APIKey; key != "" {
		request.Header.Set("Authorization", "Bearer "+key)
	}

	response, doErr := c.HTTPClient.Do(request)
	if doErr != nil {
		return nil, fmt.Errorf("failed to make request: %w", doErr)
	}
	defer func() {
		if closeErr := response.Body.Close(); closeErr != nil {
			c.log.Error(closeErr, "failed to close response body")
		}
	}()

	raw, readErr := io.ReadAll(response.Body)
	if readErr != nil {
		return nil, fmt.Errorf("failed to read response body: %w", readErr)
	}
	if !isSuccessStatusCode(response.StatusCode) {
		return nil, fmt.Errorf("API request failed with status %d: %s", response.StatusCode, string(raw))
	}

	created := new(Flow)
	if decodeErr := json.Unmarshal(raw, created); decodeErr != nil {
		return nil, fmt.Errorf("failed to unmarshal response: %w", decodeErr)
	}

	c.log.V(1).Info("Flow created successfully", "flowName", flow.Name, "flowId", created.ID)
	return created, nil
}
// GetFlowByName fetches the flow called name from "<BaseURL>/flows/name/<name>".
//
// A 404 response means the flow does not exist and yields (nil, nil). Any
// other non-2xx status is returned as an error that includes the response
// body; request, read and JSON decoding failures are returned wrapped.
func (c *Client) GetFlowByName(ctx context.Context, name string) (*Flow, error) {
	endpoint := c.BaseURL + "/flows/name/" + name
	c.log.V(1).Info("Getting flow by name", "url", endpoint, "flowName", name)

	request, reqErr := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if reqErr != nil {
		return nil, fmt.Errorf("failed to create request: %w", reqErr)
	}
	request.Header.Set("Content-Type", "application/json")
	if key := c.APIKey; key != "" {
		request.Header.Set("Authorization", "Bearer "+key)
	}

	response, doErr := c.HTTPClient.Do(request)
	if doErr != nil {
		return nil, fmt.Errorf("failed to make request: %w", doErr)
	}
	defer func() {
		if closeErr := response.Body.Close(); closeErr != nil {
			c.log.Error(closeErr, "failed to close response body")
		}
	}()

	// The body is consumed before the status is inspected, so a read failure
	// takes precedence over every status-based outcome.
	raw, readErr := io.ReadAll(response.Body)
	if readErr != nil {
		return nil, fmt.Errorf("failed to read response body: %w", readErr)
	}

	switch status := response.StatusCode; {
	case status == http.StatusNotFound:
		return nil, nil // Flow doesn't exist
	case !isSuccessStatusCode(status):
		return nil, fmt.Errorf("API request failed with status %d: %s", status, string(raw))
	}

	flow := new(Flow)
	if decodeErr := json.Unmarshal(raw, flow); decodeErr != nil {
		return nil, fmt.Errorf("failed to unmarshal response: %w", decodeErr)
	}

	c.log.V(1).Info("Flow retrieved successfully", "flowName", name, "flowId", flow.ID)
	return flow, nil
}
// GetWorkPool fetches the work pool called name from
// "<BaseURL>/work_pools/<name>" and decodes it.
//
// A 404 response means the work pool does not exist and yields (nil, nil).
// Any other non-2xx status is returned as an error that includes the response
// body; request, read and JSON decoding failures are returned wrapped.
func (c *Client) GetWorkPool(ctx context.Context, name string) (*WorkPool, error) {
	url := fmt.Sprintf("%s/work_pools/%s", c.BaseURL, name)
	c.log.V(1).Info("Getting work pool", "url", url, "name", name)

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}
	// Only send credentials when a key is configured; an empty key would
	// otherwise produce a malformed "Bearer " header.
	if c.APIKey != "" {
		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey))
	}

	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to make request: %w", err)
	}
	defer func() {
		// The body has been fully read by the time this runs; a close error
		// carries no actionable information here.
		_ = resp.Body.Close()
	}()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read response body: %w", err)
	}
	if resp.StatusCode == http.StatusNotFound {
		return nil, nil // work pool doesn't exist
	}
	if !isSuccessStatusCode(resp.StatusCode) {
		return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body))
	}

	var result WorkPool
	if err := json.Unmarshal(body, &result); err != nil {
		return nil, fmt.Errorf("failed to unmarshal response: %w", err)
	}

	c.log.V(1).Info("Work pool retrieved successfully", "workPool", result.ID)
	return &result, nil
}
// CreateWorkPool POSTs workPool as JSON to "<BaseURL>/work_pools/" and decodes
// the response body into a WorkPool.
//
// It returns an error if workPool is nil, if the request cannot be built or
// sent, if the response body cannot be read, if the status is outside the 2xx
// range (the response body is included), or if the body is not valid JSON for
// a WorkPool.
func (c *Client) CreateWorkPool(ctx context.Context, workPool *WorkPoolSpec) (*WorkPool, error) {
	// Guard before the first dereference (workPool.Name below) so a nil spec
	// yields an error instead of a panic.
	if workPool == nil {
		return nil, fmt.Errorf("work pool spec must not be nil")
	}

	url := fmt.Sprintf("%s/work_pools/", c.BaseURL)
	// This call only ever creates; updates go through UpdateWorkPool.
	c.log.V(1).Info("Creating work pool", "url", url, "workPool", workPool.Name)

	jsonData, err := json.Marshal(workPool)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal work pool: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(jsonData))
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	// Only send credentials when a key is configured; an empty key would
	// otherwise produce a malformed "Bearer " header.
	if c.APIKey != "" {
		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey))
	}

	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to make request: %w", err)
	}
	defer func() {
		// The body has been fully read by the time this runs; a close error
		// carries no actionable information here.
		_ = resp.Body.Close()
	}()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read response body: %w", err)
	}
	if !isSuccessStatusCode(resp.StatusCode) {
		return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body))
	}

	var result WorkPool
	if err := json.Unmarshal(body, &result); err != nil {
		return nil, fmt.Errorf("failed to unmarshal response: %w", err)
	}

	c.log.V(1).Info("Work pool created successfully", "workPoolID", result.ID)
	return &result, nil
}
// UpdateWorkPool PATCHes "<BaseURL>/work_pools/<name>" with workPool as the
// JSON body. The response body is not decoded.
//
// It returns an error if workPool is nil, if the request cannot be built or
// sent, if the response body cannot be read, or if the status is outside the
// 2xx range (the response body is included in the error).
func (c *Client) UpdateWorkPool(ctx context.Context, name string, workPool *WorkPoolSpec) error {
	// A nil spec would marshal to the JSON literal "null", which is not a
	// meaningful update payload.
	if workPool == nil {
		return fmt.Errorf("work pool spec must not be nil")
	}

	url := fmt.Sprintf("%s/work_pools/%s", c.BaseURL, name)
	c.log.V(1).Info("Updating work pool", "url", url, "name", name)

	jsonData, err := json.Marshal(workPool)
	if err != nil {
		return fmt.Errorf("failed to marshal work pool updates: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPatch, url, bytes.NewReader(jsonData))
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	if c.APIKey != "" {
		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey))
	}

	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return fmt.Errorf("failed to make request: %w", err)
	}
	defer func() {
		if err := resp.Body.Close(); err != nil {
			c.log.Error(err, "failed to close response body")
		}
	}()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("failed to read response body: %w", err)
	}
	if !isSuccessStatusCode(resp.StatusCode) {
		return fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body))
	}

	// Report the name that was addressed in the URL: workPool.Name is optional
	// in an update payload and is frequently empty.
	c.log.V(1).Info("Work pool updated successfully", "name", name)
	return nil
}
// DeleteWorkPool issues a DELETE to "<BaseURL>/work_pools/<name>"; the work
// pool is addressed by its name, not by its ID.
//
// It returns nil when the server answers with a 2xx status. Any other status
// is returned as an error that includes the response body; errors building or
// sending the request or reading the body are returned wrapped.
func (c *Client) DeleteWorkPool(ctx context.Context, name string) error {
	url := fmt.Sprintf("%s/work_pools/%s", c.BaseURL, name)
	c.log.V(1).Info("Deleting work pool", "url", url, "workPoolName", name)

	req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil)
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}
	// Only send credentials when a key is configured; an empty key would
	// otherwise produce a malformed "Bearer " header.
	if c.APIKey != "" {
		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey))
	}

	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return fmt.Errorf("failed to make request: %w", err)
	}
	defer func() {
		// The body has been fully read by the time this runs; a close error
		// carries no actionable information here.
		_ = resp.Body.Close()
	}()

	// The body is read even on success so it can be reported on failure.
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("failed to read response body: %w", err)
	}
	if !isSuccessStatusCode(resp.StatusCode) {
		return fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body))
	}

	c.log.V(1).Info("Work pool deleted successfully", "workPoolName", name)
	return nil
}
// isRunningInCluster reports whether an in-cluster Kubernetes configuration
// can be loaded, i.e. whether rest.InClusterConfig succeeds. The receiver's
// state is not consulted.
func (c *Client) isRunningInCluster() bool {
	if _, err := rest.InClusterConfig(); err != nil {
		return false
	}
	return true
}
// WorkerMetadata describes one worker type as decoded by GetWorkerMetadata
// from the aggregate worker metadata endpoint.
type WorkerMetadata struct {
	Type             string `json:"type"`
	Description      string `json:"description"`
	DisplayName      string `json:"display_name"`
	DocumentationURL string `json:"documentation_url"`
	InstallCommand   string `json:"install_command"`
	IsBeta           bool   `json:"is_beta"`
	LogoURL          string `json:"logo_url"`
	// DefaultBaseJobTemplate is read from the "default_base_job_configuration"
	// key; note that the Go field name and the JSON key differ.
	DefaultBaseJobTemplate map[string]any `json:"default_base_job_configuration"`
}
// GetWorkerMetadata fetches "<BaseURL>/collections/views/aggregate-worker-metadata"
// and returns the metadata of every worker type, keyed by worker type.
//
// The response is decoded as a two-level object (outer key -> worker type ->
// metadata) and flattened into a single map. If the same worker type appears
// under more than one outer key, which entry wins is unspecified because Go
// map iteration order is random.
//
// A 404 response yields (nil, nil). Any other non-2xx status is returned as an
// error that includes the response body; request, read and JSON decoding
// failures are returned wrapped.
func (c *Client) GetWorkerMetadata(ctx context.Context) (map[string]WorkerMetadata, error) {
	url := fmt.Sprintf("%s/collections/views/aggregate-worker-metadata", c.BaseURL)
	c.log.V(1).Info("Getting aggregate worker metadata", "url", url)

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}
	// Only send credentials when a key is configured; an empty key would
	// otherwise produce a malformed "Bearer " header.
	if c.APIKey != "" {
		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey))
	}

	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to make request: %w", err)
	}
	defer func() {
		// The body has been fully read by the time this runs; a close error
		// carries no actionable information here.
		_ = resp.Body.Close()
	}()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read response body: %w", err)
	}
	if resp.StatusCode == http.StatusNotFound {
		return nil, nil
	}
	if !isSuccessStatusCode(resp.StatusCode) {
		return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body))
	}

	var result map[string]map[string]WorkerMetadata
	if err := json.Unmarshal(body, &result); err != nil {
		return nil, fmt.Errorf("failed to unmarshal response: %w", err)
	}
	c.log.V(1).Info("Worker metadata retrieved successfully")

	// Flatten the outer grouping away; callers only care about worker types.
	metadata := make(map[string]WorkerMetadata, len(result))
	for _, integration := range result {
		for workerType, worker := range integration {
			metadata[workerType] = worker
		}
	}
	return metadata, nil
}