Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 58 additions & 29 deletions rbac_roles_cli.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
# python3 rbac_roles_cli.py -user <USER_NAME> -pass <PASSWORD> -u http://<IP>:<PORT> -r NEW_ROLE -d DAG_NAME_1 DAG_NAME_2

from typing import List
import requests
import argparse
from requests.auth import HTTPBasicAuth

def create_rbac_role_with_permissions(
airflow_url: str,
new_role_name: str,
dag_names: List[str],
google_access_token: str=None,
airflow_url: str,
new_role_name: str,
dag_names: List[str]
):
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
}
if google_access_token != None:
headers["Authorization"] = "Bearer " + google_access_token


read = "can_read"
edit = "can_edit"
create = "can_create"
Expand All @@ -38,7 +34,7 @@ def create_rbac_role_with_permissions(
read_permissions = make_permissions(read,[dag])
edit_permissions = make_permissions(edit, [dag])
delete_permissions = make_permissions(delete, [dag])
permissions += read_permissions + edit_permissions + delete_permissions
permissions += read_permissions + edit_permissions + delete_permissions

data = {
"actions": [
Expand All @@ -47,17 +43,8 @@ def create_rbac_role_with_permissions(
"name": new_role_name
}

airflow_url += "/api/v1/roles"
response = requests.post(airflow_url, json=data, headers=headers)
return data

if response.status_code == 403:
raise PermissionError(f"Error 403 returned, please check if your AirFlow account is Op/Admin or verify the dags exist. \n {response.json()}")
elif response.status_code == 401:
raise PermissionError(f"Error 401 returned, please check the access token if the page is protected by an authentication")
elif response.status_code == 200:
print(f"Role `{new_role_name}` successfuly created.")
else:
raise ConnectionError(f"An error occured during role creation: {response.json()}")

def make_permissions(action, resources):
permissions = []
Expand All @@ -72,17 +59,59 @@ def make_permission(action, resource):
}


def on_login(airflow_user_api, airflow_pass_api, airflow_url, data, new_role_name):

headers = {
"Accept": "application/json",
"Content-Type": "application/json",
}

try:

credentials=HTTPBasicAuth(airflow_user_api, airflow_pass_api)

airflow_url += "/api/v1/roles"
response = requests.post(airflow_url, json=data, headers=headers, auth=credentials, timeout=5)

if response.status_code == 403:
raise RuntimeError(f"Error 403 returned, please check if your AirFlow account is Op/Admin or verify the dags exist. \n {response.json()}")
elif response.status_code == 401:
raise RuntimeError(f"Error 401 returned, please check the access token if the page is protected by an authentication")
elif response.status_code == 200:
print(f"Role `{new_role_name}` successfuly created.")
else:
raise RuntimeError(f"An error occured during role creation: {response.json()}")

except requests.exceptions.HTTPError as errh:
raise RuntimeError('A HTTPError error occurred') from errh
except requests.exceptions.ConnectionError as errc:
raise RuntimeError('A Connection error occurred') from errc
except requests.exceptions.Timeout as errt:
raise RuntimeError('A Timeout error occurred') from errt
except requests.exceptions.RequestException as err:
raise RuntimeError('A Request Exception error occurred') from err

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-user", "--airflow-user-api", required=True, help="User access to the Airflow UI root page")
parser.add_argument("-pass", "--airflow-pass-api", required=True, help="Password access to the composer Airflow UI root page")
parser.add_argument("-u", "--airflow-url", required=True, help="URL to the composer Airflow UI root page")
parser.add_argument("-r", "--role-name", required=True, help="Name of the new created role")
parser.add_argument("-t", "--access-token", required=True, help="Google access token used only if Airflow is managed by Cloud Composer")
parser.add_argument("-d", "--dags", nargs="+", required=True, help="List of accessible dags for the role")

args = parser.parse_args()
create_rbac_role_with_permissions(
args.airflow_url,
args.role_name,
args.dags,
args.access_token,
)

try:

data = create_rbac_role_with_permissions(
args.airflow_url,
args.role_name,
args.dags,
)

on_login(args.airflow_user_api, args.airflow_pass_api, args.airflow_url, data, args.role_name)

except RuntimeError as errh:
print("It didn't work:", errh)
if errh.__cause__:
print('Cause:', errh.__cause__)