-
Notifications
You must be signed in to change notification settings - Fork 20
Expand file tree
/
Copy pathpostgres.py
More file actions
4568 lines (3974 loc) · 181 KB
/
postgres.py
File metadata and controls
4568 lines (3974 loc) · 181 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
SPDX-License-Identifier: Apache-2.0
"""
import abc
import atexit
import contextlib
import copy
import datetime
import enum
import json
import logging
import math
import types
import os
import re
import threading
import typing
from functools import wraps
from typing import Any, Callable, Dict, Generator, List, Literal, Optional, Tuple, Type
from urllib.parse import urlparse
import fastapi
import psycopg2 # type: ignore
import psycopg2.extras # type: ignore
import psycopg2.pool # type: ignore
import pydantic
import yaml
from jwcrypto import jwe # type: ignore
from jwcrypto.common import JWException # type: ignore
from src.lib.data import storage
from src.lib.data.storage import constants
from src.lib.utils import (common, credentials, jinja_sandbox, login,
osmo_errors, role, validation)
from src.utils import auth, notify
from src.utils.secret_manager import Encrypted, SecretManager
def backend_action_queue_name(backend_name: str) -> str:
    """Return the action-queue name for a backend (``backend-connections:<name>``)."""
    return 'backend-connections:' + backend_name
class ExtraType(enum.Enum):
    """ Setting for Pydantic Extra """
    # Wraps pydantic.Extra so callers pick how models treat unexpected
    # fields: accept them, raise on them, or silently drop them.
    ALLOW = pydantic.Extra.allow
    FORBID = pydantic.Extra.forbid
    IGNORE = pydantic.Extra.ignore
class CredentialType(enum.Enum):
    """ User profile type / table name if exist """
    # NOTE(review): member names mirror their string values; presumably the
    # value doubles as a table/type discriminator — confirm against queries.
    GENERIC = 'GENERIC'
    REGISTRY = 'REGISTRY'
    DATA = 'DATA'
class ConfigType(enum.Enum):
    """ Type of Config to fetch or set """
    # Selects which dynamic-config family a `configs` table row belongs to
    # (see get_configs / set_config).
    SERVICE = 'SERVICE'
    WORKFLOW = 'WORKFLOW'
    DATASET = 'DATASET'
class ConfigHistoryType(enum.Enum):
    """ Type of configs supported by config history """
    # Superset of ConfigType: history also tracks backend/pool/template/role
    # configuration changes.
    SERVICE = 'SERVICE'
    WORKFLOW = 'WORKFLOW'
    DATASET = 'DATASET'
    BACKEND = 'BACKEND'
    POOL = 'POOL'
    POD_TEMPLATE = 'POD_TEMPLATE'
    GROUP_TEMPLATE = 'GROUP_TEMPLATE'
    RESOURCE_VALIDATION = 'RESOURCE_VALIDATION'
    BACKEND_TEST = 'BACKEND_TEST'
    ROLE = 'ROLE'
class DownloadType(str, enum.Enum):
    """Transfer mode for data; currently only plain download is defined."""
    DOWNLOAD = 'download'

    @staticmethod
    def from_str(label) -> 'DownloadType':
        """Parse a label into a DownloadType; unknown labels are rejected."""
        if label != 'download':
            raise NotImplementedError
        return DownloadType.DOWNLOAD

    def is_mounting(self) -> bool:
        """Whether this mode mounts data rather than downloading it."""
        return self.value != 'download'
class PoolType(enum.Enum):
    """ Pool type for amount of info to output """
    # Controls how much pool detail is emitted — presumably full detail,
    # user-editable fields only, or a minimal summary; confirm with callers.
    VERBOSE = 'VERBOSE'
    EDITABLE = 'EDITABLE'
    MINIMAL = 'MINIMAL'
class PoolStatus(enum.Enum):
    """ Represents the types of statuses a pool can have. """
    ONLINE = 'ONLINE'
    OFFLINE = 'OFFLINE'
    MAINTENANCE = 'MAINTENANCE'
class ClusterResources(pydantic.BaseModel):
    """Resource quantities keyed by Kubernetes resource names.

    Field aliases match k8s resource keys ('cpu', 'nvidia.com/gpu',
    'ephemeral-storage') so the model can be populated straight from a
    k8s-style resources dict.
    """
    # NOTE(review): the defaults below (4 CPU, 0 GPU, 50Gi ephemeral
    # storage, 20Gi memory) are assumed to be service-wide fallbacks —
    # confirm against callers.
    cpus: int = pydantic.Field(4, alias='cpu')
    gpus: int = pydantic.Field(0, alias='nvidia.com/gpu')
    ephemeral_storage: str = pydantic.Field('50Gi', alias='ephemeral-storage')
    memory: str = '20Gi'
class PostgresConfig(pydantic.BaseModel):
    """ Manages the config for the postgres database.

    Each field carries `command_line`/`env` metadata so values can come from
    CLI flags or environment variables, with the defaults shown below.
    """
    # --- Connection settings ---
    postgres_host: str = pydantic.Field(
        command_line='postgres_host',
        env='OSMO_POSTGRES_HOST',
        default='localhost',
        description='The hostname of the postgres server to connect to.')
    postgres_port: int = pydantic.Field(
        command_line='postgres_port',
        env='OSMO_POSTGRES_PORT',
        default=5432,
        description='The port of the postgres server to connect to.')
    postgres_user: str = pydantic.Field(
        command_line='postgres_user',
        env='OSMO_POSTGRES_USER',
        default='postgres',
        description='The user of the postgres server.')
    postgres_password: str = pydantic.Field(
        command_line='postgres_password',
        env='OSMO_POSTGRES_PASSWORD',
        description='The password to connect to the postgres server.')
    postgres_database_name: str = pydantic.Field(
        command_line='postgres_database_name',
        env='OSMO_POSTGRES_DATABASE_NAME',
        default='osmo_db',
        description='The database name for postgres server.')
    postgres_reconnect_retry: int = pydantic.Field(
        command_line='postgres_reconnect_retry',
        env='OSMO_POSTGRES_RECONNECT_RETRY',
        type=validation.positive_integer,
        default=5,
        description='Reconnect try count after connection error')
    # --- Secrets / deployment mode ---
    mek_file: str = pydantic.Field(
        command_line='mek_file',
        env='OSMO_MEK_FILE',
        default='/home/osmo/vault-agent/secrets/vault-secrets.yaml',
        description='Path to the file that stores master encryption keys'
    )
    method: Literal['dev'] | None = pydantic.Field(
        command_line='method',
        default=None,
        # Fix: original description concatenated to "...mek fileingoring" —
        # missing separator space and a typo ("ingoring").
        description='If set to "dev", use the default local mek file '
                    'ignoring `mek_file` field.')
    dev_user: str = pydantic.Field(
        command_line='dev_user',
        default='testuser',
        description='If method is set to "dev", the browser flow to the service will use this '
                    'user name.')
    # Deployment configuration fields from Helm values for auto-initialization
    osmo_image_location: str | None = pydantic.Field(
        command_line='osmo_image_location',
        default=None,
        description='The image registry location for OSMO images')
    osmo_image_tag: str | None = pydantic.Field(
        command_line='osmo_image_tag',
        default=None,
        description='The image tag for OSMO images')
    service_hostname: str | None = pydantic.Field(
        command_line='service_hostname',
        default=None,
        description='The public hostname for the OSMO service (used for URL generation)')
    # --- Connection-pool sizing ---
    postgres_pool_minconn: int = pydantic.Field(
        command_line='postgres_pool_minconn',
        type=validation.positive_integer,
        env='OSMO_POSTGRES_POOL_MINCONN',
        default=1,
        description='Minimum number of connections to keep in the connection pool')
    postgres_pool_maxconn: int = pydantic.Field(
        command_line='postgres_pool_maxconn',
        type=validation.positive_integer,
        env='OSMO_POSTGRES_POOL_MAXCONN',
        default=10,
        description='Maximum number of connections allowed in the connection pool')
    schema_version: str = pydantic.Field(
        command_line='schema_version',
        env='OSMO_SCHEMA_VERSION',
        default='public',
        description='pgroll schema version to use. '
                    'Set to "public" to use the default schema without pgroll versioning.')
def retry(func=None, *, reconnect: bool = True):
    """
    Retry database operations in case of connection/pool errors.

    Usable both bare (``@retry``) and with arguments
    (``@retry(reconnect=False)``). The wrapped method's first positional
    argument must expose ``config.postgres_reconnect_retry`` (attempt count)
    and, when ``reconnect`` is True, a ``connect()`` method.

    Handles psycopg2 InterfaceError, DatabaseError, and pool.PoolError.
    When reconnect is True and an error occurs, the connection pool is
    recreated before retrying.

    Raises:
        OSMOError: Application-level errors propagate unchanged.
        OSMODatabaseError: When all attempts fail or an unexpected error
            occurs; the original exception is attached as the cause.
    """
    def decorator(fn):
        @wraps(fn)
        def retry_wrapper(*args, **kwargs):
            self = args[0]
            last_error: Exception | None = None
            for _ in range(self.config.postgres_reconnect_retry):
                try:
                    return fn(*args, **kwargs)
                except (psycopg2.InterfaceError, psycopg2.DatabaseError,
                        psycopg2.pool.PoolError) as error:
                    logging.error('Database/pool error, retrying: %s', str(error))
                    last_error = error
                    if reconnect:
                        self.connect()
                except osmo_errors.OSMOError:
                    # Application errors are not retryable; re-raise as-is.
                    raise
                except Exception as error:  # pylint: disable=broad-except
                    # Unexpected failure: wrap it, keeping the original as the
                    # cause so the traceback stays debuggable.
                    raise osmo_errors.OSMODatabaseError(
                        f'Error: {str(error)}') from error
            if last_error:
                raise osmo_errors.OSMODatabaseError(
                    f'Error: {str(last_error)}') from last_error
            return None
        return retry_wrapper
    if func is None:
        return decorator
    else:
        return decorator(func)
class PostgresConnector:
""" Manages the connection to the postgres database using a ThreadedConnectionPool. """
_instance: 'PostgresConnector | None' = None
_pool: psycopg2.pool.ThreadedConnectionPool | None
_pool_lock: threading.Lock
_pool_semaphore: threading.Semaphore
@staticmethod
def get_instance():
    """Return the process-wide PostgresConnector singleton.

    Raises:
        OSMOError: If the connector has not been constructed yet.
    """
    instance = PostgresConnector._instance
    if not instance:
        raise osmo_errors.OSMOError(
            'Postgres Connector has not been created!')
    return instance
def _create_pool(self, search_path: str | None = None):
    """Create the ThreadedConnectionPool and semaphore.

    Args:
        search_path: Optional Postgres search_path (e.g. a pgroll versioned
            schema) applied to every pooled connection via the connection
            `options` string.

    Raises:
        OSMOUsageError: If minconn exceeds maxconn.
        OSMOConnectionError: If the pool cannot be created.
    """
    try:
        if self.config.postgres_pool_minconn > self.config.postgres_pool_maxconn:
            raise osmo_errors.OSMOUsageError(
                'postgres_pool_minconn cannot be greater than postgres_pool_maxconn')
        self._pool = psycopg2.pool.ThreadedConnectionPool(
            minconn=self.config.postgres_pool_minconn,
            # +1 to ensure we never exhaust the pool
            # This leaves 1 connection for retry/recovery scenarios
            maxconn=self.config.postgres_pool_maxconn + 1,
            host=self.config.postgres_host,
            port=self.config.postgres_port,
            database=self.config.postgres_database_name,
            user=self.config.postgres_user,
            password=self.config.postgres_password,
            options=f'-csearch_path={search_path}' if search_path else None
        )
        # The semaphore caps callers at maxconn, leaving the +1 above spare.
        self._pool_semaphore = threading.Semaphore(self.config.postgres_pool_maxconn)
    except (psycopg2.DatabaseError, psycopg2.OperationalError) as error:
        logging.error('Database Error while creating connection pool: %s', str(error))
        raise osmo_errors.OSMOConnectionError(str(error))
def connect(self):
    """Create or recreate the connection pool.

    Closes any existing pool first. Used at startup (schema switch) and by
    the @retry decorator to recover from connection errors.
    """
    with self._pool_lock:
        if self._pool is not None:
            try:
                self._pool.closeall()
            except Exception:  # pylint: disable=broad-except
                # Best effort: the old pool may already be unusable.
                pass
        schema = self.config.schema_version
        # 'public' means no pgroll versioning: use the default search_path.
        self._create_pool(search_path=schema if schema != 'public' else None)
def _is_connection_healthy(self, conn) -> bool:
    """Probe *conn* with a trivial query; True when it responds cleanly."""
    if conn is None or conn.closed:
        return False
    try:
        with conn.cursor() as cursor:
            cursor.execute('SELECT 1')
        # Rollback so the probe leaves no open transaction behind.
        conn.rollback()
        return True
    except (psycopg2.DatabaseError, psycopg2.InterfaceError):
        return False
@contextlib.contextmanager
def _get_connection(self, autocommit: bool = False) -> Generator:
    """
    Context manager for acquiring a connection from the pool.
    Uses a semaphore to limit concurrent connections and prevent pool exhaustion.
    Threads will block on the semaphore if all connections are in use.
    Args:
        autocommit: If True, set the connection to autocommit mode.
    Yields:
        A database connection from the pool.
    Raises:
        OSMOConnectionError: If the pool has not been initialized.
    """
    # Snapshot pool/semaphore locally so a concurrent connect() swapping
    # them out cannot split our acquire/release across two pools.
    pool = self._pool
    semaphore = self._pool_semaphore
    if pool is None:
        raise osmo_errors.OSMOConnectionError('Connection pool is not initialized.')
    # Acquire semaphore - blocks if all connections are in use
    semaphore.acquire()
    conn = None
    try:
        conn = pool.getconn()
        # Validate the connection
        if not self._is_connection_healthy(conn):
            # Return bad connection and get a fresh one
            try:
                pool.putconn(conn, close=True)
            except Exception:  # pylint: disable=broad-except
                pass
            conn = pool.getconn()
        if autocommit:
            # Rollback any pending transaction before setting autocommit
            # set_session cannot be called inside a transaction
            conn.rollback()
            conn.set_session(autocommit=True)
        yield conn
    finally:
        if conn is not None:
            try:
                # Rollback any uncommitted transaction to ensure clean state
                conn.rollback()
                # Reset autocommit mode before returning to pool
                if autocommit:
                    conn.set_session(autocommit=False)
                pool.putconn(conn)
            except Exception:  # pylint: disable=broad-except
                # If we can't return it properly, close it
                try:
                    pool.putconn(conn, close=True)
                except Exception:  # pylint: disable=broad-except
                    pass
        # Always release the semaphore
        semaphore.release()
def __init__(self, config: PostgresConfig):
    """Build the singleton connector: pool, secret manager, tables, configs.

    Args:
        config: Postgres connection and deployment settings.

    Raises:
        OSMOError: If a connector instance already exists (singleton guard).
        OSMOConnectionError: If the initial pool creation fails.
    """
    if PostgresConnector._instance:
        raise osmo_errors.OSMOError(
            'Only one instance of Postgres Connector can exist!')
    logging.debug('Connecting to postgres server at %s:%s...', config.postgres_host,
                  config.postgres_port)
    self.config = config
    self._pool_lock = threading.Lock()
    self._create_pool()
    logging.debug('Finished connecting to postgres database')
    logging.debug('Initializing secret manager')
    # Publish the singleton before finishing init — presumably later steps
    # (secret manager callbacks, table init) rely on get_instance(); confirm.
    PostgresConnector._instance = self
    mek_file = self.config.mek_file
    if self.config.method == 'dev':
        # Dev mode: relax model validation and use the bundled local MEK file.
        ExtraArgBaseModel.set_extra(ExtraType.ALLOW)
        mek_file = os.path.join(os.path.dirname(__file__), '..', 'secret_manager', 'mek.yaml')
    self.secret_manager = SecretManager(
        mek_file,
        self.read_uek, self.write_uek, self.read_current_kid, self.add_user)
    logging.debug('Secret manager initialized')
    logging.debug('Initializing tables')
    self._init_tables()
    logging.debug('Tables initialized')
    logging.debug('Initializing configs')
    self._init_configs()
    logging.debug('Configs initialized')
    # Recreate pool with search_path set to the pgroll versioned schema
    if self.config.schema_version != 'public':
        logging.debug('Switching to pgroll schema: %s', self.config.schema_version)
        self.connect()
    # Register cleanup on exit
    atexit.register(self.close)
def close(self):
    """Close every pooled connection and drop the pool reference."""
    with self._pool_lock:
        pool, self._pool = self._pool, None
        if pool is not None:
            # Best effort: ignore failures while tearing the pool down.
            with contextlib.suppress(Exception):
                pool.closeall()
                logging.debug('Connection pool closed')
def __del__(self):
    # Best-effort cleanup; destructor errors must never escape.
    with contextlib.suppress(Exception):
        self.close()
@property
def method(self) -> str | None:
    """Deployment method from config: 'dev' for local development, else None."""
    return self.config.method
@retry
def execute_fetch_command(self, command: str,
                          args: Tuple, return_raw: bool = False) -> List[Any]:
    """
    Connects and executes a command to fetch info from the database.
    Args:
        command (str): The command to execute.
        args (Tuple): Any args for the command.
        return_raw (bool): Return the psycopg2 RealDictRow objects instead of
            pydantic DynamicModel objects.
    Raises:
        OSMODatabaseError: Error while executing the database command.
    Returns:
        Any results from the command.
    """
    with self._get_connection() as conn:
        cur = None
        try:
            cur = conn.cursor(
                cursor_factory=psycopg2.extras.RealDictCursor)
            cur.execute(command, args)
            rows = cur.fetchall()
            if not return_raw:
                # Cast memoryview objects to bytes and provide attribute access
                rows = [
                    types.SimpleNamespace(**{k: common.handle_memoryview(v)
                                             for k, v in row.items()})
                    for row in rows]
            cur.close()
            conn.commit()
            return rows
        except (psycopg2.DatabaseError, psycopg2.InterfaceError) as error:
            # Re-raise raw driver errors unchanged so the @retry decorator
            # can recognize them and recreate the pool before retrying.
            try:
                if cur is not None:
                    cur.close()
                conn.rollback()
            except Exception:  # pylint: disable=broad-except
                pass
            raise error
        except Exception as error:  # pylint: disable=broad-except
            raise osmo_errors.OSMODatabaseError(
                f'Error during executing command {command}: {error}')
        finally:
            # Safe even after the success/error paths: closing a cursor
            # twice is a no-op in psycopg2.
            if cur is not None:
                cur.close()
@retry
def execute_commit_command(self, command: str, args: Tuple):
    """
    Connects and executes a command that updates the database.
    Args:
        command (str): The command to execute.
        args (Tuple): Any args for the command.
    Raises:
        OSMODatabaseError: Error while executing the database command.
    """
    with self._get_connection() as conn:
        cur = None
        try:
            cur = conn.cursor()
            cur.execute(command, args)
            cur.close()
            conn.commit()
        except (psycopg2.DatabaseError, psycopg2.InterfaceError) as error:
            # Re-raise raw driver errors unchanged so the @retry decorator
            # can recognize them and recreate the pool before retrying.
            try:
                if cur is not None:
                    cur.close()
                conn.rollback()
            except Exception:  # pylint: disable=broad-except
                pass
            raise error
        except Exception as error:  # pylint: disable=broad-except
            raise osmo_errors.OSMODatabaseError(
                f'Error during executing command {command}: {error}')
        finally:
            # Closing twice is a no-op; guarantees the cursor is released.
            if cur is not None:
                cur.close()
@retry(reconnect=False)
def execute_autocommit_command(self, command: str, args: Tuple):
    """
    Connects and executes a command on the database in autocommit mode.

    Used for statements that cannot run inside a transaction block (e.g.
    CREATE INDEX CONCURRENTLY in _init_tables).
    Args:
        command (str): The command to execute.
        args (Tuple): Any args for the command.
    Raises:
        OSMODatabaseError: Error while executing the database command.
    """
    with self._get_connection(autocommit=True) as conn:
        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute(command, args)
        except (psycopg2.DatabaseError, psycopg2.InterfaceError) as error:
            # Deliberate pass-through: keeps driver errors out of the broad
            # handler below so @retry sees them unwrapped.
            raise error
        except Exception as error:  # pylint: disable=broad-except
            raise osmo_errors.OSMODatabaseError(
                f'Error during executing command {command}: {error}')
        finally:
            if cursor is not None:
                cursor.close()
def mogrify(self, entries: List[Tuple]):
    """
    Run mogrify on a list of tuples and turn it into a string that can be used
    for inserting multiple rows. This prevents SQL injections from happening
    when constructing the string that defines these rows.
    All the tuples need to have the same number of elements.
    Args:
        entries (List[tuple]): Each entry defines the attributes for each row.
    Raises:
        OSMOSchemaError: If entries is empty or the tuples have mismatched
            lengths.
        OSMODatabaseError: Error while executing the database command.
    """
    # Fix: an empty list previously raised a bare IndexError on entries[0].
    if not entries:
        raise osmo_errors.OSMOSchemaError('Mogrify: entries list is empty!')
    with self._get_connection() as conn:
        cur = conn.cursor()
        # Fix: close the cursor even when validation raises (was leaked).
        try:
            entry_length = len(entries[0])
            for entry in entries:
                if len(entry) != entry_length:
                    raise osmo_errors.OSMOSchemaError(
                        'Mogrify: entries do not have the same number of elements!')
            input_str = f'({", ".join(["%s"] * entry_length)})'
            final_str = ', '.join(
                cur.mogrify(input_str, entry).decode('utf-8') for entry in entries)
        finally:
            cur.close()
        return final_str
def get_configs(self, config_type: ConfigType):
    """Fetch all config rows of *config_type* and deserialize them.

    Scalar-typed fields are taken verbatim; everything else is parsed as
    JSON before being handed to the config class's deserialize().
    """
    rows = self.execute_fetch_command(
        'SELECT * FROM configs WHERE type = %s;', (config_type.value,))
    if not rows:
        raise osmo_errors.OSMODatabaseError('Configs are not found.')
    # Dispatch table instead of an if/elif chain.
    type_to_class = {
        ConfigType.SERVICE: ServiceConfig,
        ConfigType.WORKFLOW: WorkflowConfig,
        ConfigType.DATASET: DatasetConfig,
    }
    config_class: Type[DynamicConfig] | None = type_to_class.get(config_type)
    if config_class is None:
        raise osmo_errors.OSMOServerError(f'Config type: {config_type.value} unknown')
    hints = typing.get_type_hints(config_class)
    scalar_types = {str, int, float, pydantic.SecretStr}
    values: Dict[str, Any] = {}
    for row in rows:
        if row.key not in hints:
            # Ignore rows whose key is not a declared config field.
            continue
        if hints[row.key] in scalar_types:
            values[row.key] = row.value
        else:
            values[row.key] = json.loads(row.value)
    return config_class.deserialize(values, self)
def get_service_configs(self) -> 'ServiceConfig':
    """Convenience wrapper: fetch all SERVICE-type configs."""
    return self.get_configs(ConfigType.SERVICE)
def get_workflow_configs(self) -> 'WorkflowConfig':
    """Convenience wrapper: fetch all WORKFLOW-type configs."""
    return self.get_configs(ConfigType.WORKFLOW)
def get_dataset_configs(self) -> 'DatasetConfig':
    """Convenience wrapper: fetch all DATASET-type configs."""
    return self.get_configs(ConfigType.DATASET)
def get_method(self) -> Optional[Literal['dev']]:
    """Return the configured deployment method (duplicate of the `method` property)."""
    return self.config.method
def decrypt_credential(self, db_row) -> Dict:
    """Decrypt a credential row's hstore payload.

    Values that parse as JWE tokens are decrypted; plaintext values are
    returned as-is and re-encrypted in place, so stored payloads converge
    to fully encrypted form over time.

    Args:
        db_row: Credential row exposing .payload, .user_name and .cred_name.
    Returns:
        Dict mapping payload keys to their decrypted (or original) values.
    """
    result = {}
    payload = PostgresConnector.decode_hstore(db_row.payload)
    for key, value in payload.items():
        try:
            # Probe whether the stored value is a JWE token; raises
            # JWException for plaintext values.
            jwetoken = jwe.JWE()
            jwetoken.deserialize(value)
            encrypted = Encrypted(value)
            # Fix: the WHERE clause previously contained a doubled keyword
            # ('... cred_name = %s AND AND payload[%s] = %s'), which is
            # invalid SQL. NOTE(review): the command has six placeholders
            # but cmd_args supplies five — presumably
            # generate_update_secret_func injects the re-encrypted value
            # as the second parameter; confirm against its implementation.
            cmd = (
                'UPDATE credential SET payload[%s] = %s WHERE '
                'user_name = %s AND cred_name = %s AND '
                'payload[%s] = %s;'
            )
            cmd_args = (key, db_row.user_name, db_row.cred_name, key, value)
            decrypted = self.secret_manager.decrypt(
                encrypted, db_row.user_name,
                self.generate_update_secret_func(cmd, cmd_args))
            result[key] = decrypted.value
        except (JWException, osmo_errors.OSMONotFoundError):
            # Plaintext (or unknown-key) value: return it unchanged and
            # store an encrypted copy for next time.
            result[key] = value
            encrypted = self.secret_manager.encrypt(value, db_row.user_name)
            cmd = (
                'UPDATE credential SET payload[%s] = %s WHERE '
                'user_name = %s AND cred_name = %s;'
            )
            self.execute_commit_command(
                cmd, (key, encrypted.value, db_row.user_name, db_row.cred_name))
    return result
def encrypt_dict(self, input_dict: Dict, user: str) -> Dict:
    """Encrypt every value of *input_dict* under the given user's key."""
    return {key: self.secret_manager.encrypt(value, user).value
            for key, value in input_dict.items()}
