1
1
"""Custom client handling, including LookerStream base class."""
2
2
3
3
from __future__ import annotations
4
- from enum import Enum
4
+
5
5
import json
6
6
import os
7
7
import typing as t
8
8
from datetime import datetime
9
+ from enum import Enum
9
10
from pathlib import Path
10
11
11
12
import looker_sdk
23
24
class LookerStream (Stream ):
24
25
"""Stream class for Looker streams."""
25
26
26
- def __init__ (self , tap : Tap ):
27
+ def __init__ (self , tap : Tap ) -> None :
28
+ """Initialize the stream object."""
27
29
super ().__init__ (tap )
28
30
os .environ ["LOOKERSDK_BASE_URL" ] = self .config .get ("base_url" )
29
31
os .environ ["LOOKERSDK_CLIENT_ID" ] = self .config .get ("client_id" )
@@ -40,6 +42,7 @@ def schema_filepath(self) -> Path | None:
40
42
return SCHEMAS_DIR / f"{ self .name } .json"
41
43
42
44
def convert_to_dict (self , obj : object ) -> dict :
45
+ """Convert object to dictionary."""
43
46
if isinstance (obj , Enum ):
44
47
return obj .value
45
48
if isinstance (obj , list ):
@@ -52,12 +55,15 @@ def convert_to_dict(self, obj: object) -> dict:
52
55
53
56
54
57
class LookerSystemActivityStream (LookerStream ):
55
- def __init__ (self , tap : Tap ):
58
+ """Stream class for Looker System Activity streams."""
59
+ def __init__ (self , tap : Tap ) -> None :
60
+ """Initialize the stream object."""
56
61
super ().__init__ (tap )
57
62
self .replication_key = f"{ self .name } _created_time"
58
63
self .primary_keys = [f"{ self .name } _id" ]
59
64
60
65
def replace_prefix (self , field : str ) -> str :
66
+ """Replace the prefix of a field name."""
61
67
return field .replace (f"{ self .name } _" , f"{ self .name } ." )
62
68
63
69
def get_records (
0 commit comments