Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 105 additions & 53 deletions cloud_governance/common/google_drive/google_drive_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class GoogleDriveOperations:
def __init__(self):
self.__environment_variables_dict = environment_variables.environment_variables_dict
self.__service = None
self.__sheet_id_cache = {}
if self.__environment_variables_dict.get('GOOGLE_APPLICATION_CREDENTIALS'):
self.__creds, _ = google.auth.default()
self.__service = build('sheets', 'v4', credentials=self.__creds, num_retries=self.RETRIES)
Expand All @@ -46,6 +47,9 @@ def create_work_sheet(self, gsheet_id: str, sheet_name: str):
if self.__service:
self.__service.spreadsheets().batchUpdate(spreadsheetId=gsheet_id,
body=create_worksheet_meta_data).execute()
cache_key = f"{gsheet_id}:{sheet_name}"
if cache_key in self.__sheet_id_cache:
del self.__sheet_id_cache[cache_key]
logger.info(f'{sheet_name} worksheet created')
else:
logger.info(f'{sheet_name} Worksheet Already present')
Expand Down Expand Up @@ -97,22 +101,40 @@ def append_values(self, spreadsheet_id, sheet_name: str, values: list, value_inp
except HttpError as error:
logger.info(f'An error occurred: {error}')

def _evict_sheet_cache(self, sheet_name: str, spreadsheet_id: str):
    """
    Drop the cached sheet id of one worksheet, if such an entry exists.

    Entries are keyed by "<spreadsheet_id>:<sheet_name>". When nothing is
    cached under that key the method returns silently, without logging.
    @param sheet_name: worksheet title that forms the second half of the cache key
    @param spreadsheet_id: spreadsheet id that forms the first half of the cache key
    """
    try:
        del self.__sheet_id_cache[f"{spreadsheet_id}:{sheet_name}"]
    except KeyError:
        # No entry was cached for this worksheet, so there is nothing to evict.
        return
    logger.info(f'Evicted stale cache entry for sheet: {sheet_name}')

@logger_time_stamp
def find_sheet_id_by_name(self, sheet_name: str, spreadsheet_id: str, use_cache: bool = True):
    """
    Find the id of the worksheet titled sheet_name inside the spreadsheet.

    A successful lookup is stored in the instance cache under the key
    "<spreadsheet_id>:<sheet_name>", so later calls with use_cache=True return
    the stored id without calling the Sheets API again.
    @param sheet_name: title of the worksheet to look up
    @param spreadsheet_id: id of the spreadsheet that is searched
    @param use_cache: If False, bypass the cache and fetch fresh data (the cache
                      entry is still refreshed when the sheet is found)
    @return: the sheetId of the matching worksheet, or '' when the service is
             not configured or no worksheet has that title
    """
    cache_key = f"{spreadsheet_id}:{sheet_name}"
    if use_cache and cache_key in self.__sheet_id_cache:
        return self.__sheet_id_cache[cache_key]

    if self.__service:
        response = self.__service.spreadsheets().get(spreadsheetId=spreadsheet_id,
                                                     fields='sheets.properties').execute()
        # `or []` keeps the loop safe if the response carries no 'sheets' entry;
        # iterating None would raise TypeError.
        for sheet in response.get('sheets') or []:
            properties = sheet['properties']
            if 'title' in properties and properties['title'] == sheet_name:
                sheet_id = properties['sheetId']
                # Cache before returning so the next lookup skips the API call.
                self.__sheet_id_cache[cache_key] = sheet_id
                return sheet_id
    return ''

@logger_time_stamp
Expand All @@ -125,25 +147,35 @@ def delete_rows(self, spreadsheet_id: str, sheet_name: str, row_number: int):
@return:
"""
if self.__service:
try:
spreadsheet_data = [
{
"deleteDimension": {
"range": {
"sheetId": self.find_sheet_id_by_name(sheet_name=sheet_name,
spreadsheet_id=spreadsheet_id),
"dimension": "ROWS",
"startIndex": row_number,
"endIndex": row_number + 1
for attempt in range(2):
try:
use_cache = attempt == 0
sheet_id = self.find_sheet_id_by_name(sheet_name=sheet_name,
spreadsheet_id=spreadsheet_id,
use_cache=use_cache)
spreadsheet_data = [
{
"deleteDimension": {
"range": {
"sheetId": sheet_id,
"dimension": "ROWS",
"startIndex": row_number,
"endIndex": row_number + 1
}
}
}
}
]
update_data = {"requests": spreadsheet_data}
updating = self.__service.spreadsheets().batchUpdate(spreadsheetId=spreadsheet_id, body=update_data)
updating.execute()
except HttpError as error:
logger.into(f'An error occurred: {error}')
]
update_data = {"requests": spreadsheet_data}
updating = self.__service.spreadsheets().batchUpdate(spreadsheetId=spreadsheet_id, body=update_data)
updating.execute()
break
except HttpError as error:
if attempt == 0 and ('Unable to parse range' in str(error) or 'not found' in str(error).lower()):
self._evict_sheet_cache(sheet_name=sheet_name, spreadsheet_id=spreadsheet_id)
logger.info(f'Retrying delete_rows after cache eviction for sheet: {sheet_name}')
continue
logger.error(f'Failed to delete rows in sheet {sheet_name}: {error}')
raise

@logger_time_stamp
def paste_csv_to_gsheet(self, csv_path, spreadsheet_id: str, sheet_name: str):
Expand All @@ -160,25 +192,35 @@ def paste_csv_to_gsheet(self, csv_path, spreadsheet_id: str, sheet_name: str):
with open(csv_path, 'r') as csv_file:
csv_contents = csv_file.read()
if csv_contents:
sheet_id = self.find_sheet_id_by_name(sheet_name=sheet_name, spreadsheet_id=spreadsheet_id)
body = {
'requests': [{
'pasteData': {
"coordinate": {
"sheetId": sheet_id,
"rowIndex": "0",
"columnIndex": "0",
},
"data": csv_contents,
"type": 'PASTE_NORMAL',
"delimiter": ',',
for attempt in range(2):
try:
use_cache = attempt == 0
sheet_id = self.find_sheet_id_by_name(sheet_name=sheet_name, spreadsheet_id=spreadsheet_id,
use_cache=use_cache)
body = {
'requests': [{
'pasteData': {
"coordinate": {
"sheetId": sheet_id,
"rowIndex": 0,
"columnIndex": 0,
},
"data": csv_contents,
"type": 'PASTE_NORMAL',
"delimiter": ',',
}
}]
}
}]
}
request = self.__service.spreadsheets().batchUpdate(spreadsheetId=spreadsheet_id, body=body)
response = request.execute()
logger.info(f'Pasted data into the {sheet_name}')
return response
request = self.__service.spreadsheets().batchUpdate(spreadsheetId=spreadsheet_id, body=body)
response = request.execute()
logger.info(f'Pasted data into the {sheet_name}')
return response
except HttpError as error:
if attempt == 0 and ('Unable to parse range' in str(error) or 'not found' in str(error).lower()):
self._evict_sheet_cache(sheet_name=sheet_name, spreadsheet_id=spreadsheet_id)
logger.info(f'Retrying paste_csv_to_gsheet after cache eviction for sheet: {sheet_name}')
continue
raise error

@logger_time_stamp
def update_row_in_gsheet(self, data: list, gsheet_id: str, row: int, sheet_name: str):
Expand All @@ -187,19 +229,29 @@ def update_row_in_gsheet(self, data: list, gsheet_id: str, row: int, sheet_name:
@return:
"""
if self.__service:
sheet_id = self.find_sheet_id_by_name(sheet_name=sheet_name, spreadsheet_id=gsheet_id)
requests_body = {'requests': [{
'updateCells': {
'rows': [{"values": data}],
'fields': '*',
'start': {
"sheetId": sheet_id,
"rowIndex": row,
"columnIndex": '0'
}
}
}]}
request = self.__service.spreadsheets().batchUpdate(spreadsheetId=gsheet_id, body=requests_body)
response = request.execute()
logger.info(f'Updated the row in the worksheet {sheet_name}')
return response
for attempt in range(2):
try:
use_cache = attempt == 0
sheet_id = self.find_sheet_id_by_name(sheet_name=sheet_name, spreadsheet_id=gsheet_id,
use_cache=use_cache)
requests_body = {'requests': [{
'updateCells': {
'rows': [{"values": data}],
'fields': '*',
'start': {
"sheetId": sheet_id,
"rowIndex": row,
"columnIndex": 0
}
}
}]}
request = self.__service.spreadsheets().batchUpdate(spreadsheetId=gsheet_id, body=requests_body)
response = request.execute()
logger.info(f'Updated the row in the worksheet {sheet_name}')
return response
except HttpError as error:
if attempt == 0 and ('Unable to parse range' in str(error) or 'not found' in str(error).lower()):
self._evict_sheet_cache(sheet_name=sheet_name, spreadsheet_id=gsheet_id)
logger.info(f'Retrying update_row_in_gsheet after cache eviction for sheet: {sheet_name}')
continue
raise error