-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy pathobjects.py
More file actions
1339 lines (1148 loc) · 54.3 KB
/
objects.py
File metadata and controls
1339 lines (1148 loc) · 54.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
SPDX-License-Identifier: Apache-2.0
"""
import collections
import datetime
import json
import math
from typing import Any, Dict, List, NamedTuple, Optional, Protocol, Set
import yaml
import pydantic
from src.lib.data import storage
from src.lib.data.storage.credentials import credentials as data_credentials
from src.lib.utils import credentials, common, osmo_errors, priority as wf_priority
from src.lib.utils.redact import redact_secrets
import src.lib.utils.logging
from src.utils.job import app, common as task_common, jobs, kb_objects, task, workflow
from src.utils import connectors, static_config, yaml as util_yaml
from src.utils.metrics import metrics
class WorkflowServiceConfig(connectors.RedisConfig, connectors.PostgresConfig,
                            src.lib.utils.logging.LoggingConfig,
                            static_config.StaticConfig, metrics.MetricsCreatorConfig):
    """ Manages configuration specific to the workflow service. """

    # Address the workflow service binds to.
    host: str = pydantic.Field(
        command_line='host',
        default='http://0.0.0.0:8000',
        description='The url to bind to when serving the workflow service.')

    # Authentication endpoints and client ids. All are optional and default to None.
    device_endpoint: str | None = pydantic.Field(
        command_line='device_endpoint',
        default=None,
        description='The url to bind to when authenticating with the device endpoint.')
    device_client_id: str | None = pydantic.Field(
        command_line='device_client_id',
        default=None,
        description='The client id to use when authenticating with the device endpoint.')
    browser_endpoint: str | None = pydantic.Field(
        command_line='browser_endpoint',
        default=None,
        description='The url to bind to when authenticating with the browser endpoint.')
    browser_client_id: str | None = pydantic.Field(
        command_line='browser_client_id',
        default=None,
        description='The client id to use when authenticating with the browser endpoint.')
    token_endpoint: str | None = pydantic.Field(
        command_line='token_endpoint',
        default=None,
        description='The url to bind to when authenticating with the token endpoint.')
    logout_endpoint: str | None = pydantic.Field(
        command_line='logout_endpoint',
        default=None,
        description='The url to bind to when authenticating with the logout endpoint.')
    client_install_url: str | None = pydantic.Field(
        command_line='client_install_url',
        default=None,
        description='The URL for the client install script shown in version update messages.')

    # Progress reporting; these two can also be supplied through environment variables.
    progress_file: str = pydantic.Field(
        command_line='progress_file',
        env='OSMO_PROGRESS_FILE',
        default='/var/run/osmo/last_progress',
        description='The file to write progress timestamps to (For liveness/startup probes)')
    progress_iter_frequency: str = pydantic.Field(
        command_line='progress_iter_frequency',
        env='OSMO_PROGRESS_ITER_FREQUENCY',
        default='15s',
        description='How often to write to progress file when processing tasks in a loop ('
                    'e.g. write to progress every 1 minute processed, like uploaded to DB). '
                    'Format needs to be <int><unit> where unit can be either s (seconds) and '
                    'm (minutes).')

    # Default admin bootstrap; the pair is cross-checked by validate_default_admin below.
    default_admin_username: str | None = pydantic.Field(
        command_line='default_admin_username',
        env='OSMO_DEFAULT_ADMIN_USERNAME',
        default=None,
        description='The username for the default admin user to create on startup. '
                    'If set, default_admin_password must also be set.')
    default_admin_password: str | None = pydantic.Field(
        command_line='default_admin_password',
        env='OSMO_DEFAULT_ADMIN_PASSWORD',
        default=None,
        description='The password (access token value) for the default admin user. '
                    'Must be set if default_admin_username is set.')

    @pydantic.root_validator()
    @classmethod
    def validate_default_admin(cls, values):
        """
        Reject a configuration that names a default admin user without a password.

        Only the "username without password" combination is refused; a password
        without a username passes. Returns the values unchanged otherwise.
        """
        has_username = bool(values.get('default_admin_username'))
        has_password = bool(values.get('default_admin_password'))
        if has_username and not has_password:
            raise ValueError(
                'default_admin_password must be set when default_admin_username is specified')
        return values
class WorkflowServiceContext(pydantic.BaseModel):
    """ Shared context that needs to be accessed from all api methods. """
    config: WorkflowServiceConfig
    database: connectors.PostgresConnector
    # Class-level slot for the shared instance; written by set() and read by get().
    _instance: Optional['WorkflowServiceContext'] = None

    class Config:
        arbitrary_types_allowed = True
        extra = 'forbid'

    @classmethod
    def set(cls, instance: 'WorkflowServiceContext'):
        """ Register the context that later get() calls will return. """
        cls._instance = instance

    @classmethod
    def get(cls) -> 'WorkflowServiceContext':
        """ Return the registered context; raises ValueError if set() was never called. """
        registered = cls._instance
        if registered is not None:
            return registered
        raise ValueError(
            'Using WorkflowServiceContext before initialization.')
class ResourceUsage(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Object storing resource usage information. """
    # All figures are carried as strings; unknown fields are rejected (extra=forbid).
    # Quota figures.
    quota_used: str
    quota_free: str
    quota_limit: str
    # Overall figures.
    total_usage: str
    total_capacity: str
    total_free: str
class PoolResourceUsage(connectors.PoolMinimal, extra=pydantic.Extra.forbid):
    """ Object storing pool information. """
    # Adds a usage block on top of the fields inherited from connectors.PoolMinimal.
    resource_usage: ResourceUsage
