|
| 1 | +import logging |
| 2 | +import tempfile |
| 3 | +import uuid |
| 4 | +from pathlib import Path |
| 5 | +from typing import Optional, Union |
| 6 | + |
| 7 | +from google.oauth2.credentials import Credentials |
| 8 | +from googleapiclient.discovery import build |
| 9 | +from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload |
| 10 | + |
| 11 | +from parsons.google.utilities import ( |
| 12 | + load_google_application_credentials, |
| 13 | + setup_google_application_credentials, |
| 14 | +) |
| 15 | + |
| 16 | +logger = logging.getLogger(__name__) |
| 17 | + |
| 18 | + |
| 19 | +class GoogleDrive: |
| 20 | + """ |
| 21 | + A connector for Google Drive |
| 22 | +
|
| 23 | + `Args:` |
| 24 | + app_creds: dict | str | Credentials |
| 25 | + Can be a dictionary of Google Drive API credentials, parsed from JSON provided |
| 26 | + by the Google Developer Console, or a path string pointing to credentials |
| 27 | + saved on disk, or a google.oauth2.credentials.Credentials object. Required |
| 28 | + if env variable ``GOOGLE_DRIVE_CREDENTIALS`` is not populated. |
| 29 | + """ |
| 30 | + |
| 31 | + def __init__( |
| 32 | + self, |
| 33 | + app_creds: Optional[Union[str, dict, Credentials]] = None, |
| 34 | + ): |
| 35 | + scopes = [ |
| 36 | + "https://www.googleapis.com/auth/drive", |
| 37 | + ] |
| 38 | + |
| 39 | + if isinstance(app_creds, Credentials): |
| 40 | + credentials = app_creds |
| 41 | + else: |
| 42 | + env_credentials_path = str(uuid.uuid4()) |
| 43 | + setup_google_application_credentials( |
| 44 | + app_creds, target_env_var_name=env_credentials_path |
| 45 | + ) |
| 46 | + credentials = load_google_application_credentials(env_credentials_path, scopes=scopes) |
| 47 | + |
| 48 | + self.client = build( |
| 49 | + "drive", |
| 50 | + "v3", |
| 51 | + credentials=credentials, |
| 52 | + cache_discovery=False, |
| 53 | + ) |
| 54 | + |
| 55 | + def create_folder(self, name: str, parents: Union[list[str], str, None] = None) -> str: |
| 56 | + if isinstance(parents, str): |
| 57 | + parents = [parents] |
| 58 | + elif parents is None: |
| 59 | + parents = [] |
| 60 | + response = ( |
| 61 | + self.client.files() |
| 62 | + .create( |
| 63 | + body={ |
| 64 | + "name": name, |
| 65 | + "mimeType": "application/vnd.google-apps.folder", |
| 66 | + "parents": parents, |
| 67 | + }, |
| 68 | + fields="id", |
| 69 | + ) |
| 70 | + .execute() |
| 71 | + ) |
| 72 | + return response.get("id") |
| 73 | + |
| 74 | + def find_subfolder(self, subfolder_name: str, parent_folder_id: str) -> Optional[str]: |
| 75 | + response = ( |
| 76 | + self.client.files() |
| 77 | + .list( |
| 78 | + q=f"'{parent_folder_id}' in parents and mimeType='application/vnd.google-apps.folder'", |
| 79 | + fields="files(id, name)", |
| 80 | + ) |
| 81 | + .execute() |
| 82 | + ) |
| 83 | + match = [i for i in response.get("files") if i.get("name") == subfolder_name] |
| 84 | + result = match[0].get("id") if match else None |
| 85 | + return result |
| 86 | + |
| 87 | + def find_file_in_folder( |
| 88 | + self, file_name: str, folder_id: str, fields: Optional[list[str]] = None |
| 89 | + ) -> list[dict[str, str]]: |
| 90 | + if not fields: |
| 91 | + fields = ["id", "name"] |
| 92 | + page_token = None |
| 93 | + results = [] |
| 94 | + while True: |
| 95 | + response = ( |
| 96 | + self.client.files() |
| 97 | + .list( |
| 98 | + q=f"'{folder_id}' in parents and name = '{file_name}'", |
| 99 | + spaces="drive", |
| 100 | + fields="nextPageToken, files({})".format(",".join(fields)), |
| 101 | + pageToken=page_token, |
| 102 | + ) |
| 103 | + .execute() |
| 104 | + ) |
| 105 | + results.extend(response.get("files", [])) |
| 106 | + page_token = response.get("nextPageToken") |
| 107 | + if page_token is None: |
| 108 | + break |
| 109 | + return results |
| 110 | + |
| 111 | + def list_files_in_folder( |
| 112 | + self, folder_id: str, fields: Optional[list[str]] = None |
| 113 | + ) -> list[dict[str, str]]: |
| 114 | + if not fields: |
| 115 | + fields = ["id", "name"] |
| 116 | + page_token = None |
| 117 | + results = [] |
| 118 | + while True: |
| 119 | + response = ( |
| 120 | + self.client.files() |
| 121 | + .list( |
| 122 | + q=f"'{folder_id}' in parents", |
| 123 | + spaces="drive", |
| 124 | + fields="nextPageToken, files({})".format(",".join(fields)), |
| 125 | + pageToken=page_token, |
| 126 | + supportsTeamDrives=True, |
| 127 | + includeItemsFromAllDrives=True, |
| 128 | + ) |
| 129 | + .execute() |
| 130 | + ) |
| 131 | + results.extend(response.get("files", [])) |
| 132 | + page_token = response.get("nextPageToken") |
| 133 | + if page_token is None: |
| 134 | + break |
| 135 | + return results |
| 136 | + |
| 137 | + def empty_folder(self, folder_id: str) -> None: |
| 138 | + folder_contents = self.list_files_in_folder(folder_id) |
| 139 | + for drive_file in folder_contents: |
| 140 | + self.client.files().delete( |
| 141 | + fileId=drive_file.get("id"), |
| 142 | + ).execute() |
| 143 | + |
| 144 | + def download_file(self, file_id: str) -> str: |
| 145 | + """Download file from Drive to disk. Returns local filepath.""" |
| 146 | + filepath = tempfile.mkstemp()[1] |
| 147 | + done = False |
| 148 | + |
| 149 | + with Path(filepath).open(mode="wb") as file: |
| 150 | + downloader = MediaIoBaseDownload(file, self.client.files().get_media(fileId=file_id)) |
| 151 | + while not done: |
| 152 | + status, done = downloader.next_chunk() |
| 153 | + return filepath |
| 154 | + |
| 155 | + def upload_file(self, file_path: str, parent_folder_id: str) -> str: |
| 156 | + file_metadata = { |
| 157 | + "name": Path(file_path).name, |
| 158 | + "parents": [parent_folder_id], |
| 159 | + } |
| 160 | + media = MediaFileUpload(file_path) |
| 161 | + response = ( |
| 162 | + self.client.files().create(body=file_metadata, media_body=media, fields="id").execute() |
| 163 | + ) |
| 164 | + return response.get("id") |
| 165 | + |
| 166 | + def replace_file(self, file_path: str, file_id: str) -> str: |
| 167 | + """Replace file in drive.""" |
| 168 | + media = MediaFileUpload(file_path) |
| 169 | + resp = self.client.files().update(fileId=file_id, media_body=media, fields="id").execute() |
| 170 | + return resp.get("id") |
| 171 | + |
| 172 | + def upsert_file(self, file_path: str, parent_folder_id: str) -> str: |
| 173 | + """Create or replace file in drive folder, based on file name.""" |
| 174 | + file_name = Path(file_path).name |
| 175 | + match_response = ( |
| 176 | + self.client.files() |
| 177 | + .list( |
| 178 | + q=f"name='{file_name}' and '{parent_folder_id}' in parents", |
| 179 | + spaces="drive", |
| 180 | + fields="files(id, name)", |
| 181 | + ) |
| 182 | + .execute() |
| 183 | + .get("files", []) |
| 184 | + ) |
| 185 | + if match_response: |
| 186 | + file_id = match_response[0].get("id") |
| 187 | + result = self.replace_file(file_path, file_id) |
| 188 | + else: |
| 189 | + result = self.upload_file(file_path, parent_folder_id) |
| 190 | + return result |
| 191 | + |
| 192 | + def get_permissions(self, file_id: str) -> dict: |
| 193 | + """ |
| 194 | + `Args:` |
| 195 | + file_id: str |
| 196 | + this is the ID of the object you are hoping to share |
| 197 | + `Returns:` |
| 198 | + permission dict |
| 199 | + """ |
| 200 | + |
| 201 | + p = self.client.permissions().list(fileId=file_id).execute() |
| 202 | + |
| 203 | + return p |
| 204 | + |
| 205 | + def _share_object(self, file_id: str, permission_dict: dict) -> dict: |
| 206 | + # Send the request to share the file |
| 207 | + p = self.client.permissions().create(fileId=file_id, body=permission_dict).execute() |
| 208 | + |
| 209 | + return p |
| 210 | + |
| 211 | + def share_object( |
| 212 | + self, |
| 213 | + file_id: str, |
| 214 | + email_addresses: Optional[list[str]] = None, |
| 215 | + role: str = "reader", |
| 216 | + type: str = "user", |
| 217 | + ) -> list[dict]: |
| 218 | + """ |
| 219 | + `Args:` |
| 220 | + file_id: str |
| 221 | + this is the ID of the object you are hoping to share |
| 222 | + email_addresses: list |
| 223 | + this is the list of the email addresses you want to share; |
| 224 | + set to a list of domains like `['domain']` if you choose `type='domain'`; |
| 225 | + set to `None` if you choose `type='anyone'` |
| 226 | + role: str |
| 227 | + Options are -- owner, organizer, fileOrganizer, writer, commenter, reader |
| 228 | + https://developers.google.com/drive/api/guides/ref-roles |
| 229 | + type: str |
| 230 | + Options are -- user, group, domain, anyone |
| 231 | + `Returns:` |
| 232 | + List of permission objects |
| 233 | + """ |
| 234 | + if role not in [ |
| 235 | + "owner", |
| 236 | + "organizer", |
| 237 | + "fileOrganizer", |
| 238 | + "writer", |
| 239 | + "commenter", |
| 240 | + "reader", |
| 241 | + ]: |
| 242 | + raise Exception( |
| 243 | + f"{role} not from the allowed list of: \ |
| 244 | + owner, organizer, fileOrganizer, writer, commenter, reader" |
| 245 | + ) |
| 246 | + |
| 247 | + if type not in ["user", "group", "domain", "anyone"]: |
| 248 | + raise Exception( |
| 249 | + f"{type} not from the allowed list of: \ |
| 250 | + user, group, domain, anyone" |
| 251 | + ) |
| 252 | + |
| 253 | + if type == "domain": |
| 254 | + permissions = [ |
| 255 | + {"type": type, "role": role, "domain": email} for email in email_addresses |
| 256 | + ] |
| 257 | + else: |
| 258 | + permissions = [ |
| 259 | + {"type": type, "role": role, "emailAddress": email} for email in email_addresses |
| 260 | + ] |
| 261 | + |
| 262 | + new_permissions = [] |
| 263 | + for permission in permissions: |
| 264 | + p = self._share_object(file_id, permission) |
| 265 | + new_permissions.append(p) |
| 266 | + |
| 267 | + return new_permissions |
| 268 | + |
| 269 | + def transfer_ownership(self, file_id: str, new_owner_email: str) -> None: |
| 270 | + """ |
| 271 | + `Args:` |
| 272 | + file_id: str |
| 273 | + this is the ID of the object you are hoping to share |
| 274 | + new_owner_email: str |
| 275 | + the email address of the intended new owner |
| 276 | + `Returns:` |
| 277 | + None |
| 278 | + """ |
| 279 | + permissions = self.client.permissions().list(fileId=file_id).execute() |
| 280 | + |
| 281 | + # Find the current owner |
| 282 | + current_owner_permission = next( |
| 283 | + (p for p in permissions.get("permissions", []) if "owner" in p), None |
| 284 | + ) |
| 285 | + |
| 286 | + if current_owner_permission: |
| 287 | + # Update the permission to transfer ownership |
| 288 | + new_owner_permission = { |
| 289 | + "type": "user", |
| 290 | + "role": "owner", |
| 291 | + "emailAddress": new_owner_email, |
| 292 | + } |
| 293 | + self.client.permissions().update( |
| 294 | + fileId=file_id, |
| 295 | + permissionId=current_owner_permission["id"], |
| 296 | + body=new_owner_permission, |
| 297 | + ).execute() |
| 298 | + logger.info(f"Ownership transferred successfully to {new_owner_email}.") |
| 299 | + else: |
| 300 | + logger.info("File does not have a current owner.") |
0 commit comments