-
Notifications
You must be signed in to change notification settings - Fork 134
Google Drive connector #1346
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Google Drive connector #1346
Changes from 14 commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
b6b0db5
Google Drive connector
austinweisgrau 947d55c
list files in folder method
austinweisgrau 20637d5
find file in folder method
austinweisgrau 41a6450
download file method
austinweisgrau 75eb33e
support team drives by default
austinweisgrau 7d5a4b0
Fix types for 3.9 compatibility
austinweisgrau 1ba9f52
ruff format
austinweisgrau 752aa91
Google Drive and Slides connectors
96806b6
Google Drive connector documentation
austinweisgrau b6aa880
Merge branch 'google-slides' into drive
austinweisgrau c923471
formatting fixes
austinweisgrau 7d1aacc
add type hints
austinweisgrau 893f12b
Merge branch 'main' into drive
austinweisgrau 150e849
Merge branch 'main' into drive
shaunagm 9ffab75
ruff/security fixes
austinweisgrau 18b100c
one more fix
austinweisgrau 3d06367
remove unused import
austinweisgrau File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,303 @@ | ||
| import logging | ||
| import os | ||
| import tempfile | ||
| import uuid | ||
| from typing import Optional, Union | ||
|
|
||
| from google.oauth2.credentials import Credentials | ||
| from googleapiclient.discovery import build | ||
| from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload | ||
|
|
||
| from parsons.google.utilities import ( | ||
| load_google_application_credentials, | ||
| setup_google_application_credentials, | ||
| ) | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class GoogleDrive: | ||
| """ | ||
| A connector for Google Drive | ||
|
|
||
| `Args:` | ||
| app_creds: dict | str | Credentials | ||
| Can be a dictionary of Google Drive API credentials, parsed from JSON provided | ||
| by the Google Developer Console, or a path string pointing to credentials | ||
| saved on disk, or a google.oauth2.credentials.Credentials object. Required | ||
| if env variable ``GOOGLE_DRIVE_CREDENTIALS`` is not populated. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| app_creds: Optional[Union[str, dict, Credentials]] = None, | ||
| ): | ||
| scopes = [ | ||
| "https://www.googleapis.com/auth/drive", | ||
| ] | ||
|
|
||
| if isinstance(app_creds, Credentials): | ||
| credentials = app_creds | ||
| else: | ||
| env_credentials_path = str(uuid.uuid4()) | ||
| setup_google_application_credentials( | ||
| app_creds, target_env_var_name=env_credentials_path | ||
| ) | ||
| credentials = load_google_application_credentials(env_credentials_path, scopes=scopes) | ||
|
|
||
| self.client = build( | ||
| "drive", | ||
| "v3", | ||
| credentials=credentials, | ||
| cache_discovery=False, | ||
| ) | ||
|
|
||
| def create_folder(self, name: str, parents: Union[list[str], str, None] = None) -> str: | ||
| if isinstance(parents, str): | ||
| parents = [parents] | ||
| elif parents is None: | ||
| parents = [] | ||
| response = ( | ||
| self.client.files() | ||
| .create( | ||
| body={ | ||
| "name": name, | ||
| "mimeType": "application/vnd.google-apps.folder", | ||
| "parents": parents, | ||
| }, | ||
| fields="id", | ||
| ) | ||
| .execute() | ||
| ) | ||
| return response.get("id") | ||
|
|
||
| def find_subfolder(self, subfolder_name: str, parent_folder_id: str) -> Optional[str]: | ||
| response = ( | ||
| self.client.files() | ||
| .list( | ||
| q=f"'{parent_folder_id}' in parents and mimeType='application/vnd.google-apps.folder'", | ||
| fields="files(id, name)", | ||
| ) | ||
| .execute() | ||
| ) | ||
| match = [i for i in response.get("files") if i.get("name") == subfolder_name] | ||
| if match: | ||
| result = match[0].get("id") | ||
| else: | ||
| result = None | ||
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Show fixed
Hide fixed
|
||
| return result | ||
|
|
||
| def find_file_in_folder( | ||
| self, file_name: str, folder_id: str, fields: Optional[list[str]] = None | ||
| ) -> list[dict[str, str]]: | ||
| if not fields: | ||
| fields = ["id", "name"] | ||
| page_token = None | ||
| results = [] | ||
| while True: | ||
| response = ( | ||
| self.client.files() | ||
| .list( | ||
| q=f"'{folder_id}' in parents and name = '{file_name}'", | ||
| spaces="drive", | ||
| fields="nextPageToken, files({})".format(",".join(fields)), | ||
| pageToken=page_token, | ||
| ) | ||
| .execute() | ||
| ) | ||
| results.extend(response.get("files", [])) | ||
| page_token = response.get("nextPageToken") | ||
| if page_token is None: | ||
| break | ||
| return results | ||
|
|
||
| def list_files_in_folder( | ||
| self, folder_id: str, fields: Optional[list[str]] = None | ||
| ) -> list[dict[str, str]]: | ||
| if not fields: | ||
| fields = ["id", "name"] | ||
| page_token = None | ||
| results = [] | ||
| while True: | ||
| response = ( | ||
| self.client.files() | ||
| .list( | ||
| q=f"'{folder_id}' in parents", | ||
| spaces="drive", | ||
| fields="nextPageToken, files({})".format(",".join(fields)), | ||
| pageToken=page_token, | ||
| supportsTeamDrives=True, | ||
| includeItemsFromAllDrives=True, | ||
| ) | ||
| .execute() | ||
| ) | ||
| results.extend(response.get("files", [])) | ||
| page_token = response.get("nextPageToken") | ||
| if page_token is None: | ||
| break | ||
| return results | ||
|
|
||
| def empty_folder(self, folder_id: str) -> None: | ||
| folder_contents = self.list_files_in_folder(folder_id) | ||
| for drive_file in folder_contents: | ||
| self.client.files().delete( | ||
| fileId=drive_file.get("id"), | ||
| ).execute() | ||
|
|
||
| def download_file(self, file_id: str) -> str: | ||
| """Download file from Drive to disk. Returns local filepath.""" | ||
| filepath = tempfile.mkstemp()[1] | ||
| done = False | ||
|
|
||
| with open(filepath, "wb") as file: | ||
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Show fixed
Hide fixed
|
||
| downloader = MediaIoBaseDownload(file, self.client.files().get_media(fileId=file_id)) | ||
| while not done: | ||
| status, done = downloader.next_chunk() | ||
| return filepath | ||
|
|
||
| def upload_file(self, file_path: str, parent_folder_id: str) -> str: | ||
| file_metadata = { | ||
| "name": os.path.basename(file_path), | ||
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Show fixed
Hide fixed
|
||
| "parents": [parent_folder_id], | ||
| } | ||
| media = MediaFileUpload(file_path) | ||
| response = ( | ||
| self.client.files().create(body=file_metadata, media_body=media, fields="id").execute() | ||
| ) | ||
| return response.get("id") | ||
|
|
||
| def replace_file(self, file_path: str, file_id: str) -> str: | ||
| """Replace file in drive.""" | ||
| media = MediaFileUpload(file_path) | ||
| resp = self.client.files().update(fileId=file_id, media_body=media, fields="id").execute() | ||
| return resp.get("id") | ||
|
|
||
| def upsert_file(self, file_path: str, parent_folder_id: str) -> str: | ||
| """Create or replace file in drive folder, based on file name.""" | ||
| file_name = os.path.basename(file_path) | ||
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Show fixed
Hide fixed
|
||
| match_response = ( | ||
| self.client.files() | ||
| .list( | ||
| q=f"name='{file_name}' and '{parent_folder_id}' in parents", | ||
| spaces="drive", | ||
| fields="files(id, name)", | ||
| ) | ||
| .execute() | ||
| .get("files", []) | ||
| ) | ||
| if match_response: | ||
| file_id = match_response[0].get("id") | ||
| result = self.replace_file(file_path, file_id) | ||
| else: | ||
| result = self.upload_file(file_path, parent_folder_id) | ||
| return result | ||
|
|
||
| def get_permissions(self, file_id: str) -> dict: | ||
| """ | ||
| `Args:` | ||
| file_id: str | ||
| this is the ID of the object you are hoping to share | ||
| `Returns:` | ||
| permission dict | ||
| """ | ||
|
|
||
| p = self.client.permissions().list(fileId=file_id).execute() | ||
|
|
||
| return p | ||
|
|
||
| def _share_object(self, file_id: str, permission_dict: dict) -> dict: | ||
| # Send the request to share the file | ||
| p = self.client.permissions().create(fileId=file_id, body=permission_dict).execute() | ||
|
|
||
| return p | ||
|
|
||
| def share_object( | ||
| self, | ||
| file_id: str, | ||
| email_addresses: Optional[list[str]] = None, | ||
| role: str = "reader", | ||
| type: str = "user", | ||
| ) -> list[dict]: | ||
| """ | ||
| `Args:` | ||
| file_id: str | ||
| this is the ID of the object you are hoping to share | ||
| email_addresses: list | ||
| this is the list of the email addresses you want to share; | ||
| set to a list of domains like `['domain']` if you choose `type='domain'`; | ||
| set to `None` if you choose `type='anyone'` | ||
| role: str | ||
| Options are -- owner, organizer, fileOrganizer, writer, commenter, reader | ||
| https://developers.google.com/drive/api/guides/ref-roles | ||
| type: str | ||
| Options are -- user, group, domain, anyone | ||
| `Returns:` | ||
| List of permission objects | ||
| """ | ||
| if role not in [ | ||
| "owner", | ||
| "organizer", | ||
| "fileOrganizer", | ||
| "writer", | ||
| "commenter", | ||
| "reader", | ||
| ]: | ||
| raise Exception( | ||
| f"{role} not from the allowed list of: \ | ||
| owner, organizer, fileOrganizer, writer, commenter, reader" | ||
| ) | ||
|
|
||
| if type not in ["user", "group", "domain", "anyone"]: | ||
| raise Exception( | ||
| f"{type} not from the allowed list of: \ | ||
| user, group, domain, anyone" | ||
| ) | ||
|
|
||
| if type == "domain": | ||
| permissions = [ | ||
| {"type": type, "role": role, "domain": email} for email in email_addresses | ||
| ] | ||
| else: | ||
| permissions = [ | ||
| {"type": type, "role": role, "emailAddress": email} for email in email_addresses | ||
| ] | ||
|
|
||
| new_permissions = [] | ||
| for permission in permissions: | ||
| p = self._share_object(file_id, permission) | ||
| new_permissions.append(p) | ||
|
|
||
| return new_permissions | ||
|
|
||
| def transfer_ownership(self, file_id: str, new_owner_email: str) -> None: | ||
| """ | ||
| `Args:` | ||
| file_id: str | ||
| this is the ID of the object you are hoping to share | ||
| new_owner_email: str | ||
| the email address of the intended new owner | ||
| `Returns:` | ||
| None | ||
| """ | ||
| permissions = self.client.permissions().list(fileId=file_id).execute() | ||
|
|
||
| # Find the current owner | ||
| current_owner_permission = next( | ||
| (p for p in permissions.get("permissions", []) if "owner" in p), None | ||
| ) | ||
|
|
||
| if current_owner_permission: | ||
| # Update the permission to transfer ownership | ||
| new_owner_permission = { | ||
| "type": "user", | ||
| "role": "owner", | ||
| "emailAddress": new_owner_email, | ||
| } | ||
| self.client.permissions().update( | ||
| fileId=file_id, | ||
| permissionId=current_owner_permission["id"], | ||
| body=new_owner_permission, | ||
| ).execute() | ||
| logger.info(f"Ownership transferred successfully to {new_owner_email}.") | ||
| else: | ||
| logger.info("File does not have a current owner.") | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| import os | ||
| import random | ||
| import string | ||
| import unittest | ||
|
|
||
| from parsons import GoogleDrive | ||
|
|
||
| # Test Slides: https://docs.google.com/presentation/d/19I-kicyaJV53KoPNwt77KJL10fHzWFdZ_c2mW4XJaxc | ||
|
|
||
|
|
||
| @unittest.skipIf(not os.environ.get("LIVE_TEST"), "Skipping because not running live test") | ||
| class TestGoogleDrive(unittest.TestCase): | ||
| def setUp(self): | ||
| self.gd = GoogleDrive() | ||
|
|
||
| def test_get_permissions(self): | ||
| file_id = "19I-kicyaJV53KoPNwt77KJL10fHzWFdZ_c2mW4XJaxc" | ||
| p = self.gd.get_permissions(file_id) | ||
| self.assertTrue(True, "anyoneWithLink" in [x["id"] for x in p["permissions"]]) | ||
|
||
|
|
||
| def test_share_object(self): | ||
| file_id = "19I-kicyaJV53KoPNwt77KJL10fHzWFdZ_c2mW4XJaxc" | ||
| email = "".join(random.choices(string.ascii_letters, k=10)) + "@gmail.com" | ||
| email_addresses = [email] | ||
|
|
||
| before = self.gd.get_permissions(file_id)["permissions"] | ||
| self.gd.share_object(file_id, email_addresses) | ||
| after = self.gd.get_permissions(file_id)["permissions"] | ||
| self.assertTrue(True, len(after) > len(before)) | ||
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Show fixed
Hide fixed
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.