class PoolNodeSetResourceUsage(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Object storing pool node set information. """
    # The pools that make up this node set, each with its own usage block.
    pools: List[PoolResourceUsage]
class PoolResponse(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Object storing pool information. """
    # Per-node-set breakdown of pools.
    node_sets: List[PoolNodeSetResourceUsage]
    # A single usage block reported alongside the node sets
    # (presumably the sum over them, going by the name - TODO confirm).
    resource_sum: ResourceUsage
class SubmitResponse(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Object storing workflow name, logs, and spec after submission. """
    # The name of the newly created workflow
    name: str
    overview: Optional[str]
    logs: Optional[str]
    spec: Optional[str]
    dashboard_url: Optional[str]

    # NOTE(review): the decorator order here is the reverse of
    # WorkflowServiceConfig.validate_default_admin, where root_validator is the
    # outermost decorator. With @classmethod outermost, pydantic may not register
    # this method as a validator at all - confirm whether the check below actually
    # runs before relying on it (and check callers before enabling it).
    @classmethod
    @pydantic.root_validator
    def logs_or_spec(cls, values):
        """ Require exactly one of "logs" and "spec" to be non-None, else raise ValueError. """
        if (values['logs'] is not None, values['spec'] is not None).count(True) != 1:
            raise ValueError('Exactly one of "logs" or "spec" must be set')
        return values
class CancelResponse(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Object storing workflow name. """
    # The only payload of this response; unknown fields are rejected (extra=forbid).
    name: str
class ListEntry(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Entry for list API results. """
    user: str
    name: str
    workflow_uuid: str
    submit_time: datetime.datetime
    start_time: datetime.datetime | None
    end_time: datetime.datetime | None
    queued_time: datetime.timedelta
    duration: datetime.timedelta | None
    status: workflow.WorkflowStatus
    # Links built from the service base url in from_db_row.
    overview: str
    logs: str
    error_logs: str | None
    grafana_url: str | None
    dashboard_url: str | None
    pool: str | None
    app_owner: str | None
    app_name: str | None
    app_version: int | None
    priority: str

    @classmethod
    def from_db_row(cls, row: Any, base_url: str,
                    backend_lookup: Dict) -> 'ListEntry':
        """
        Build a ListEntry from one workflow row of the DB query result.

        Args:
            row: Mapping-like DB row, indexed by column name.
            base_url: Prefix used for the overview/logs/error_logs links.
            backend_lookup: Dict handed through to generate_grafana_url and
                generate_dashboard_url (ListResponse.from_db_rows shares one
                dict across all rows).

        The entry is created with construct(), so values are stored as given
        without pydantic validation.
        """
        service_config = WorkflowServiceContext.get().config
        workflow_id = row['workflow_id']
        api_prefix = f'{base_url}/api/workflow/{workflow_id}'

        # In 'dev' mode the overview link points at the API rather than the UI route.
        if service_config.method == 'dev':
            overview = api_prefix
        else:
            overview = f'{base_url}/workflows/{workflow_id}'

        return ListEntry.construct(
            user=row['submitted_by'],
            name=row['workflow_id'],
            workflow_uuid=row['workflow_uuid'],
            submit_time=row['submit_time'],
            start_time=row['start_time'],
            end_time=row['end_time'],
            status=workflow.WorkflowStatus(row['status']),
            queued_time=get_workflow_queued_time(row, use_raw_row=True),
            duration=get_workflow_duration(row, use_raw_row=True),
            overview=overview,
            logs=f'{api_prefix}/logs',
            # Error logs are only linked for statuses whose name starts with FAILED.
            error_logs=(f'{api_prefix}/error_logs'
                        if str(row['status']).startswith('FAILED') else None),
            grafana_url=generate_grafana_url(
                row['workflow_uuid'], row['backend'], row['start_time'],
                row['end_time'], backend_lookup),
            dashboard_url=generate_dashboard_url(
                row['workflow_uuid'], row['backend'], backend_lookup),
            pool=row['pool'],
            app_owner=row['app_owner'],
            app_name=row['app_name'],
            app_version=row['app_version'],
            priority=row['priority'])
class ListResponse(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Response of the workflow list API: the entries plus a "more available" flag. """
    workflows: List[ListEntry]
    more_entries: bool

    @classmethod
    def from_db_rows(cls, rows: Any, base_url: str, more_entries: bool) -> 'ListResponse':
        """
        Convert DB rows into a ListResponse.

        One backend_lookup dict is created here and passed to every
        ListEntry.from_db_row call. The result is built with construct(),
        i.e. without pydantic validation.
        """
        shared_backend_lookup: Dict = {}
        entries = []
        for db_row in rows:
            entries.append(ListEntry.from_db_row(db_row, base_url, shared_backend_lookup))
        return ListResponse.construct(workflows=entries, more_entries=more_entries)
class ListTaskSummaryEntry(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Entry for task list API results. """
    user: str
    pool: str | None
    storage: int  # Gi
    cpu: int
    memory: int  # Gi
    gpu: int
    priority: str

    @classmethod
    def from_db_row(cls, row: Any) -> 'ListTaskSummaryEntry':
        """ Create a ListTaskSummaryEntry from one row of the DB query result. """
        # Column name -> model field; evaluated in the order the fields are listed.
        field_values = {
            'user': row['submitted_by'],
            'pool': row['pool'],
            'storage': row['disk_count'],
            'cpu': row['cpu_count'],
            'memory': row['memory_count'],
            'gpu': row['gpu_count'],
            'priority': row['priority'],
        }
        return ListTaskSummaryEntry(**field_values)
class ListTaskAggregatedEntry(ListTaskSummaryEntry, extra=pydantic.Extra.forbid):
    """ Entry for task list API results, aggregated by workflow. """
    workflow_id: str

    @classmethod
    def from_db_row(cls, row: Any) -> 'ListTaskAggregatedEntry':
        """
        Create a ListTaskAggregatedEntry from one row of the DB query result.

        The shared fields are produced (and validated) by
        ListTaskSummaryEntry.from_db_row; the final object is assembled with
        construct(), which does not validate again.
        """
        workflow_id = row['workflow_id']
        summary_fields = ListTaskSummaryEntry.from_db_row(row).dict()
        return ListTaskAggregatedEntry.construct(workflow_id=workflow_id, **summary_fields)
class ListTaskSummaryResponse(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Response wrapping a list of task summary entries. """
    summaries: List[ListTaskSummaryEntry]

    @classmethod
    def from_db_rows(cls, rows: Any) -> 'ListTaskSummaryResponse':
        """ Convert every DB row into a ListTaskSummaryEntry and wrap the result. """
        entries = []
        for db_row in rows:
            entries.append(ListTaskSummaryEntry.from_db_row(db_row))
        return ListTaskSummaryResponse(summaries=entries)
class ListTaskAggregatedResponse(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Response wrapping a list of per-workflow aggregated task entries. """
    summaries: List[ListTaskAggregatedEntry]

    @classmethod
    def from_db_rows(cls, rows: Any) -> 'ListTaskAggregatedResponse':
        """ Convert every DB row into a ListTaskAggregatedEntry and wrap the result. """
        entries = []
        for db_row in rows:
            entries.append(ListTaskAggregatedEntry.from_db_row(db_row))
        return ListTaskAggregatedResponse(summaries=entries)
class TaskEntry(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Entry for task GET API result. """
    workflow_id: str
    task_name: str
    node: str | None
    start_time: datetime.datetime | None
    end_time: datetime.datetime | None
    status: task.TaskGroupStatus
    storage: int  # Gi
    cpu: int
    memory: int  # Gi
    gpu: int

    @classmethod
    def from_db_row(cls, row: Dict) -> 'TaskEntry':
        """ Create a TaskEntry from one row of the DB query result. """
        # Column name -> model field; the raw status value is converted to the enum.
        field_values = {
            'workflow_id': row['workflow_id'],
            'task_name': row['name'],
            'node': row['node_name'],
            'start_time': row['start_time'],
            'end_time': row['end_time'],
            'status': task.TaskGroupStatus(row['status']),
            'storage': row['disk_count'],
            'cpu': row['cpu_count'],
            'memory': row['memory_count'],
            'gpu': row['gpu_count'],
        }
        return TaskEntry(**field_values)
class ListTaskEntry(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Entry for task list API results. """
    user: str
    workflow_id: str
    workflow_uuid: str
    task_name: str
    retry_id: int
    pool: str | None
    node: str | None
    start_time: datetime.datetime | None
    end_time: datetime.datetime | None
    duration: datetime.timedelta | None
    status: task.TaskGroupStatus
    # Links built from the service base url in from_db_row.
    overview: str
    logs: str
    error_logs: str | None
    grafana_url: str | None
    dashboard_url: str | None
    storage: int  # Gi
    cpu: int
    memory: int  # Gi
    gpu: int
    priority: str

    @classmethod
    def from_db_row(cls, row: Any, base_url: str,
                    backend_lookup: Dict) -> 'ListTaskEntry':
        """
        Build a ListTaskEntry from one task row of the DB query result.

        Args:
            row: Mapping-like DB row, indexed by column name.
            base_url: Prefix used for the overview/logs/error_logs links.
            backend_lookup: Dict handed through to generate_grafana_url and
                generate_dashboard_url (ListTaskResponse.from_db_rows shares
                one dict across all rows).
        """
        service_config = WorkflowServiceContext.get().config
        workflow_id = row['workflow_id']
        api_prefix = f'{base_url}/api/workflow/{workflow_id}'

        # In 'dev' mode the overview link points at the API rather than the UI route.
        if service_config.method == 'dev':
            overview = api_prefix
        else:
            overview = f'{base_url}/workflows/{workflow_id}'

        return ListTaskEntry(
            user=row['submitted_by'],
            workflow_id=row['workflow_id'],
            workflow_uuid=row['workflow_uuid'],
            task_name=row['name'],
            pool=row['pool'],
            retry_id=row['retry_id'],
            node=row['node_name'],
            start_time=row['start_time'],
            end_time=row['end_time'],
            status=task.TaskGroupStatus(row['status']),
            duration=get_workflow_duration(row, use_raw_row=True),
            overview=overview,
            # Log links are scoped to this task through the task_name query parameter.
            logs=f'{api_prefix}/logs?task_name={row["name"]}',
            # Error logs are only linked for statuses whose name starts with FAILED.
            error_logs=(f'{api_prefix}/error_logs?task_name={row["name"]}'
                        if str(row['status']).startswith('FAILED') else None),
            grafana_url=generate_grafana_url(
                row['workflow_uuid'], row['backend'], row['start_time'],
                row['end_time'], backend_lookup),
            dashboard_url=generate_dashboard_url(
                row['workflow_uuid'], row['backend'], backend_lookup),
            storage=row['disk_count'],
            cpu=row['cpu_count'],
            memory=row['memory_count'],
            gpu=row['gpu_count'],
            priority=row['priority'],
        )
class ListTaskResponse(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Response wrapping the entries of the task list API. """
    tasks: List[ListTaskEntry]

    @classmethod
    def from_db_rows(cls, rows: Any, base_url: str) -> 'ListTaskResponse':
        """
        Convert DB rows into a ListTaskResponse.

        One backend_lookup dict is created here and passed to every
        ListTaskEntry.from_db_row call.
        """
        shared_backend_lookup: Dict = {}
        entries = []
        for db_row in rows:
            entries.append(ListTaskEntry.from_db_row(db_row, base_url, shared_backend_lookup))
        return ListTaskResponse(tasks=entries)
class TaskQueryResponse(pydantic.BaseModel):
    """ Represents the queried task information. """
    # Identity and state of the task.
    name: str
    retry_id: int
    status: task.TaskGroupStatus
    failure_message: str | None
    exit_code: int | None = None
    # Log / event references (plain strings).
    logs: str
    error_logs: str | None = None
    # Optional timestamps; each defaults to None when not available.
    processing_start_time: datetime.datetime | None = None
    scheduling_start_time: datetime.datetime | None = None
    initializing_start_time: datetime.datetime | None = None
    events: str
    start_time: datetime.datetime | None = None
    end_time: datetime.datetime | None = None
    input_download_start_time: datetime.datetime | None = None
    input_download_end_time: datetime.datetime | None = None
    output_upload_start_time: datetime.datetime | None = None
    dashboard_url: str | None = None
    # Pod / node placement details.
    pod_name: str
    pod_ip: str | None = None
    task_uuid: str
    node_name: str | None = None
    # Defaults to False (presumably marks the lead task of its group - TODO confirm).
    lead: bool = False
class GroupQueryResponse(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Represents the queried group information. """
    name: str
    status: task.TaskGroupStatus
    # Optional timestamps; each defaults to None when not available.
    start_time: datetime.datetime | None = None
    end_time: datetime.datetime | None = None
    processing_start_time: datetime.datetime | None = None
    scheduling_start_time: datetime.datetime | None = None
    initializing_start_time: datetime.datetime | None = None
    # Names of related groups in the workflow graph.
    remaining_upstream_groups: Set[str] | None
    downstream_groups: Set[str] | None
    failure_message: str | None = None
    # Per-task details of this group; empty by default.
    tasks: List[TaskQueryResponse] = []
class WorkflowQueryResponse(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Represents the queried workflow information. """
    name: str
    uuid: str
    submitted_by: str
    cancelled_by: str | None
    # Links built from the workflow service url in fetch_from_db.
    spec: str
    template_spec: str
    logs: str
    events: str
    overview: str
    parent_name: str | None
    parent_job_id: int | None
    dashboard_url: str | None
    grafana_url: str | None
    tags: List[str] = []
    submit_time: datetime.datetime
    start_time: datetime.datetime | None
    end_time: datetime.datetime | None
    exec_timeout: datetime.timedelta | None
    queue_timeout: datetime.timedelta | None
    duration: datetime.timedelta | None
    queued_time: datetime.timedelta
    status: workflow.WorkflowStatus
    outputs: str = ''
    groups: List[GroupQueryResponse]
    pool: str | None
    backend: str | None
    app_owner: str | None
    app_name: str | None
    app_version: int | None
    plugins: task_common.WorkflowPlugins
    priority: str

    @classmethod
    def fetch_from_db(cls, database: connectors.PostgresConnector,
                      name: str, skip_groups: bool = False, verbose: bool = False
                      ) -> 'WorkflowQueryResponse':
        """
        Fetch workflow information from the database.

        Args:
            database: Connector used to load the workflow, its groups and its app.
            name: Workflow name passed to workflow.Workflow.fetch_from_db.
            skip_groups: When True, the groups list is left empty and
                get_groups is not called.
            verbose: Passed through to get_groups.
        """
        workflow_obj = workflow.Workflow.fetch_from_db(database, name, fetch_groups=False)
        base_url = database.get_workflow_service_url()
        workflow_id = workflow_obj.workflow_id

        # Every API link shares the same per-workflow prefix.
        api_prefix = f'{base_url}/api/workflow/{workflow_id}'
        logs = f'{api_prefix}/logs?last_n_lines=1000'
        events = f'{api_prefix}/events'
        spec = f'{api_prefix}/spec'
        template_spec = f'{spec}?use_template=true'

        # In 'dev' mode the overview link points at the API rather than the UI route.
        if WorkflowServiceContext.get().config.method == 'dev':
            overview = api_prefix
        else:
            overview = f'{base_url}/workflows/{workflow_id}'

        if skip_groups:
            groups = []
        else:
            groups = get_groups(
                database, workflow_id, logs, events,
                base_url, workflow_obj.backend, verbose)

        # The app lookup is best-effort: an OSMOUserError from the lookup simply
        # leaves app_owner / app_name as None in the response.
        app_info: app.App | None = None
        if workflow_obj.app_uuid:
            try:
                app_info = app.App.fetch_from_db_from_uuid(database, workflow_obj.app_uuid)
            except osmo_errors.OSMOUserError:
                pass

        return WorkflowQueryResponse(
            name=workflow_obj.workflow_id,
            uuid=workflow_obj.workflow_uuid,
            submitted_by=workflow_obj.user,
            cancelled_by=workflow_obj.cancelled_by,
            spec=spec,
            template_spec=template_spec,
            logs=logs,
            events=events,
            overview=overview,
            parent_name=workflow_obj.parent_name,
            parent_job_id=workflow_obj.parent_job_id,
            dashboard_url=generate_dashboard_url(
                workflow_obj.workflow_uuid, workflow_obj.backend),
            grafana_url=generate_grafana_url(
                workflow_obj.workflow_uuid, workflow_obj.backend,
                workflow_obj.start_time, workflow_obj.end_time),
            tags=get_workflow_tags(workflow_obj.workflow_uuid),
            submit_time=workflow_obj.submit_time,
            start_time=workflow_obj.start_time,
            end_time=workflow_obj.end_time,
            queued_time=get_workflow_queued_time(workflow_obj),
            exec_timeout=workflow_obj.timeout.exec_timeout,
            queue_timeout=workflow_obj.timeout.queue_timeout,
            duration=get_workflow_duration(workflow_obj),
            status=workflow_obj.status,
            outputs=workflow_obj.outputs,
            groups=groups,
            pool=workflow_obj.pool,
            backend=workflow_obj.backend,
            app_owner=app_info.owner if app_info else None,
            app_name=app_info.name if app_info else None,
            app_version=workflow_obj.app_version,
            plugins=workflow_obj.plugins,
            priority=workflow_obj.priority)
class ResourcesResponse(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Object storing execution cluster node resource information. """
    # One workflow.ResourcesEntry per reported resource; unknown fields are rejected.
    resources: List[workflow.ResourcesEntry]
class PoolResourcesEntry(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Entry for resources API results. """
    # Identifies the pool/platform pair this entry describes.
    pool: str
    platform: str
    status: connectors.PoolStatus
    # Untyped dicts; their key/value schema is not defined in this class.
    usage_fields: Dict
    allocatable_fields: Dict
    backend: str
class PoolResourcesResponse(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Object storing execution cluster node resource information. """
    # One PoolResourcesEntry per reported pool/platform.
    pools: List[PoolResourcesEntry]
class DataUploadResponse(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Object storing Upload Response. """
    # All fields are required strings; unknown fields are rejected (extra=forbid).
    version_id: str
    container: str
    endpoint_url: str
    path: str
class DataDownloadResponse(pydantic.BaseModel, extra=pydantic.Extra.forbid):
    """ Object storing Download Response. """
    # All fields are required strings; unknown fields are rejected (extra=forbid).
    location: str
    container: str
    endpoint_url: str
class CredentialRecord(NamedTuple):
    """ Values for one stored credential, as returned by the to_db_row() methods. """
    # The value of the credential's connectors.CredentialType.
    cred_type: str
    # Registry (registry credentials), endpoint (data credentials) or None (generic).
    profile: str | None
    # Encrypted credential dict, encoded with PostgresConnector.encode_hstore.
    payload: str
class CredentialProtocol(Protocol):
    """ Protocol for credentials. """
    @staticmethod
    def type() -> connectors.CredentialType:
        """ Return the connectors.CredentialType of the implementing class. """
        pass

    def to_db_row(self, user: str, postgres: connectors.PostgresConnector) -> CredentialRecord:
        """ Return the CredentialRecord to store for this credential and user. """
        pass

    def valid_cred(self, workflow_config: connectors.WorkflowConfig):
        """ Check the credential against workflow_config; implementations raise on failure. """
        pass
class UserRegistryCredential(
    credentials.RegistryCredential,
    extra=pydantic.Extra.forbid,
):
    """ Authentication information for a Docker registry. """
    auth: str = pydantic.Field(
        description='The authentication token for the Docker registry')  # type: ignore

    @staticmethod
    def type() -> connectors.CredentialType:
        """ Registry credentials are stored under CredentialType.REGISTRY. """
        return connectors.CredentialType.REGISTRY

    def to_db_row(self, user: str, postgres: connectors.PostgresConnector) -> CredentialRecord:
        """
        Build the record to store for this credential.

        The username/auth pair is encrypted for the given user, and the
        registry is used as the profile.
        """
        encrypted = postgres.encrypt_dict(
            {'username': self.username, 'auth': self.auth}, user)
        return CredentialRecord(
            cred_type=self.type().value,
            profile=self.registry,
            payload=connectors.PostgresConnector.encode_hstore(encrypted))

    def valid_cred(self, workflow_config: connectors.WorkflowConfig):
        """
        Verify the credential by authenticating against the registry.

        self.registry is replaced by its parsed form first. Registries listed
        in credential_config.disable_registry_validation are not contacted.

        Raises:
            osmo_errors.OSMOCredentialError: if the registry does not answer with 200.
        """
        self.registry = common.registry_parse(self.registry)
        skipped_registries = workflow_config.credential_config.disable_registry_validation
        if self.registry not in skipped_registries:
            response = common.registry_auth(
                f'https://{self.registry}/v2/', self.username, self.auth)
            if response.status_code != 200:
                raise osmo_errors.OSMOCredentialError('Registry authentication failed.')
class UserDataCredential(
    data_credentials.DataCredentialBase,
    extra=pydantic.Extra.forbid,
):
    """ Authentication information for a data service. """
    access_key_id: str = pydantic.Field(
        ...,
        description='The authentication key for a data backend',
    )
    access_key: str = pydantic.Field(
        ...,
        description='The authentication secret for a data backend',
    )

    @staticmethod
    def type() -> connectors.CredentialType:
        """ Data credentials are stored under CredentialType.DATA. """
        return connectors.CredentialType.DATA

    def to_db_row(self, user: str, postgres: connectors.PostgresConnector) -> CredentialRecord:
        """
        Build the record to store for this credential.

        The key pair is always part of the payload; region and override_url
        are added only when set. The payload is encrypted for the given user,
        and the endpoint is used as the profile.
        """
        payload = {
            'access_key_id': self.access_key_id,
            'access_key': self.access_key,
        }
        optional_values = {
            'region': self.region,
            'override_url': self.override_url,
        }
        payload.update({key: value for key, value in optional_values.items() if value})

        encrypted = postgres.encrypt_dict(payload, user)
        return CredentialRecord(
            cred_type=self.type().value,
            profile=self.endpoint,
            payload=connectors.PostgresConnector.encode_hstore(encrypted),
        )

    def valid_cred(self, workflow_config: connectors.WorkflowConfig):
        """
        Verify the credential through the storage backend's data_auth.

        Backends whose scheme is listed in
        credential_config.disable_data_validation are not checked.
        """
        storage_info = storage.construct_storage_backend(self.endpoint, True)
        skipped_schemes = workflow_config.credential_config.disable_data_validation
        if storage_info.scheme not in skipped_schemes:
            storage_info.data_auth(data_credentials.StaticDataCredential(
                endpoint=self.endpoint,
                access_key_id=self.access_key_id,
                access_key=pydantic.SecretStr(self.access_key),
                region=self.region,
                override_url=self.override_url,
            ))
class UserCredential(
    pydantic.BaseModel,
    extra=pydantic.Extra.forbid,
):
    """ Generic authentication information. """
    credential: Dict[str, str] = pydantic.Field(
        description='The credential dictionary that contains authentication information'
    )

    @staticmethod
    def type() -> connectors.CredentialType:
        """ Generic credentials are stored under CredentialType.GENERIC. """
        return connectors.CredentialType.GENERIC

    def to_db_row(self, user: str, postgres: connectors.PostgresConnector) -> CredentialRecord:
        """
        Build the record to store for this credential.

        The whole credential dict is encrypted for the given user; generic
        credentials carry no profile (None).
        """
        encrypted = postgres.encrypt_dict(self.credential, user)
        return CredentialRecord(
            cred_type=self.type().value,
            profile=None,
            payload=connectors.PostgresConnector.encode_hstore(encrypted))

    @staticmethod
    def from_db_row(rows) -> List:
        """ Summarize credential rows as dicts with cred_name, cred_type and profile. """
        summaries = []
        for db_row in rows:
            summaries.append({
                'cred_name': db_row.cred_name,
                'cred_type': db_row.cred_type,
                'profile': db_row.profile,
            })
        return summaries

    @staticmethod
    def commit_cmd() -> str:
        """ Return the parameterized upsert statement for the credential table. """
        # Five %s placeholders: user_name, cred_name, cred_type, profile, payload.
        return '''
INSERT INTO credential
(user_name, cred_name, cred_type, profile, payload)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (user_name, profile) DO UPDATE SET
cred_name = EXCLUDED.cred_name,
cred_type = EXCLUDED.cred_type,
payload = EXCLUDED.payload;
'''

    def valid_cred(self, workflow_config: connectors.WorkflowConfig):
        """ Generic credentials have nothing to verify, so this is a no-op. """
        # pylint: disable=unused-argument
class CredentialOptions(pydantic.BaseModel):
    """ Credential options """
    registry_credential: Optional[UserRegistryCredential] = pydantic.Field(
        description='Authentication information for a Docker registry')
    data_credential: Optional[UserDataCredential] = pydantic.Field(
        description='Authentication information for a data service')
    generic_credential: Optional[UserCredential] = pydantic.Field(
        description='Generic authentication information')

    @pydantic.root_validator(pre=True)
    def validate_credential(cls, values):  # pylint: disable=no-self-argument
        """
        A valid credential can only be one of the three types.

        Runs before field validation (pre=True), so it counts the non-None
        entries of the raw input.

        Raises:
            osmo_errors.OSMOUserError: unless exactly one entry is non-None.
        """
        num_fields_set = sum(value is not None for value in values.values())
        if num_fields_set != 1:
            # List the field names themselves rather than the dict_keys(...) repr,
            # and use the same wording as get_credential.
            raise osmo_errors.OSMOUserError(
                'Exactly one of the following must be set: '
                f'{", ".join(cls.__fields__)}')
        return values

    def get_credential(self) -> CredentialProtocol:
        """
        Return whichever of the three credentials is set.

        They are checked in the order registry, data, generic.

        Raises:
            osmo_errors.OSMOUserError: if none of the three fields is set.
        """
        if self.registry_credential is not None:
            return self.registry_credential
        if self.data_credential is not None:
            return self.data_credential
        if self.generic_credential is not None:
            return self.generic_credential
        raise osmo_errors.OSMOUserError(
            'Exactly one of the following must be set: '
            f'{", ".join(self.__fields__)}')
class CredentialGetResponse(pydantic.BaseModel):
    """ Credential Response. """
    # One dict per credential; values may be None. UserCredential.from_db_row
    # produces dicts of this shape with cred_name, cred_type and profile keys.
    credentials: List[Dict[str, Optional[str]]]
class RouterResponse(pydantic.BaseModel):
    """ Router Information Response. """
    # All three fields are required strings.
    router_address: str
    key: str
    cookie: str
class WorkflowSubmitInfo(pydantic.BaseModel):
''' Performs the workflow submission in steps '''
context: WorkflowServiceContext
base32_id: str = ''
name: str = ''
parent_workflow_id: str | None
app_uuid: str | None = None
app_version: int | None = None
user: str
pool: str = ''
priority: wf_priority.WorkflowPriority = wf_priority.WorkflowPriority.NORMAL
backend: str = ''
def build_workflow_object(self,
                          rendered_spec: workflow.WorkflowSpec,
                          group_and_task_uuids: Dict,
                          remaining_upstream_groups: Dict,
                          downstream_groups: Dict,
                          failure_message: str | None = None) -> workflow.Workflow:
    """
    Create the workflow.Workflow object for this submission from a rendered spec.

    The workflow starts as PENDING unless a failure_message is given, in which
    case it is created as FAILED_SUBMISSION and carries that message.
    """
    if failure_message is None:
        initial_status = workflow.WorkflowStatus.PENDING
    else:
        initial_status = workflow.WorkflowStatus.FAILED_SUBMISSION

    return workflow.Workflow.from_workflow_spec(
        self.context.database,
        self.name,
        self.base32_id,
        self.user,
        rendered_spec,
        self.context.config.redis_url,
        group_and_task_uuids,
        remaining_upstream_groups,
        downstream_groups,
        status=initial_status,
        failure_message=failure_message or '',
        parent_workflow_id=self.parent_workflow_id,
        app_uuid=self.app_uuid,
        app_version=self.app_version,
        priority=self.priority)
def insert_failed_submission_to_db(self, failure_message: str) -> workflow.Workflow:
    """
    Record a failed submission: build a workflow carrying failure_message,
    insert it into the database and return it.
    """
    failed_workflow = workflow.Workflow.from_workflow(
        self.context.database,
        self.name,
        self.base32_id,
        self.user,
        backend=self.backend,
        pool=self.pool,
        failure_message=failure_message or '',
        parent_workflow_id=self.parent_workflow_id,
        app_uuid=self.app_uuid,
        app_version=self.app_version,
        priority=self.priority)
    failed_workflow.insert_to_db()
    return failed_workflow
def construct_workflow_dict(self, template_spec: workflow.TemplateSpec) -> Dict:
    """
    Render the templated spec and parse it into a dict.

    Side effects: sets self.backend from the pool and sets self.name from the
    spec's workflow name (or 'failed-<base32_id>' when the spec has none).

    Raises:
        osmo_errors.OSMOUsageError: if the rendered spec is not valid YAML or
            its top level is not a mapping. Before raising, a failed-submission
            record is written on a best-effort basis.
    """
    # Verify pool
    pool_info = connectors.Pool.fetch_from_db(self.context.database, self.pool)
    self.backend = pool_info.backend

    # Render the workflow spec
    try:
        updated_workflow_txt = template_spec.load_template_with_variables()
        updated_workflow_dict: Any = yaml.safe_load(updated_workflow_txt)
        # An empty document parses to None and a bare scalar/list to a non-dict;
        # treat both as a malformed spec instead of failing later on .get().
        if not isinstance(updated_workflow_dict, dict):
            raise yaml.YAMLError(
                'expected a mapping at the top level of the workflow spec, got '
                f'{type(updated_workflow_dict).__name__}')
    except yaml.YAMLError as yaml_error:
        err_msg = f'Workflow spec is not properly formatted: {yaml_error}'
        # Construct a workflow ID with format <name>-<number>
        # Appending failed in the front because the base32_id may start with a number, which
        # fails the regex check.
        self.name = f'failed-{self.base32_id}'
        # Needs to be in the above format to pass the deconstruct_workflow_id() function in
        # the from_workflow() call
        try:
            self.insert_failed_submission_to_db(str(yaml_error))
        except Exception:  # pylint: disable=broad-except
            # Recording the failure is best-effort; the usage error below is what
            # the caller needs to see. KeyboardInterrupt/SystemExit still propagate.
            pass
        raise osmo_errors.OSMOUsageError(err_msg, workflow_id=self.base32_id) from yaml_error

    # A missing or non-mapping 'workflow' section yields no name here; the
    # returned dict is left untouched.
    workflow_section = updated_workflow_dict.get('workflow')
    spec_name = workflow_section.get('name', '') if isinstance(workflow_section, dict) else ''
    self.name = spec_name if spec_name else f'failed-{self.base32_id}'
    return updated_workflow_dict
def construct_workflow_spec_from_dict(self, workflow_dict: Dict)\
        -> workflow.WorkflowSpec:
    """
    Validate workflow_dict as a workflow.VersionedWorkflowSpec and return its
    workflow spec.

    Raises:
        osmo_errors.OSMOUsageError: if validation fails. Before raising, a
            failed-submission record is written on a best-effort basis.
    """
    try:
        versioned_workflow_spec = workflow.VersionedWorkflowSpec(
            **workflow_dict)
    except pydantic.ValidationError as err:
        try:
            self.insert_failed_submission_to_db(str(err))
        except Exception:  # pylint: disable=broad-except
            # Recording the failure is best-effort; the usage error below is what
            # the caller needs to see. KeyboardInterrupt/SystemExit still propagate.
            pass
        raise osmo_errors.OSMOUsageError(f'{err}', workflow_id=self.name) from err
    return versioned_workflow_spec.workflow
def update_dataset_buckets(self, workflow_spec: workflow.WorkflowSpec):
    """
    Rewrite, in place, the dataset names of every task in workflow_spec so
    that each carries a bucket.

    Bucket precedence: the bucket already present in the dataset name, then
    the submitting user's profile bucket, then the dataset config's default
    bucket.

    Raises:
        osmo_errors.OSMOUserError: if none of the three sources provides a bucket.
    """
    postgres = connectors.PostgresConnector.get_instance()
    dataset_config = postgres.get_dataset_configs()
    default_user_bucket = connectors.UserProfile.fetch_from_db(postgres, self.user).bucket

    def _resolve_bucket(explicit_bucket: str) -> str:
        # The first non-empty candidate wins; the config default is only
        # consulted when the other two are empty.
        resolved = explicit_bucket or default_user_bucket or dataset_config.default_bucket
        if not resolved:
            raise osmo_errors.OSMOUserError(
                'No default bucket set. Specify default bucket using the '
                '"osmo profile set" CLI.')
        return resolved

    def _qualify_name(dataset_ref):
        # Parse the name, fill in the bucket, and write the full name back.
        parsed = common.DatasetStructure(dataset_ref.name, workflow_spec=True)
        parsed.bucket = _resolve_bucket(parsed.bucket)
        dataset_ref.name = parsed.full_name

    def _update_task(task_spec: task.TaskSpec):
        for io_entry in task_spec.inputs + task_spec.outputs:
            if isinstance(io_entry, task.DatasetInputOutput):
                _qualify_name(io_entry.dataset)
            elif isinstance(io_entry, task.UpdateDatasetOutput):
                _qualify_name(io_entry.update_dataset)

    # Tasks nested in groups first, then the spec's top-level tasks.
    for spec_group in workflow_spec.groups:
        for grouped_task in spec_group.tasks:
            _update_task(grouped_task)
    for top_level_task in workflow_spec.tasks:
        _update_task(top_level_task)
def send_workflow_spec_to_queue(self, workflow_id: str, workflow_dict: Dict,
                                original_templated_spec: str | None = None):
    """
    Dump the workflow spec to YAML, redact secrets, and enqueue its upload.

    The `contents` of every task file entry in workflow_dict is replaced in
    place with a util_yaml.YamlLiteral before dumping. The dumped spec, and the
    original templated spec when one is given, are passed through
    redact_secrets and attached as files to a jobs.UploadWorkflowFiles job,
    which is then sent to the queue.

    Args:
        workflow_id: Workflow ID passed to the upload job.
        workflow_dict: Parsed workflow spec; must contain a 'workflow' key.
            Modified in place (file contents are wrapped in YamlLiteral).
        original_templated_spec: Optional templated spec text; when not None
            it is uploaded as a second file.
    """
    def _wrap_file_contents(task_specs):
        # Convert file contents to YamlLiteral for better output format
        for task_spec in task_specs:
            for file_entry in task_spec.get('files', []):
                file_entry['contents'] = util_yaml.YamlLiteral(file_entry['contents'])

    workflow_section = workflow_dict['workflow']
    _wrap_file_contents(workflow_section.get('tasks', []))
    for group in workflow_section.get('groups', []):
        _wrap_file_contents(group.get('tasks', []))

    dumped_spec = yaml.dump(workflow_dict, default_flow_style=False, allow_unicode=True)
    # Redact secrets in the workflow spec
    redacted_spec = ''.join(redact_secrets((dumped_spec,)))
    files = [jobs.File(path=common.WORKFLOW_SPEC_FILE_NAME, content=redacted_spec)]

    if original_templated_spec is not None:
        # Redact secrets in the original templated spec
        redacted_template = ''.join(redact_secrets((original_templated_spec,)))
        files.append(jobs.File(
            path=common.TEMPLATED_WORKFLOW_SPEC_FILE_NAME,
            content=redacted_template))

    jobs.UploadWorkflowFiles(
        workflow_id=workflow_id,
        workflow_uuid=self.base32_id,
        files=files).send_job_to_queue()
def validate_workflow_spec(
self, rendered_spec: workflow.WorkflowSpec,
group_and_task_uuids: Dict[str, common.UuidPattern],
roles: List[str],
original_templated_spec: str | None,
priority: wf_priority.WorkflowPriority = wf_priority.WorkflowPriority.NORMAL):
"""
Validate workflow spec by checking:
- if user has the necessary credentials to pull datasets
- if the workflow can match any resource node that has enough allocatables for the workflow
- if this workflow's Docker containers can be pull with user and service Docker credentials
If validation fails, insert this workflow entry into the database, and upload the spec.
"""
remaining_upstream_groups: Dict = collections.defaultdict(set)
downstream_groups: Dict = collections.defaultdict(set)
upload_workflow_spec = True
try:
# Validate tasks
rendered_spec.validate_name_and_inputs()
# Check if pool is online
pool_info = connectors.Pool.fetch_from_db(self.context.database, self.pool)
if pool_info.status == connectors.PoolStatus.MAINTENANCE:
if 'osmo-admin' not in roles:
upload_workflow_spec = False
raise osmo_errors.OSMOUsageError(
f'Pool {self.pool} is undergoing maintenance. '
'Users will have to wait until maintenance is completed before submission.')
# Validate priority
backend_info = connectors.Backend.fetch_from_db(self.context.database, self.backend)
object_factory = kb_objects.get_k8s_object_factory(backend_info)
if not object_factory.priority_supported():
if priority != wf_priority.WorkflowPriority.NORMAL:
upload_workflow_spec = False
required_priority = wf_priority.WorkflowPriority.NORMAL.value
raise osmo_errors.OSMOUsageError(
f'Backend {self.backend} does not support priority. '
f'Workflows must be submitted with {required_priority} priority',
workflow_id=self.name)
# Validate gpu counts for NORMAL/HIGH priority workflows
if priority != wf_priority.WorkflowPriority.LOW:
for group_obj_spec in rendered_spec.groups:
group_gpus = 0
for task_obj_spec in group_obj_spec.tasks:
group_gpus += task_obj_spec.resources.gpu or 0
if pool_info.resources.gpu and pool_info.resources.gpu.guarantee != -1 and \
group_gpus > pool_info.resources.gpu.guarantee:
raise osmo_errors.OSMOUsageError(
f'Pool {self.pool} has {pool_info.resources.gpu.guarantee} GPUs '
'guaranteed for NORMAL/HIGH priority workflows and '
f'group {group_obj_spec.name} requires {group_gpus} GPUs. '
'Higher gpu counts must be submitted with '
f'{wf_priority.WorkflowPriority.LOW.value} priority',
workflow_id=self.name)
# Validate the resources
resources_list = get_resources(pools=[self.pool], verbose=True).resources
platform_to_resource_map: Dict[str, List[workflow.ResourcesEntry]] = {}
for resource in resources_list:
for platform in resource.pool_platform_labels[self.pool]:
if platform not in platform_to_resource_map:
platform_to_resource_map[platform] = [resource]
else:
platform_to_resource_map[platform].append(resource)
rendered_spec.validate_resources(platform_to_resource_map)
rendered_spec.validate_credentials(self.user)
except Exception as err: # pylint: disable=broad-except
if upload_workflow_spec:
workflow_obj = self.build_workflow_object(
failure_message=(str(err) if not isinstance(err, osmo_errors.OSMOError)