# big_query.py
from google.cloud import bigquery
import pandas_gbq
import pandas as pd
import numpy as np
from google.oauth2 import service_account
import os

# Set the default BigQuery project, and read the path to the service-account
# JSON key from the standard GOOGLE_APPLICATION_CREDENTIALS variable
os.environ.setdefault("GCLOUD_PROJECT", "your-project")
service_account_file_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")


def connect_to_bq():
    # Build an authenticated BigQuery client from the service-account key file
    client = bigquery.Client.from_service_account_json(service_account_file_path)
    return client


def bq_delete(schema, table_id, condition=''):
    client = connect_to_bq()
    if condition == '':
        # Refuse to run without a WHERE clause so the whole table is never wiped
        print('No rows deleted: empty condition')
    else:
        query = 'delete from `pacc-raw-data.' + schema + '.' + table_id + '` where ' + condition
        query_job = client.query(query)
        results = query_job.result()
        print(results)
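
# Usage sketch (the dataset, table, and condition below are hypothetical):
#   bq_delete('crm', 'orders', condition="order_id in ('A1', 'A2')")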


def bq_query(query_string):
    # Run an arbitrary SQL statement and wait for it to finish
    client = connect_to_bq()
    query_job = client.query(query=query_string)
    results = query_job.result()
    print(results)


def bq_insert(schema, table_id, dataframe, condition='', unique_key='', job_config=None):
    client = connect_to_bq()
    table_id_full = 'pacc-raw-data.' + schema + '.' + table_id
    # Avoid a shared mutable default argument, and use the public API for
    # schema updates instead of poking the private _properties dict
    if job_config is None:
        job_config = bigquery.LoadJobConfig()
    job_config.schema_update_options = [bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION]
    if dataframe.to_dict('records') == []:
        print('No Insert')
        return
    print('continue')
    # Deduplication
    if unique_key == '':
        print('No dedup')
    else:
        dataframe = dataframe.drop_duplicates(subset=unique_key)
        print('Dedup completed')
    # Remove rows whose ids match the rows about to be inserted, before the
    # load job starts, so the delete cannot race with the load
    bq_delete(schema, table_id, condition=condition)
    # Load
    job = client.load_table_from_dataframe(
        dataframe, table_id_full, job_config=job_config
    )
    job.result()
    table = client.get_table(table_id_full)
    print(
        "Loaded {} rows and {} columns to {}".format(
            table.num_rows, len(table.schema), table_id
        )
    )
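
# Usage sketch of an upsert-style load (hypothetical names): dedup the frame
# on its key, delete the rows about to be replaced, then load the frame:
#   df = pd.DataFrame({'order_id': ['A1', 'A2'], 'amount': [10.0, 20.0]})
#   bq_insert('crm', 'orders', df,
#             condition="order_id in ('A1', 'A2')", unique_key=['order_id'])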


def bq_pandas(query_string):
    # Read a query result into a pandas DataFrame via pandas-gbq
    credentials = service_account.Credentials.from_service_account_file(service_account_file_path)
    query_bq = pandas_gbq.read_gbq(query_string, project_id="pacc-raw-data", credentials=credentials)
    return query_bq
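
# Usage sketch (hypothetical table):
#   df = bq_pandas('select * from `pacc-raw-data.crm.orders` limit 10')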


# BQ streaming insert
def bq_insert_streaming(rows_to_insert, table_id, object_name):
    client = connect_to_bq()
    if rows_to_insert == []:
        print('stop')
    else:
        errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request
        if errors == []:
            print("New rows have been added.")
        else:
            print("Encountered errors while inserting rows: {}".format(errors))
            # Log the failing object and the errors for later inspection
            with open('script_ipos_crm/errors.txt', 'a') as f:
                f.write(f"{object_name}\n")
                f.write(f"{errors}\n")
            print("data written")


def bq_latest_date(date_schema, schema, table_id):
    # Find the latest date already present in the BQ table
    df = bq_pandas(query_string='select max(cast(' + date_schema + ' as date)) as ' + date_schema
                   + ' from `pacc-raw-data.' + schema + '.' + table_id + '`')
    print(df)
    if df[date_schema].astype(str).to_list()[0] == 'NaT':
        # Empty table: fall back to the epoch so every source row counts as new
        recent_loaded_date = '1970-01-01'
    else:
        recent_loaded_date = df[date_schema].astype(str).to_list()[0]
    return recent_loaded_date
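
# Usage sketch of the incremental-load pattern this enables (hypothetical
# names): fetch the latest loaded date, then keep only newer source rows:
#   since = bq_latest_date('order_date', 'crm', 'orders')
#   new_df = source_df[source_df['order_date'].astype(str) > since]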


def append_tables(schema_to_append, schema_appended, table_id):
    # Get the column names of the source table; backticks around the table
    # reference because the project id contains dashes
    query_string = '''
    SELECT column_name FROM `pacc-raw-data.''' + schema_to_append + '''.INFORMATION_SCHEMA.COLUMNS`
    WHERE table_name = ''' + "'" + table_id + "'"
    df = bq_pandas(query_string=query_string)
    column_lists = df['column_name'].astype(str).to_list()
    columns = ",\n".join(column_lists)
    # Build the insert query with an explicit column list on both sides
    query_string = '''
    insert into `pacc-raw-data.''' + schema_appended + '''.''' + table_id + '''` (
    ''' + columns + '''
    )
    select
    ''' + columns + '''
    from
    `pacc-raw-data.''' + schema_to_append + '''.''' + table_id + '''`
    '''
    print(query_string)
    # Run the insert
    bq_query(query_string)
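
# Usage sketch (hypothetical datasets): append every row of crm_staging.orders
# into crm.orders, matching columns by name:
#   append_tables('crm_staging', 'crm', 'orders')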