Partitions #13

Open · wants to merge 14 commits into master
4 changes: 4 additions & 0 deletions .gitignore
@@ -0,0 +1,4 @@
venv/
.vscode/
__pycache__/

48 changes: 48 additions & 0 deletions README.md
@@ -1,2 +1,50 @@
# dss-code-samples
Various code samples for using DSS

## Refactoring

### Getting started

(DSS >= 8.0.3)

#### Use within DSS (as a project library)
- Register this repository in the project library's Git settings
- No need to specify remote DSS connection parameters: inside DSS, the `dataiku` package connects to the local instance automatically
- Import the helpers directly in recipes and notebooks (see the sketch below)
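
As an illustration, a minimal sketch of calling one of the helpers from a Python recipe or notebook, assuming the repository root is importable from the project library:

```python
import dataiku
from admin.job_utils import list_jobs_by_status

client = dataiku.api_client()                # local instance, no credentials needed
project_key = dataiku.default_project_key()  # key of the current project
jobs = list_jobs_by_status(client=client, project_key=project_key)
print({status: len(job_list) for status, job_list in jobs.items()})
```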

#### Outside of DSS
- Clone the repository and package it as a tar.gz archive
- Create a virtualenv, install the DSS client requirements, then install the archive
- Import and call the helpers from your own scripts (see the connection sketch below)

You can reuse these samples as they are, customize them for your own needs, or even package them into plugins.

Create a dedicated virtual environment and install the following packages:
* `dataiku-internal-client`: follow the instructions in the [DSS doc](https://doc.dataiku.com/dss/latest/python-api/outside-usage.html#installing-the-package)
* `dataikuapi`:
```
$ pip install dataiku-api-client
```
* `pandas`:
```
$ pip install "pandas>=1.0,<1.1"
```
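
Once the packages are installed, you typically start by creating a client pointed at the remote instance. A minimal sketch (the URL and API key are placeholders):

```python
import dataikuapi

host = "https://dss.example.com:11200"  # placeholder: your DSS base URL
api_key = "YOUR_API_KEY"                # placeholder: a personal or global API key

client = dataikuapi.DSSClient(host, api_key)
print([p["projectKey"] for p in client.list_projects()])
```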

### Structure

```
dss-code-samples
|_admin
|_applications
|_datasets
|_formulas
|_metrics_and_checks
|_machine_learning
|_partitioning
|_scenarios
|_statistics
|_webapps
```



File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Empty file added admin/__init__.py
Empty file.
64 changes: 64 additions & 0 deletions admin/job_utils.py
@@ -0,0 +1,64 @@
import dataiku
from datetime import datetime


def list_jobs_by_status(client=None, project_key=None):
"""List jobs by current status in a given project.

Args:
client: A handle on the target DSS instance
project_key: A string representing the target project key

Returns:
        jobs_by_status: A dict mapping each job state ("RUNNING", "FAILED", "DONE", "ABORTED") to the list of matching jobs
"""

project = client.get_project(project_key)
jobs_by_status = {"RUNNING": [],
"FAILED": [],
"DONE": [],
"ABORTED": []}
for job in project.list_jobs():
if not job["stableState"]:
jobs_by_status["RUNNING"].append(job)
else:
jobs_by_status[job["state"]].append(job)
return jobs_by_status


def filter_jobs_by_start_date(client=None, project_key=None, start_date=None):
"""List jobs that were started after a specific date.

Args:
client: A handle on the target DSS instance
project_key: A string representing the target project key
start_date: A string of the form 'YYYY/mm/dd'

Returns:
        jobs_after_start_date: A dict mapping each job state to the list of jobs started after start_date

"""
jobs_by_status = list_jobs_by_status(client, project_key)
    # Job timestamps are in milliseconds since the epoch
    start_date_timestamp = int(datetime.strptime(start_date, "%Y/%m/%d").timestamp()) * 1000
    is_after_start_date = lambda job, ts: job["def"]["initiationTimestamp"] > ts
    jobs_after_start_date = {_status: [job for job in _list if is_after_start_date(job, start_date_timestamp)]
                             for _status, _list in jobs_by_status.items()}
return jobs_after_start_date


def abort_all_running_jobs(client=None, project_key=None):
"""Terminate all running jobs in a project.

Args:
client: A handle on the target DSS instance
project_key: A string representing the target project key
"""

project = client.get_project(project_key)
aborted_jobs = []
for job in project.list_jobs():
if not job["stableState"]:
job_id = job["def"]["id"]
aborted_jobs.append(job_id)
project.get_job(job_id).abort()
    print(f"Aborted {len(aborted_jobs)} running jobs")
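
A possible usage sketch for these helpers (the project key and date are placeholders):

```python
import dataiku
from admin.job_utils import list_jobs_by_status, filter_jobs_by_start_date

client = dataiku.api_client()
jobs = list_jobs_by_status(client=client, project_key="MY_PROJECT")  # placeholder key
recent = filter_jobs_by_start_date(client=client,
                                   project_key="MY_PROJECT",
                                   start_date="2021/01/01")
print(f"{len(jobs['RUNNING'])} running jobs, {len(recent['FAILED'])} failed since 2021/01/01")
```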

31 changes: 31 additions & 0 deletions admin/project_utils.py
@@ -0,0 +1,31 @@
import dataiku

def edit_project_permissions(client=None, project_key=None, group=None, perms=None, revoke=False):
"""Grant or revoke project permissions for a given group.

Args:
client: A handle on the target DSS instance
project_key: A string representing the target project key
group: A string representing the target group name
perms: A list of permissions to grant
revoke: A boolean for completely revoking access to the project
"""

prj = client.get_project(project_key)
perm_obj = prj.get_permissions()
    # Drop any existing entries for the group (avoids mutating the list while
    # iterating over it, which would skip elements)
    perm_list = [p for p in perm_obj["permissions"] if p["group"] != group]
    if len(perm_list) < len(perm_obj["permissions"]):
        print("Deleting existing permissions...")
    if revoke:
        print(f"Revoking all permissions on project {project_key} for group {group}")
    else:
        if not perms:
            print("Missing permission list, will grant ADMIN instead...")
            perms = ["admin"]
        new_group_perms = dict({"group": group}, **{p: True for p in perms})
        perm_list.append(new_group_perms)
        print(f"Granting {perms} to group {group} on project {project_key}...")
    perm_obj["permissions"] = perm_list
    prj.set_permissions(perm_obj)
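
A possible call, with placeholder project, group, and permission names (check the exact permission keys against your DSS version):

```python
import dataiku
from admin.project_utils import edit_project_permissions

client = dataiku.api_client()
edit_project_permissions(client=client,
                         project_key="MY_PROJECT",  # placeholder
                         group="data_team",         # placeholder
                         perms=["readProjectContent", "writeProjectContent"])
```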
6 changes: 6 additions & 0 deletions admin/spark_utils.py
@@ -0,0 +1,6 @@
import dataiku

def add_spark_config(client=None, config=None):
    """Add a Spark configuration (not implemented yet)."""
    raise NotImplementedError


12 changes: 12 additions & 0 deletions datasets/README.md
@@ -0,0 +1,12 @@
# Datasets

## TODO

- [ ] Programmatically build partitions
- [ ] Retrieve last build date (PR #3)
- [ ] Read from/write to non-local-FS-folders
- [ ] Run containerized execution with input/output data in managed folders
- [ ] Flag and delete orphaned datasets
- [ ] Schema propagation from updated dataset
- [ ] Create "Upload" dataset and add/replace file(s)

Empty file added datasets/__init__.py
Empty file.
32 changes: 32 additions & 0 deletions datasets/dataset_utils.py
@@ -0,0 +1,32 @@
import dataiku
from datetime import datetime

def get_last_build_date(client=None, project_key=None, dataset=None):
    """Return a datetime object representing the last time an output
    dataset was built.
    Args:
        client: A handle on the target DSS instance.
        project_key: A string representing the target project key.
        dataset: A string representing the dataset name.
    """
    # Relies on the internal dataiku package: the dataset is looked up
    # in the current/default project
    dataset_info = dataiku.Dataset(dataset).get_files_info()
    last_modif = dataset_info.get("globalPaths")[0].get("lastModified")
    dt = datetime.fromtimestamp(last_modif / 1000)
return dt

def build_todays_partition(client=None, project_key=None, dataset=None):
    """Build the partition corresponding to today's date in the specified dataset
    and return the status of the build job.
    Args:
        client: A handle on the target DSS instance.
        project_key: A string representing the target project key.
        dataset: A string representing the dataset name.
    """
now = datetime.now()
partition = now.strftime("%Y-%m-%d")
project = client.get_project(project_key)
dataset = project.get_dataset(dataset)
job = dataset.build(partitions=partition)
return job.get_status()
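
A possible usage sketch, assuming a dataset partitioned by day (names are placeholders):

```python
import dataiku
from datasets.dataset_utils import get_last_build_date, build_todays_partition

client = dataiku.api_client()
project_key = dataiku.default_project_key()
print(get_last_build_date(client, project_key, dataset="my_partitioned_dataset"))
print(build_todays_partition(client, project_key, dataset="my_partitioned_dataset"))
```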


3 changes: 3 additions & 0 deletions deployer/README.md
@@ -0,0 +1,3 @@
- Keep a database connection persistent to speed up response time by opening a connection pool outside of the API function (see the sketch below)
- Image classification scoring on a custom deep learning model
-
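
A generic sketch of the connection-pool pattern, using psycopg2 with placeholder connection parameters; the `api_py_function` name follows the usual DSS Python-function endpoint template, but everything here is illustrative:

```python
from psycopg2 import pool

# Created once at import time, outside of the endpoint function, so that each
# request only borrows an already-open connection.
connection_pool = pool.SimpleConnectionPool(
    minconn=1, maxconn=5,
    host="db.example.com", dbname="mydb", user="api", password="***")  # placeholders

def api_py_function(customer_id):
    conn = connection_pool.getconn()
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT score FROM scores WHERE customer_id = %s", (customer_id,))
            row = cur.fetchone()
        return {"score": row[0] if row else None}
    finally:
        connection_pool.putconn(conn)
```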
89 changes: 89 additions & 0 deletions machine_learning/mltask_utils.py
@@ -0,0 +1,89 @@
import dataiku

def get_best_model(client=None,
project_key=None,
analysis_id=None,
ml_task_id=None,
metric=None):
"""Return the 'best model' (according to the input metric) of a ML task.

Args:
client: A handle on the DSS instance
project_key: A string representing the target project key
analysis_id: A string linking to the target visual analysis.
Can be found in the analysis URL or via
dataikuapi.dss.project.DSSProject.list_analyses()
ml_task_id: A string linking to the target MLTask in a given analysis.
Can be found in the ML task URL or via
dataikuapi.dss.analysis.DSSAnalysis.list_ml_tasks()
metric: A string defining which metric to use for performance ranking

Returns:
ml_task: A handle to interact with the ML task.
Useful when (re)deploying the model.
        best_model_id: A string containing the ID of the ML task's 'best model'

"""
prj = client.get_project(project_key)
analysis = prj.get_analysis(analysis_id)
ml_task = analysis.get_ml_task(ml_task_id)
trained_models = ml_task.get_trained_models_ids()
trained_models_snippets = [ml_task.get_trained_model_snippet(m) for m in trained_models]
# Assumes that for your metric, "higher is better"
best_model_snippet = max(trained_models_snippets, key=lambda x:x[metric])
best_model_id = best_model_snippet["fullModelId"]
return ml_task, best_model_id


def deploy_with_best_model(client=None,
project_key=None,
analysis_id=None,
ml_task_id=None,
metric=None,
saved_model_name=None,
training_dataset=None):
"""Create a new Saved Model in the Flow with the 'best model' of a ML task.

Args:
client: A handle on the DSS instance
project_key: A string representing the target project key.
analysis_id: A string linking to the target visual analysis.
Can be found in the analysis URL or via
dataikuapi.dss.project.DSSProject.list_analyses().
ml_task_id: A string linking to the target MLTask in a given analysis.
Can be found in the ML task URL or via
dataikuapi.dss.analysis.DSSAnalysis.list_ml_tasks().
metric: A string defining which metric to use for performance ranking.
saved_model_name: A string to name the newly-created Saved Model.
training_dataset: A string representing the name of the dataset
used as train set.

"""
ml_task, best_model_id = get_best_model(client,
project_key,
analysis_id,
ml_task_id,
metric)
ml_task.deploy_to_flow(best_model_id,
saved_model_name,
training_dataset)

def update_with_best_model(client=None,
project_key=None,
analysis_id=None,
ml_task_id=None,
metric=None,
saved_model_name=None,
activate=True):
"""Update an existing Saved Model in the Flow with the 'best model'
of a ML task.
"""
ml_task, best_model_id = get_best_model(client,
project_key,
analysis_id,
ml_task_id,
metric)
    # Default name of the training recipe created when the model was first deployed
    training_recipe_name = f"train_{saved_model_name}"
ml_task.redeploy_to_flow(model_id=best_model_id,
recipe_name=training_recipe_name,
activate=activate)
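
A possible call, with placeholder identifiers; the analysis and ML task IDs can be read from the visual analysis URL, and the metric must be a key of the trained-model snippet (e.g. "auc" for a binary classification task — treat that as an assumption):

```python
import dataiku
from machine_learning.mltask_utils import deploy_with_best_model

client = dataiku.api_client()
deploy_with_best_model(client=client,
                       project_key="MY_PROJECT",   # placeholders below
                       analysis_id="8fA3Bx2k",
                       ml_task_id="uN3mLqPz",
                       metric="auc",
                       saved_model_name="my_model",
                       training_dataset="train_data")
```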
30 changes: 30 additions & 0 deletions machine_learning/saved_model_utils.py
@@ -0,0 +1,30 @@
import dataiku

def explore_saved_models(client=None, project_key=None):
"""List saved models of a project and give details on the active versions.

Args:
client: A handle on the target DSS instance
project_key: A string representing the target project key

Returns:
        smdl_list: A list of dicts, one per saved model, with all version ids
            plus the algorithm and performance metrics of the active version.

"""
smdl_list = []
prj = client.get_project(project_key)
smdl_ids = [x["id"] for x in prj.list_saved_models()]
for smdl in smdl_ids:
data = {}
obj = prj.get_saved_model(smdl)
data["version_ids"] = [m["id"] for m in obj.list_versions()]
active_version_id = obj.get_active_version()["id"]
active_version_details = obj.get_version_details(active_version_id)
data["active_version"] = {"id": active_version_id,
"algorithm": active_version_details.details["actualParams"]["resolved"]["algorithm"],
"performance_metrics": active_version_details.get_performance_metrics()}
smdl_list.append(data)
return smdl_list
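
A possible usage sketch (placeholder project key):

```python
import dataiku
from machine_learning.saved_model_utils import explore_saved_models

client = dataiku.api_client()
for model in explore_saved_models(client=client, project_key="MY_PROJECT"):
    print(model["active_version"]["id"], model["active_version"]["algorithm"])
```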


4 changes: 4 additions & 0 deletions metrics_and_checks/README.md
@@ -0,0 +1,4 @@
# Metrics and checks

- [ ] Retrieve metrics history of a dataset
- [ ] Retrieve metrics history of a model
27 changes: 27 additions & 0 deletions projects/README.md
@@ -0,0 +1,27 @@
- Build all
```python
import dataiku

client = dataiku.api_client()
project = client.get_project(dataiku.default_project_key())
flow = project.get_flow()
graph = flow.get_graph()
for k,v in graph.data.get('nodes').items():
if v.get('successors') == []:
definition = {
"type" : 'RECURSIVE_BUILD',
"outputs" : [{"id": k}]
}
print('Building dataset {}'.format(k))
job = project.start_job(definition)
```
Will need adjustments if there are saved models.

- Build specific tags only
- Build specific zones only
- Detect schema changes on a dataset and propagate them
```python
import dataiku

client = dataiku.api_client()
project = client.get_project(dataiku.default_project_key())
dataset = project.get_dataset("mydataset")  # placeholder dataset name
settings = dataset.get_settings()
settings.get_raw()["schema"] = {"columns": []}  # clear the schema to force re-detection
settings.save()
new_settings = dataset.autodetect_settings()
new_settings.save()
```
13 changes: 13 additions & 0 deletions requirements.txt
@@ -0,0 +1,13 @@
certifi==2020.6.20
chardet==3.0.4
idna==2.8
numpy==1.19.4
pandas==1.0.5
python-dateutil==2.8.0
pytz==2019.2
requests==2.22.0
six==1.15.0
urllib3==1.25.11
dataiku-api-client==8.0.0
http://localhost:40000/public/packages/dataiku-internal-client.tar.gz

8 changes: 8 additions & 0 deletions scenarios/README.md
@@ -0,0 +1,8 @@
# Scenarios

- [ ] Implement a timeout for a particular scenario step

- View all the "run after scenario" dependencies between projects (see the sketch after the quote below)
> you'll need to write some code using the public API to loop over the scenario settings, look for follow_scenariorun triggers, and build the dependency tree yourself
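
A rough sketch of that loop. It assumes the raw scenario settings expose their triggers under a `triggers` key with a `type` field; check both against your DSS version:

```python
import dataiku

client = dataiku.api_client()
dependencies = []  # (upstream trigger params, (project_key, scenario_id)) pairs
for project_key in client.list_project_keys():
    project = client.get_project(project_key)
    for scenario_info in project.list_scenarios():
        scenario = project.get_scenario(scenario_info["id"])
        raw_settings = scenario.get_settings().get_raw()
        for trigger in raw_settings.get("triggers", []):
            if trigger.get("type") == "follow_scenariorun":
                dependencies.append((trigger.get("params", {}),
                                     (project_key, scenario_info["id"])))
print(dependencies)
```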

