forked from DalgoT4D/DDP_backend
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path airbyte.py
More file actions
115 lines (98 loc) · 4.57 KB
/
Copy path airbyte.py
File metadata and controls
115 lines (98 loc) · 4.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""track sync times, number of records, volume of data by client and connection"""
from django.db import models
from ddpui.models.org import Org
class SyncStats(models.Model):
    """Single table of per-connection sync statistics, scoped to an org.

    Each row records when a sync ran, how it was triggered (``manual`` or
    ``orchestrate``), and what it moved: duration, record count and data volume.
    """

    # owning org; rows are removed together with the org (on_delete=CASCADE)
    org = models.ForeignKey(Org, on_delete=models.CASCADE)
    # 36 chars fits a canonical UUID string; presumably the Airbyte connection id — confirm
    connection_id = models.CharField(max_length=36)
    # nullable, so a row may be stored without a job id
    job_id = models.IntegerField(null=True)
    attempt = models.IntegerField(default=0)
    status = models.TextField()
    # how the sync was triggered
    sync_type = models.CharField(
        max_length=20,
        choices=[("manual", "manual"), ("orchestrate", "orchestrate")],
    )
    sync_time = models.DateTimeField()
    # "_s" suffix presumably means seconds — confirm with the code that writes it
    sync_duration_s = models.BigIntegerField(default=0)
    sync_records = models.BigIntegerField(default=0)
    # "_b" suffix presumably means bytes — confirm with the code that writes it
    sync_data_volume_b = models.BigIntegerField(default=0)

    def __str__(self) -> str:
        return "SyncStats[{}|{}]".format(self.org.name, self.connection_id)

    def to_json(self) -> dict:
        """Serialize this row to a plain dict.

        The org is represented by its slug. ``job_id`` and ``sync_type`` are
        not included, and ``sync_time`` is returned as the raw datetime value
        rather than a string.
        """
        payload = {"org": self.org.slug}
        copied_fields = (
            "connection_id",
            "attempt",
            "status",
            "sync_time",
            "sync_duration_s",
            "sync_records",
            "sync_data_volume_b",
        )
        for field_name in copied_fields:
            payload[field_name] = getattr(self, field_name)
        return payload
class AirbyteJob(models.Model):
    """Model to track Airbyte jobs."""

    job_id = models.BigIntegerField(primary_key=True, null=False)
    # one of: check_connection_source┃check_connection_destination┃discover_schema┃
    #         get_spec┃sync┃reset_connection┃refresh┃clear
    # The longest value, "check_connection_destination", is 28 characters (and
    # "check_connection_source" is 23), so the former max_length=20 could not
    # store them. NOTE: widening the column needs a generated + applied migration.
    job_type = models.CharField(max_length=50)
    # connection_id, source_id or destination_id
    config_id = models.CharField(max_length=100)
    # one of: pending┃running┃incomplete┃failed┃succeeded┃cancelled
    status = models.CharField(max_length=20)
    # how a reset was configured; only populated if the job was a reset
    reset_config = models.JSONField(null=True)
    # how a refresh was configured; only populated if the job was a refresh
    refresh_config = models.JSONField(null=True)
    # information about the streams processed by the job
    stream_stats = models.JSONField(null=True)
    records_emitted = models.BigIntegerField(default=0)
    bytes_emitted = models.BigIntegerField(default=0)
    records_committed = models.BigIntegerField(default=0)
    bytes_committed = models.BigIntegerField(default=0)
    # attempts made for this job; only populated if the job has attempts.
    # The properties below read each entry as {"attempt": {"id": ..., "status": ...}, ...}.
    attempts = models.JSONField(null=True)
    # nullable because the api spec says this will be optional
    started_at = models.DateTimeField(null=True)
    # when the job ended; can be null if we pull or sync an ongoing job
    ended_at = models.DateTimeField(null=True)
    # when the job was created in airbyte
    created_at = models.DateTimeField()
    # when the django record was last updated
    updated_at = models.DateTimeField(auto_now=True)

    def __str__(self) -> str:
        return f"AirbyteJob[Job ID: {self.job_id}|Job Type: {self.job_type}|Status: {self.status}]"

    @property
    def duration(self):
        """Whole seconds from ``created_at`` to ``ended_at``.

        Measured from creation, not from ``started_at`` (see ``attempt_duration``).
        Fractional seconds are truncated by ``int()``.

        Returns:
            int, or None if either timestamp is missing.
        """
        if self.created_at and self.ended_at:
            delta = self.ended_at - self.created_at
            return int(delta.total_seconds())
        return None

    @property
    def attempt_duration(self):
        """Whole seconds from ``started_at`` to ``ended_at``.

        Fractional seconds are truncated by ``int()``.

        Returns:
            int, or None if either timestamp is missing.
        """
        if self.started_at and self.ended_at:
            delta = self.ended_at - self.started_at
            return int(delta.total_seconds())
        return None

    @property
    def last_attempt_no(self):
        """Highest ``attempt["id"]`` across all recorded attempts.

        Returns:
            The largest attempt id, or None when ``attempts`` is empty or null.

        Raises:
            KeyError: if an entry lacks the ``"attempt"`` or ``"id"`` key.
        """
        if not self.attempts:
            return None
        return max(entry["attempt"]["id"] for entry in self.attempts)

    @property
    def latest_failed_attempt_id(self):
        """Highest ``attempt["id"]`` among attempts whose status is ``"failed"``.

        Returns:
            The largest failed attempt id, or None when there are no attempts
            or none of them failed.

        Raises:
            KeyError: if an entry lacks the ``"attempt"``/``"status"`` keys, or a
                failed attempt lacks ``"id"``.
        """
        failed_ids = [
            entry["attempt"]["id"]
            for entry in (self.attempts or [])
            if entry["attempt"]["status"] == "failed"
        ]
        return max(failed_ids) if failed_ids else None