Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 47 additions & 41 deletions python-package/basedosdados/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
import shutil
import sys
import warnings
from functools import lru_cache
from functools import cached_property
from os import getenv
from pathlib import Path
from typing import Dict, List, Union
from typing import Dict, List, Literal, Optional, TypedDict, Union

import googleapiclient.discovery
import tomlkit
Expand All @@ -24,6 +24,14 @@
warnings.filterwarnings("ignore")


class Client(TypedDict):
bigquery_prod: bigquery.Client
bigquery_connection_prod: bigquery_connection_v1.ConnectionServiceClient
bigquery_staging: bigquery.Client
bigquery_connection_staging: bigquery_connection_v1.ConnectionServiceClient
storage_staging: storage.Client


class Base:
"""
Base class for all datasets
Expand All @@ -35,7 +43,7 @@ def __init__(
bucket_name=None,
billing_project_id=None,
overwrite_cli_config=False,
folder="staging",
mode="staging",
):
"""
Initialize the class
Expand All @@ -52,12 +60,14 @@ def __init__(
self.config = self._load_config()
self._config_log(config.verbose)
self.bucket_name = bucket_name or self.config["bucket_name"]
self.folder = folder
self.mode = mode
self.billing_project_id = (
billing_project_id
or self.config["gcloud-projects"]["staging"]["name"]
)
self.uri = f"gs://{self.bucket_name}/{folder}" + "/{dataset}/{table}/*"
self.uri = (
f"gs://{self.bucket_name}/{self.mode}" + "/{dataset}/{table}/*"
)
self._backend = Backend(self.config.get("api", {}).get("url", None))

@property
Expand Down Expand Up @@ -103,33 +113,32 @@ def _load_credentials(self, mode: str):
],
)

@property
@lru_cache(256)
def client(self):
@cached_property
def client(self) -> Client:
"""
Client for BigQuery
"""