def set_config(self, key: str, value: str | None, config_type: ConfigType):
    """Update the config row identified by (key, type) to *value*."""
    return self.execute_commit_command(
        'UPDATE configs SET value = %s WHERE key = %s and type = %s;',
        (value, key, config_type.value))
@classmethod
def encode_hstore(cls, key_val_data: Dict) -> str:
    """Encode a dict as an hstore literal: comma-joined "k"=>"v" pairs."""
    pairs = [f'"{key}"=>"{value}"' for key, value in key_val_data.items()]
    return ','.join(pairs)
@classmethod
def decode_hstore(cls, hstore_data: str) -> Dict:
    """Decode an hstore string into a dict (inverse of encode_hstore).

    NOTE(review): fields containing quotes or parentheses are not matched
    by the pattern — presumably acceptable for the payloads stored here.
    """
    pattern = r'"([^()\'"]+)"=>"([^()\'"]+)"'
    return dict(re.findall(pattern, hstore_data))
def _set_default_config(self, key: str, value: str, config_type: ConfigType):
    """Insert a config row only when (key, type) does not already exist."""
    cmd = 'INSERT INTO configs (key, value, type) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING;'
    args = (str(key), str(value), config_type.value)
    return self.execute_commit_command(cmd, args)
def _init_tables(self):
""" Initializes tables if not exist. """
# Install hstore extension
create_cmd = 'CREATE EXTENSION IF NOT EXISTS hstore SCHEMA public;'
self.execute_commit_command(create_cmd, ())
# Creates table for dynamic configs.
create_cmd = '''
CREATE TABLE IF NOT EXISTS configs (
key TEXT,
value TEXT,
type TEXT,
PRIMARY KEY (key, type)
);
'''
self.execute_commit_command(create_cmd, ())
# Creates table for roles
create_cmd = """
CREATE TABLE IF NOT EXISTS roles (
name TEXT,
description TEXT,
policies JSONB[],
immutable BOOLEAN,
sync_mode TEXT NOT NULL DEFAULT 'import',
PRIMARY KEY (name)
);
"""
self.execute_commit_command(create_cmd, ())
# Creates table for role external mappings (many-to-many)
create_cmd = """
CREATE TABLE IF NOT EXISTS role_external_mappings (
role_name TEXT NOT NULL REFERENCES roles(name) ON DELETE CASCADE,
external_role TEXT NOT NULL,
PRIMARY KEY (role_name, external_role)
);
"""
self.execute_commit_command(create_cmd, ())
# Create index for external role lookups
create_cmd = """
CREATE INDEX IF NOT EXISTS idx_role_external_mappings_external_role
ON role_external_mappings (external_role);
"""
self.execute_commit_command(create_cmd, ())
# Creates table for dynamic configs.
create_cmd = '''
CREATE TABLE IF NOT EXISTS backends (
name TEXT,
description TEXT,
k8s_uid TEXT,
k8s_namespace TEXT,
dashboard_url TEXT,
grafana_url TEXT,
scheduler_settings TEXT,
tests TEXT[] DEFAULT ARRAY[]::text[],
last_heartbeat TIMESTAMP,
created_date TIMESTAMP,
router_address TEXT,
version TEXT DEFAULT '',
node_conditions JSONB DEFAULT '{
"rules": {"Ready": "True"},
"prefix": "osmo.nvidia.com/"
}'::jsonb,
PRIMARY KEY (name)
);
'''
self.execute_commit_command(create_cmd, ())
# Creates table for dynamic configs.
create_cmd = '''
CREATE TABLE IF NOT EXISTS resource_validations (
name TEXT,
resource_validations JSONB[],
PRIMARY KEY (name)
);
'''
self.execute_commit_command(create_cmd, ())
# Creates table for dynamic configs.
create_cmd = '''
CREATE TABLE IF NOT EXISTS pod_templates (
name TEXT,
pod_template JSONB,
PRIMARY KEY (name)
);
'''
self.execute_commit_command(create_cmd, ())
# Creates table for group templates.
create_cmd = '''
CREATE TABLE IF NOT EXISTS group_templates (
name TEXT,
group_template JSONB,
PRIMARY KEY (name)
);
'''
self.execute_commit_command(create_cmd, ())
# Creates table for dynamic configs.
create_cmd = '''
CREATE TABLE IF NOT EXISTS pools (
name TEXT,
description TEXT,
backend TEXT,
download_type TEXT,
default_platform TEXT,
platforms JSONB,
default_exec_timeout TEXT,
default_queue_timeout TEXT,
max_exec_timeout TEXT,
max_queue_timeout TEXT,
default_exit_actions JSONB,
common_default_variables JSONB,
common_resource_validations TEXT[],
parsed_resource_validations JSONB,
common_pod_template TEXT[],
parsed_pod_template JSONB,
common_group_templates TEXT[],
parsed_group_templates JSONB,
enable_maintenance BOOLEAN,
resources JSONB,
topology_keys JSONB,
PRIMARY KEY (name)
);
'''
self.execute_commit_command(create_cmd, ())
# Creates table for workflows.
create_cmd = '''
CREATE TABLE IF NOT EXISTS workflows (
workflow_name TEXT,
job_id INT,
workflow_id TEXT,
workflow_uuid TEXT,
submitted_by TEXT,
cancelled_by TEXT,
logs TEXT,
events TEXT,
submit_time TIMESTAMP,
start_time TIMESTAMP,
end_time TIMESTAMP,
exec_timeout INT,
queue_timeout INT,
backend TEXT,
pool TEXT,
version INT,
outputs TEXT,
status TEXT,
failure_message TEXT,
parent_name TEXT,
parent_job_id TEXT,
app_uuid TEXT,
app_version INT,
plugins JSONB,
priority TEXT DEFAULT 'NORMAL',
PRIMARY KEY (workflow_uuid),
CONSTRAINT workflows_name_job UNIQUE(workflow_name, job_id),
CONSTRAINT workflows_workflow_id UNIQUE(workflow_id)
);
'''
self.execute_commit_command(create_cmd, ())
# Creates indices for workflow table
index_cmds = [
'''
CREATE INDEX CONCURRENTLY IF NOT EXISTS workflow_list_index
ON workflows
USING btree (submitted_by, pool, status, submit_time ASC);
''',
'''
CREATE INDEX CONCURRENTLY IF NOT EXISTS workflow_list_index_pool_status
ON workflows
USING btree (pool, status, submit_time ASC);
'''
]
for cmd in index_cmds:
self.execute_autocommit_command(cmd, ())
# Creates table for workflow tags.
create_cmd = '''
CREATE TABLE IF NOT EXISTS workflow_tags (
workflow_uuid TEXT REFERENCES workflows (workflow_uuid),
tag TEXT,
PRIMARY KEY (workflow_uuid, tag)
);
'''
self.execute_commit_command(create_cmd, ())
# Creates table for groups.
create_cmd = '''
CREATE TABLE IF NOT EXISTS groups (
workflow_id TEXT,
name TEXT,
group_uuid TEXT,
spec JSONB,
status TEXT,
failure_message TEXT,
processing_start_time TIMESTAMP,
scheduling_start_time TIMESTAMP,
initializing_start_time TIMESTAMP,
start_time TIMESTAMP,
end_time TIMESTAMP,
remaining_upstream_groups HSTORE,
downstream_groups HSTORE,
outputs TEXT,
cleaned_up BOOLEAN,
scheduler_settings TEXT,
group_template_resource_types JSONB DEFAULT '[]'::jsonb,
PRIMARY KEY (group_uuid),
CONSTRAINT groups_id_name UNIQUE(workflow_id, name)
);
'''
self.execute_commit_command(create_cmd, ())
# Creates table for tasks.
create_cmd = '''
CREATE TABLE IF NOT EXISTS tasks (
workflow_id TEXT,
name TEXT,
retry_id INT,
task_db_key TEXT,
task_uuid TEXT,
group_name TEXT,
status TEXT,
failure_message TEXT,
exit_code INT,
scheduling_start_time TIMESTAMP,
initializing_start_time TIMESTAMP,
start_time TIMESTAMP,
end_time TIMESTAMP,
input_download_start_time TIMESTAMP,
input_download_end_time TIMESTAMP,
output_upload_start_time TIMESTAMP,
output_upload_end_time TIMESTAMP,
last_heartbeat TIMESTAMP,
node_name TEXT,
gpu_count FLOAT,
cpu_count FLOAT,
disk_count FLOAT,
memory_count FLOAT,
exit_actions JSONB,
lead BOOLEAN,
refresh_token BYTEA,
pod_name TEXT,
pod_ip TEXT,
PRIMARY KEY (task_db_key),
CONSTRAINT tasks_uuid_retry UNIQUE(task_uuid, retry_id),
CONSTRAINT tasks_id_name UNIQUE(workflow_id, retry_id, name)
);
'''
self.execute_commit_command(create_cmd, ())
# Creates indices for task table
index_cmds = [
'''
CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS tasks_status_id_name
ON tasks
USING btree (status, workflow_id, retry_id, name);
'''
]
for cmd in index_cmds:
self.execute_autocommit_command(cmd, ())
# Creates table for tasks/groups.
create_cmd = '''
CREATE TABLE IF NOT EXISTS task_io (
workflow_id TEXT,
group_name TEXT,
task_name TEXT,
retry_id INT,
uuid TEXT,
url TEXT,
type TEXT,
storage_bucket TEXT,
start_time TIMESTAMP,
end_time TIMESTAMP,
size FLOAT,
operation_type TEXT,
download_type TEXT,
number_of_files INT,
PRIMARY KEY (uuid)
);
'''
self.execute_commit_command(create_cmd, ())
# Creates table for apps
create_cmd = '''
CREATE TABLE IF NOT EXISTS apps (
uuid TEXT,
name TEXT,
owner TEXT,
created_date TIMESTAMP,
description TEXT,
PRIMARY KEY (uuid),
CONSTRAINT apps_name UNIQUE(name)
);
'''
self.execute_commit_command(create_cmd, ())
# Creates table for apps versions
create_cmd = '''
CREATE TABLE IF NOT EXISTS app_versions (
uuid TEXT,
version INT,
created_by TEXT,
created_date TIMESTAMP,
status TEXT,
uri TEXT,
PRIMARY KEY (uuid, version),
FOREIGN KEY (uuid)
REFERENCES apps (uuid)
ON DELETE CASCADE
);
'''
self.execute_commit_command(create_cmd, ())
# Creates table for resources.
create_cmd = '''
CREATE TABLE IF NOT EXISTS resources (
name TEXT,
backend TEXT,
available BOOLEAN,
allocatable_fields HSTORE,
label_fields HSTORE,
taints JSONB[],
usage_fields HSTORE,
non_workflow_usage_fields HSTORE,
conditions TEXT[],
PRIMARY KEY (name, backend)
);
'''
self.execute_commit_command(create_cmd, ())
# Creates table for matching resource name to corresponding pool and platform