return dict(
bigquery_prod=bigquery.Client(
return {
"bigquery_prod": bigquery.Client(
credentials=self._load_credentials("prod"),
project=self.config["gcloud-projects"]["prod"]["name"],
),
bigquery_connection_prod=bigquery_connection_v1.ConnectionServiceClient(
"bigquery_connection_prod": bigquery_connection_v1.ConnectionServiceClient(
credentials=self._load_credentials("prod")
),
bigquery_staging=bigquery.Client(
"bigquery_staging": bigquery.Client(
credentials=self._load_credentials("staging"),
project=self.config["gcloud-projects"]["staging"]["name"],
),
bigquery_connection_staging=bigquery_connection_v1.ConnectionServiceClient(
"bigquery_connection_staging": bigquery_connection_v1.ConnectionServiceClient(
credentials=self._load_credentials("staging")
),
storage_staging=storage.Client(
"storage_staging": storage.Client(
credentials=self._load_credentials("staging"),
project=self.config["gcloud-projects"]["staging"]["name"],
),
)
}

@staticmethod
def _input_validator(context, default="", with_lower=True):
Expand Down Expand Up @@ -379,34 +388,31 @@ def _load_config(self):
)

@staticmethod
def _check_folder(folder: str = "staging"):
def _check_mode(mode: str) -> Optional[Literal[True]]:
"""
Checks if the folder is valid
Checks if the mode is valid
"""
if folder is not None and isinstance(folder, str):
return
if isinstance(mode, str) and len(mode.strip()) > 0:
return True

raise Exception(
"This folder does not accept values ​​equal to None and different from string."
"We recommend the following names for the folder:"
"'staging', 'raw', 'header', 'auxiliary_files', 'architecture' or organization_name"
)
msg = f"Mode {mode} is not supported. We recommend the following names for the folder: 'staging', 'raw', 'header', 'auxiliary_files', 'architecture' or organization_name"
raise Exception(msg)

def _get_project_id(self, project_gcp: str) -> str:
def _get_project_id(self, mode: str) -> str:
"""
Get the project ID.
"""
return self.config["gcloud-projects"][project_gcp]["name"]
return self.config["gcloud-projects"][mode]["name"]

def _get_project_number(self, project_gcp: str) -> str:
def _get_project_number(self, mode: str) -> str:
"""
Get the project number from project ID.
"""
credentials = self._load_credentials(project_gcp)
credentials = self._load_credentials(mode)
crm_service = googleapiclient.discovery.build(
"cloudresourcemanager", "v1", credentials=credentials
)
project_id = self._get_project_id(project_gcp)
project_id = self._get_project_id(mode)

return (
crm_service.projects()
Expand All @@ -415,19 +421,19 @@ def _get_project_number(self, project_gcp: str) -> str:
)

def _get_project_iam_policy(
self, project_gcp: str
self, mode: str
) -> Dict[str, Union[str, int, List[Dict[str, Union[str, List[str]]]]]]:
"""
Get the project IAM policy.
"""
credentials = self._load_credentials(project_gcp)
credentials = self._load_credentials(mode)
service = googleapiclient.discovery.build(
"cloudresourcemanager", "v1", credentials=credentials
)
policy = (
service.projects()
.getIamPolicy(
resource=self._get_project_id(project_gcp),
resource=self._get_project_id(mode),
body={"options": {"requestedPolicyVersion": 1}},
)
.execute()
Expand All @@ -439,43 +445,43 @@ def _set_project_iam_policy(
policy: Dict[
str, Union[str, int, List[Dict[str, Union[str, List[str]]]]]
],
project_gcp: str,
mode: str,
):
"""
Set the project IAM policy.
"""
credentials = self._load_credentials(project_gcp)
credentials = self._load_credentials(mode)
service = googleapiclient.discovery.build(
"cloudresourcemanager", "v1", credentials=credentials
)
service.projects().setIamPolicy(
resource=self._get_project_id(project_gcp), body={"policy": policy}
resource=self._get_project_id(mode), body={"policy": policy}
).execute()

def _grant_role(self, role: str, member: str, project_gcp: str):
def _grant_role(self, role: str, member: str, mode: str):
"""
Grant a role to a member.
"""
policy = self._get_project_iam_policy(project_gcp)
policy = self._get_project_iam_policy(mode)
try:
binding = next(b for b in policy["bindings"] if b["role"] == role)
except StopIteration:
binding = {"role": role, "members": []}
policy["bindings"].append(binding)
if member not in binding["members"]:
binding["members"].append(member)
self._set_project_iam_policy(policy, project_gcp)
self._set_project_iam_policy(policy, mode)

def _revoke_role(self, role: str, member: str, project_gcp: str):
def _revoke_role(self, role: str, member: str, mode: str):
"""
Revoke a role from a member.
"""
policy = self._get_project_iam_policy(project_gcp)
policy = self._get_project_iam_policy(mode)
try:
binding = next(b for b in policy["bindings"] if b["role"] == role)
except StopIteration:
return
else:
if member in binding["members"]:
binding["members"].remove(member)
self._set_project_iam_policy(policy, project_gcp)
self._set_project_iam_policy(policy, mode)
13 changes: 7 additions & 6 deletions python-package/basedosdados/download/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,6 @@ def _download_blob_from_bucket(
client: _GoogleClient,
bucket_name: str,
savepath: Path,
user_project: str = "basedosdados-dev",
) -> None:
"""
Download a blob from a bucket to the path specified.
Expand All @@ -383,7 +382,9 @@ def _download_blob_from_bucket(
Returns:
None
"""
bucket = client["storage"].bucket(bucket_name, user_project=user_project)
bucket = client["storage"].bucket(
bucket_name, user_project=client["storage"].project
)
for blob in bucket.list_blobs():
filepath = savepath / (blob.name.split("-")[-1] + ".csv.gz")
blob.download_to_filename(filepath)
Expand All @@ -392,7 +393,6 @@ def _download_blob_from_bucket(
def _create_bucket(
client: _GoogleClient,
bucket_name: str,
user_project: str = "basedosdados-dev",
) -> None:
"""
Create a new bucket in a specific location with standard storage class.
Expand All @@ -405,21 +405,22 @@ def _create_bucket(
None
"""
storage_client = client["storage"]
bucket = storage_client.bucket(bucket_name, user_project=user_project)
bucket = storage_client.bucket(
bucket_name, user_project=client["storage"].project
)

# standard storage class are adequate for data
# stored for only brief periods of time
bucket.storage_class = "STANDARD"

storage_client.create_bucket(
bucket, location="US", user_project=user_project
bucket, location="US", user_project=client["storage"].project
)


def _delete_bucket(
client: _GoogleClient,
bucket_name: str,
user_project: str = "basedosdados-dev",
) -> None:
"""Forceably deletes a bucket.

Expand Down
7 changes: 4 additions & 3 deletions python-package/basedosdados/upload/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,18 @@ def __init__(
self,
name: str,
location: str = None,
project_gcp: str = "staging",
mode: str = "staging",
friendly_name: str = None,
description: str = None,
**kwargs,
):
super().__init__(**kwargs)
self._name = name
self._location = location or "US"
self._mode = mode
self._friendly_name = friendly_name
self._description = description
self._project = self.config["gcloud-projects"][project_gcp]["name"]
self._project = self.config["gcloud-projects"][self._mode]["name"]
self._parent = f"projects/{self._project}/locations/{self._location}"

@property
Expand All @@ -54,7 +55,7 @@ def connection(self) -> Union[BQConnection, None]:
"""
Returns connection object.
"""
client = self.client[f"bigquery_connection_{self._project}"]
client = self.client[f"bigquery_connection_{self._mode}"]
request = GetConnectionRequest(
name=f"{self._parent}/connections/{self._name}"
)
Expand Down
Loading
Loading