diff --git a/.gitignore b/.gitignore index 8dde1872..6ee6d7f7 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ artifacts/ __pycache__/ *.py[cod] *$py.class +**.DS_Store # C extensions *.so diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..b6812340 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,40 @@ +FROM python:3.8 + +# Basic toolchain +RUN apt-get update && apt-get install -y \ + apt-utils \ + build-essential \ + git \ + wget \ + unzip \ + yasm \ + pkg-config \ + libcurl4-openssl-dev \ + zlib1g-dev \ + htop \ + cmake \ + vim \ + nano \ + python3-pip \ + python3-dev \ + python3-tk \ + libx264-dev \ + gcc \ + # python-pytest \ + && cd /usr/local/bin \ + && pip3 install --upgrade pip \ + && apt-get autoremove -y + +RUN git clone -b develop https://github.com/AlgoveraAI/same-project.git + +WORKDIR /same-project + +ARG DEBIAN_FRONTEND=noninteractive + +RUN pip3 install . + +RUN python3.8 -m pip install jupyter +RUN python3.8 -m pip install nbconvert +ENV KF_PIPELINES_ENDPOINT_ENV='ml_pipeline.kubeflow.svc.cluster.local:8888' + +RUN chmod +x ./ocean.sh diff --git a/config.yaml b/config.yaml new file mode 100644 index 00000000..e69de29b diff --git a/docs/docs/blog/2022-xx-xx-Decentralized-AI-with-The-SAME-Project.md b/docs/docs/blog/2022-xx-xx-Decentralized-AI-with-The-SAME-Project.md new file mode 100644 index 00000000..2654ed2a --- /dev/null +++ b/docs/docs/blog/2022-xx-xx-Decentralized-AI-with-The-SAME-Project.md @@ -0,0 +1,66 @@ +# Developing and training AI models in the decentralized web + +## Ocean Protocol and Decentralized AI + +The SAME Project allows data scientists to easily turn their Jupyter notebooks into executable scripts that can automatically be sent to any compute pipeline. + +Ocean Protocol builds tools for the decentralized data economy. In particular, one of its core features is the ability to train your models on private data, a capability called Compute-to-Data (C2D). + +In C2D, the data scientist first searches the Ocean Market for data they want to train their algorithm on. Once they find a dataset they like, they buy access to it through Ocean Protocol's data tokens, which act as tickets denoting who can access a dataset and under what conditions. The data scientist then publishes their model on the Ocean Market as well and executes a series of steps to train their algorithm on the dataset via a separate Compute Provider. More details on C2D can be found [here](https://blog.oceanprotocol.com/v2-ocean-compute-to-data-guide-9a3491034b64). + +Long story short, Ocean C2D is a perfect fit for the SAME Project, allowing data scientists to focus on their model development rather than on learning the ins and outs of Ocean Protocol's libraries. + +## SAME-Ocean Template Quickstart + +This short guide assumes you've already installed the SAME Project in your local environment; [here](https://sameproject.ml/getting-started/installing/) is a guide to get you started. + +While most of the Ocean deployment code is abstracted away in the SAME-Ocean template, there are some config parameters you need to fill in to interact with the Ocean Market. In particular, you'll need a [Web3 wallet](https://metamask.io/) and its private key. To keep it secure, make sure you never expose your wallet private key anywhere outside your local environment.
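To give a sense of what the template takes care of for you, here is a heavily simplified sketch of the Compute-to-Data flow it drives through ocean.py. It mirrors the `ocean_lib` boilerplate bundled with this change (`sameproject/ops/ocean/script.py`); the dataset DID, pool address and config file below are placeholders rather than working values, and the service-ordering and job-monitoring steps are omitted:

```python
# Simplified sketch of the C2D flow the SAME-Ocean template automates.
# Placeholder values throughout; do not copy verbatim.
import os

from ocean_lib.config import Config
from ocean_lib.ocean.ocean import Ocean
from ocean_lib.web3_internal.currency import to_wei
from ocean_lib.web3_internal.wallet import Wallet

config = Config("config.ini")  # network, metadata cache and provider settings
ocean = Ocean(config)

# The wallet pays for datatokens and for the compute job.
wallet = Wallet(
    ocean.web3,
    os.getenv("WALLET_PRIVATE_KEY"),
    transaction_timeout=20,
    block_confirmations=config.block_confirmations,
)

dataset_did = "did:op:..."  # placeholder: dataset DID from the Ocean Market
pool_address = "0x..."      # placeholder: the dataset's liquidity pool

# Buy 1.0 datatoken (enough to consume the dataset), paying at most 10 OCEAN.
ocean.pool.buy_data_tokens(
    pool_address,
    amount=to_wei(1),
    max_OCEAN_amount=to_wei(10),
    from_wallet=wallet,
)

# Resolve the dataset's metadata; the template then orders the compute and
# algorithm services and starts the job with ocean.compute.start(...).
data_ddo = ocean.assets.resolve(dataset_did)
```

With the SAME-Ocean template, all of this, plus publishing your algorithm and starting the compute job, is handled by `same run -t ocean` and the runtime options described below.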
For running C2D, export your wallet private key as a local environment variable: +``` +export WALLET_PRIVATE_KEY='YOUR_PRIVATE_KEY' +``` + +When you're ready to run C2D, navigate to your working Jupyter notebook and run the following in your terminal: +``` +same run -t ocean +``` +Note that you'll need to append the runtime options listed below to this command, each in the form `--option-name=value` (for example, `same run -t ocean --network=<network-url> --dt-did=<dataset-did> --max-dt-price=20`). +### SAME-Ocean Runtime Options + +* `algo-verified`: bool - specify whether the algorithm was verified by the data provider for C2D +* `algo-pushed`: bool - specify whether the algorithm has been published to GitHub (currently required; planned to be removed) +* `network`: str - network URL used to access the Ocean Market +* `provider-address`: str - address of the compute provider +* `wallet-private-key`: str - private key used to pay for transactions in the pipeline +* `dt-did`: str - Decentralized Identifier (DID) of the dataset (found through the Ocean Market) +* `dt-pool`: str - address of the dataset's liquidity pool (applicable if the dataset has dynamic pricing) +* `algo-tag`: str - tag used to refer to the model +* `algo-version`: str - version number of the published model +* `algo-url`: str - GitHub URL to the raw model code +* `algo-name`: str - name of the model +* `author`: str - model author's name +* `licence`: str - model licence +* `max-dt-price`: int - maximum price you are willing to pay for the dataset (in OCEAN) + + +## The SAME Community + +SAME is entirely open-source and non-commercial. We plan on donating it to a foundation as soon as we can identify one that matches our project's goals. + +What can you do? Please join our community! + +### Public web content + +* [Website](https://sameproject.ml) +* [Google Group](https://groups.google.com/u/2/g/same-project) +* [Slack](https://join.slack.com/t/sameproject/shared_invite/zt-lq9rk2g6-Jyfv3AXu_qnX9LqWCmV7HA) + +### Come join our repo + +* [GitHub Organization](https://github.com/SAME-Project) / [GitHub Project](https://github.com/SAME-Project/same-project) +* Try it out (build instructions included) +* Complain about missing features +* EXPERTS ONLY: Add your own + +Regardless, we are very open to your feedback. Thank you so much - onward!
+ +-- The Co-founders of the SAME Project ([David Aronchick](https://twitter.com/aronchick) & [Luke Marsden](https://twitter.com/lmarsden)) diff --git a/ocean.sh b/ocean.sh new file mode 100644 index 00000000..6d570b7a --- /dev/null +++ b/ocean.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +cd /data/transformations/ + +mv algorithm hello.ipynb + +same init + +export KF_PIPELINES_ENDPOINT_ENV='ml_pipeline.kubeflow.svc.cluster.local:8888' + +echo KF_PIPELINES_ENDPOINT_ENV + +same run + +jupyter nbconvert hello.ipynb --to python + +python3.8 hello.py diff --git a/ocean_c2d/render_ocean.py b/ocean_c2d/render_ocean.py new file mode 100644 index 00000000..77adca6c --- /dev/null +++ b/ocean_c2d/render_ocean.py @@ -0,0 +1,210 @@ +from jinja2 import Environment, FileSystemLoader, select_autoescape +from pathlib import Path +from typing import Tuple +from uuid import uuid4 +from base64 import urlsafe_b64encode +import logging +import os +import time + +from sameproject.data.step import Step +from sameproject.ops import helpers +import sameproject.ops.explode + + +from sameproject.ops.code import get_magic_lines, remove_magic_lines, get_installable_packages +from sameproject.data.config import SameConfig +from sameproject.data import Step +from typing import Tuple, List +from io import BufferedReader +from pathlib import Path +import jupytext +import logging +import click + + +def compile(config: SameConfig, target: str) -> Tuple[Path, str]: + notebook = read_notebook(config.notebook.path) + all_steps = get_steps(notebook, config) + + return render( + target=target, + steps=all_steps, + config=config + ) + + +def read_notebook(notebook_path) -> dict: + logging.info(f"Using notebook from here: {notebook_path}") + try: + notebook_file_handle = Path(notebook_path) + ntbk_dict = jupytext.read(str(notebook_file_handle)) + except FileNotFoundError: + logging.fatal(f"No notebook found at {notebook_path}") + exit(1) + + return ntbk_dict + + +def get_steps(notebook: dict, config: SameConfig) -> dict: + """Parses the code in a notebook into a series of SAME execution steps.""" + + steps = {} + all_code = "" + code_buffer = [] + this_step_index = 0 + this_step_name = "same_step_000" + this_step_code = "" + this_step_cache_value = "P0D" + this_step_environment_name = "default" + this_step_tags = [] + + def save_step(): + steps[this_step_name] = Step( + name=this_step_name, + code=remove_magic_lines(this_step_code), + index=this_step_index, + cache_value=this_step_cache_value, + environment_name=this_step_environment_name, + tags=this_step_tags, + parameters=[], + packages_to_install=[], + frozen_box=False, # TODO: make immutable + ) + + # Inject pip requirements file if configured: + if "requirements" in config.notebook: + with open(config.notebook.requirements, "r") as file: + steps[this_step_name].requirements_file = file.read() + + for num, cell in enumerate(notebook["cells"]): + if "metadata" not in cell: # sanity check + continue + + if len(cell["metadata"]) > 0 and "tags" in cell["metadata"] and len(cell["metadata"]["tags"]) > 0: + for tag in cell["metadata"]["tags"]: + if tag.startswith("same_step_"): + if num > 0: # don't create empty step + this_step_code = "\n".join(code_buffer) + all_code += "\n" + this_step_code + save_step() + + code_buffer = [] + step_tag_num = int(tag.split("same_step_")[1]) + this_step_index = step_tag_num + this_step_name = f"same_step_{step_tag_num:03}" + this_step_code = "" + this_step_cache_value = "P0D" + this_step_environment_name = "default" + this_step_tags = [] + + elif 
str.startswith(tag, "cache="): + this_step_cache_value = str.split(tag, "=")[1] + elif str.startswith(tag, "environment="): + this_step_environment_name = str.split(tag, "=")[1] + else: + this_step_tags.append(tag) + + if cell["cell_type"] == "code": # might be a markdown cell + code_buffer.append("\n".join(jupytext.cell_to_text.LightScriptCellExporter(cell, "py").source)) + + this_step_code = "\n".join(code_buffer) + all_code += "\n" + this_step_code + save_step() + + magic_lines = get_magic_lines(all_code) + if len(magic_lines) > 0: + magic_lines_string = "\n".join(magic_lines) + logging.warning(f"""Notebook contains magic lines, which will be ignored:\n{magic_lines_string}""") + + # Remove magic lines from code so that we can continue: + all_code = remove_magic_lines(all_code) + + for k in steps: + steps[k].packages_to_install = get_installable_packages(all_code) + + return steps + + +def get_sorted_list_of_steps(notebook: dict, config: SameConfig) -> list: + """ + Given a notebook (as a dict), get a list of Step objects, sorted by their + index in the notebook. + """ + steps_dict = get_steps(notebook, config) + steps = list(steps_dict.values()) + steps_sorted_by_index = sorted(steps, key=lambda x: x.index) + return steps_sorted_by_index + + +def get_code(notebook: dict) -> str: + """Combines and returns all python code in the given notebook.""" + if "cells" not in notebook: + return "" + + code = [] + for cell in notebook["cells"]: + if cell["cell_type"] != "code": + continue + + code.append("\n".join( + jupytext.cell_to_text.LightScriptCellExporter(cell, "py").source + )) + + return "\n".join(code) + + +ocean_step_template = "step.jinja" + + +def render(compile_path: str, steps: list, same_config: dict) -> Tuple[Path, str]: + """Renders the notebook into a root file and a series of step files according to the target requirements. 
Returns an absolute path to the root file for deployment.""" + + templateDir = os.path.dirname(os.path.abspath(__file__)) + templateLoader = FileSystemLoader(templateDir) + env = Environment(trim_blocks=True, loader=templateLoader) + + root_file_string = _build_step_file(env, next(iter(steps.values())), same_config) + root_pipeline_name = f"root_pipeline_{uuid4().hex.lower()}" + root_path = Path(compile_path) / f"{root_pipeline_name}.py" + helpers.write_file(root_path, root_file_string) + + # for storing in the docker image + docker_path = same_config['notebook']['path'][:-5] + 'py' + helpers.write_file(docker_path, root_file_string) + os.remove(same_config['notebook']['path']) + return (compile_path, root_file_string) # note: root_file_string replaced root_pipeline_name + +def _build_step_file(env: Environment, step: Step, same_config) -> str: + with open(sameproject.ops.explode.__file__, "r") as f: + explode_code = f.read() + + requirements_file = None + if "requirements_file" in step: + requirements_file = urlsafe_b64encode(bytes(step.requirements_file, "utf-8")).decode() + + memory_limit = same_config.runtime_options.get( + "serialisation_memory_limit", + 512 * 1024 * 1024, # 512MB + ) + + same_env = same_config.runtime_options.get( + "same_env", + "default", + ) + + step_contract = { + "name": step.name, + "same_env": same_env, + "memory_limit": memory_limit, + "unique_name": step.unique_name, + "requirements_file": requirements_file, + "user_code": step.code, + "explode_code": urlsafe_b64encode(bytes(explode_code, "utf-8")).decode(), + "same_yaml": urlsafe_b64encode(bytes(same_config.to_yaml(), "utf-8")).decode(), + } + + return env.get_template(ocean_step_template).render(step_contract) + +if __name__ == "__main__": + compile("same.yaml", os.environ("AlGO")) \ No newline at end of file diff --git a/ocean_c2d/requirements.txt b/ocean_c2d/requirements.txt new file mode 100644 index 00000000..f1b2e920 --- /dev/null +++ b/ocean_c2d/requirements.txt @@ -0,0 +1,2 @@ +# Dependencies for /Users/jakub/Development/Algovera/Core/same-project/demo/test.ipynb: + diff --git a/ocean_c2d/same.yaml b/ocean_c2d/same.yaml new file mode 100644 index 00000000..64b82e05 --- /dev/null +++ b/ocean_c2d/same.yaml @@ -0,0 +1,14 @@ +apiVersion: sameproject.ml/v1alpha1 +environments: + default: + image_tag: combinatorml/jupyterlab-tensorflow-opencv:0.9 +metadata: + labels: [] + name: default_config + version: 0.0.0 +notebook: + name: test + path: /data/transformation/notebook.ipynb + requirements: /same-project/ocean_c2d/requirements.txt +run: + name: default_config run diff --git a/sameproject/__init__.py b/sameproject/__init__.py index 1677a825..34bcf63f 100644 --- a/sameproject/__init__.py +++ b/sameproject/__init__.py @@ -20,3 +20,4 @@ import sameproject.ops.aml.options import sameproject.ops.functions.options import sameproject.ops.kubeflow.options +import sameproject.ops.ocean.options diff --git a/sameproject/cli/init.py b/sameproject/cli/init.py index 58e94b0c..b33f9b76 100644 --- a/sameproject/cli/init.py +++ b/sameproject/cli/init.py @@ -47,74 +47,55 @@ def init(): # Start by looking for an existing same config in the current directory. 
cfg = find_same_config(recurse=False) if cfg is not None: - click.echo("An existing SAME config file was found at the following path:") - click.echo(f"\t{cfg}") - if not click.confirm("Do you want to replace it?", default=False): - exit(0) + print("An existing SAME config file was found at the following path:") + print(f"\t{cfg}") + exit(0) else: cfg = Path("./same.yaml") # Name of the pipeline: - pl_name = click.prompt( - "Name of this config:", - default="default_config", - type=name_type - ) + pl_name = "default_config" # Notebook data: - nb_path = click.prompt( - "Notebook path", - default=find_notebook(recurse=True), - type=file_type, - ) + nb_path = find_notebook(recurse=True) if not nb_path.exists(): - click.echo(f"No such file found: {nb_path}", err=True) + print(f"No such file found: {nb_path}") exit(1) nb_dict = read_notebook(nb_path) - nb_name = click.prompt("Notebook name", default=nb_name, type=str) + nb_name = str(nb_path.name).replace(".ipynb", "") # Docker image data: - image = click.prompt( - "Default docker image", - default="combinatorml/jupyterlab-tensorflow-opencv:0.9", - type=image_type - ) + image = "smejak/samedockers:0.1" + # image = "combinatorml/jupyterlab-tensorflow-opencv:0.9" # Requirements.txt data: req = find_requirements(recurse=False) if req is None: - if click.confirm("No requirements.txt found in current directory - would you like to create one?", default=True): - req_contents = f"# Dependencies for {nb_path.resolve()}:\n" - - writing_reqs = False - if click.confirm("Would you like SAME to fill in the requirements.txt for you?", default=True): - code = remove_magic_lines(get_code(nb_dict)) - modules = get_imported_modules(code) - pkg_info = get_package_info(modules) - - if len(pkg_info) > 0: - writing_reqs = True - click.echo("Found the following requirements for the notebook:") - for pkg in pkg_info: - click.echo(f"\t{pkg_info[pkg].name}=={pkg_info[pkg].version}") - else: - click.echo("No requirements found for the notebook.") - req_contents += render_package_info(pkg_info) + "\n" - - req = Path("requirements.txt") - with req.open("w") as file: - file.write(req_contents) - - if writing_reqs: - click.echo(f"Wrote requirements to {req.resolve()}.") - else: - click.echo(f"Wrote empty requirements file to {req.resolve()}.") + req_contents = f"# Dependencies for {nb_path.resolve()}:\n" + + writing_reqs = False + code = remove_magic_lines(get_code(nb_dict)) + modules = get_imported_modules(code) + pkg_info = get_package_info(modules) + + if len(pkg_info) > 0: + writing_reqs = True + print("Found the following requirements for the notebook:") + for pkg in pkg_info: + print(f"{pkg_info[pkg].name}=={pkg_info[pkg].version}") + else: + print("No requirements found for the notebook.") + req_contents += render_package_info(pkg_info) + "\n" + + req = Path("requirements.txt") + with req.open("w") as file: + file.write(req_contents) + + if writing_reqs: + print(f"Wrote requirements to {req.resolve()}.") + else: + print(f"Wrote empty requirements file to {req.resolve()}.") else: - req = click.prompt( - "Requirements.txt", - default=req, - type=file_type, - ) if req == "": req = None @@ -131,6 +112,13 @@ def init(): "image_tag": image, }, }, + "datasets": { + "data": { + "environments": { + "default": "/data/input", + }, + }, + }, "notebook": { "name": nb_name, "path": str(nb_path), @@ -144,14 +132,11 @@ def init(): if req is not None: same_config.notebook.requirements = str(req) - click.echo(f"About to write to {cfg.absolute()}:") - click.echo() - 
click.echo(same_config.to_yaml()) - if click.confirm("Is this okay?", default=True): - cfg.write_text(same_config.to_yaml()) - click.echo(f"Wrote config file to {cfg.absolute()}.") - click.echo() - click.echo("""You can now run 'same verify' to check that everything is configured correctly + print(f"About to write to {cfg.absolute()}:") + print(same_config.to_yaml()) + cfg.write_text(same_config.to_yaml()) + print(f"Wrote config file to {cfg.absolute()}.") + print("""You can now run 'same verify' to check that everything is configured correctly (requires docker locally), or you can run 'same run' to deploy the pipeline to a configured backend (e.g. Kubeflow Pipelines in a Kubernetes cluster file pointed to by ~/.kube/config or set in the KUBECONFIG environment variable). diff --git a/sameproject/cli/run.py b/sameproject/cli/run.py index 6883f747..f96413c1 100644 --- a/sameproject/cli/run.py +++ b/sameproject/cli/run.py @@ -26,7 +26,7 @@ "-t", "--target", default="kubeflow", - type=click.Choice(["aml", "kubeflow", "functions"]), + type=click.Choice(["aml", "kubeflow", "functions", "ocean"]), ) @click.option( "--persist-temp-files", diff --git a/sameproject/ops/backends.py b/sameproject/ops/backends.py index 93962919..f58136c3 100644 --- a/sameproject/ops/backends.py +++ b/sameproject/ops/backends.py @@ -5,6 +5,7 @@ import sameproject.ops.functions as functions import sameproject.ops.kubeflow as kubeflow import sameproject.ops.aml as aml +import sameproject.ops.ocean as ocean import sameproject.ops.helpers import tempfile import click @@ -15,6 +16,7 @@ def render(target: str, steps: list, config: SameConfig, compile_path: str = Non "aml": aml.render, "kubeflow": kubeflow.render, "functions": functions.render, + "ocean": ocean.render } render_function = target_renderers.get(target, None) @@ -33,6 +35,7 @@ def deploy(target: str, base_path: Path, root_file: str, config: SameConfig): "aml": aml.deploy, "kubeflow": kubeflow.deploy, "functions": functions.deploy, + "ocean": ocean.deploy } deploy_function = target_deployers.get(target, None) diff --git a/sameproject/ops/kubeflow/__init__.py b/sameproject/ops/kubeflow/__init__.py index 3edf3212..3cd4721c 100644 --- a/sameproject/ops/kubeflow/__init__.py +++ b/sameproject/ops/kubeflow/__init__.py @@ -1,2 +1,2 @@ from .render import render -from .deploy import deploy +from .deploy import deploy \ No newline at end of file diff --git a/sameproject/ops/kubeflow/deploy.py b/sameproject/ops/kubeflow/deploy.py index 39cab877..3e14fab6 100644 --- a/sameproject/ops/kubeflow/deploy.py +++ b/sameproject/ops/kubeflow/deploy.py @@ -2,14 +2,22 @@ from sameproject.ops import helpers from pathlib import Path import importlib + +import kfp.dsl as dsl import kfp + def deploy(base_path: Path, root_file: str, config: SameConfig): with helpers.add_path(str(base_path)): root_module = importlib.import_module(root_file) # python module - kfp_client = kfp.Client() # only supporting 'kubeflow' namespace + print("getting kfp_client") + kfp_client = kfp.Client(host="http://aff7367d8c2254073b6f563f2eb8efdc-b6898d80ac5be12c.elb.us-east-1.amazonaws.com") # only supporting 'kubeflow' namespace + print("got kfp_client") + + # dsl.BaseOp(name="data_collector").add_volume("/data/input") + return kfp_client.create_run_from_pipeline_func( root_module.root, arguments={}, diff --git a/sameproject/ops/kubeflow/root.jinja b/sameproject/ops/kubeflow/root.jinja index 127d1c9b..cc21228f 100644 --- a/sameproject/ops/kubeflow/root.jinja +++ b/sameproject/ops/kubeflow/root.jinja @@ -48,7 +48,7 
@@ def root( secrets_by_env = {} {% for env_name in secrets_to_create_as_dict %} {% set secret = secrets_to_create_as_dict[env_name] %} - config.load_kube_config() + v1 = client.CoreV1Api() namespace = "kubeflow" name = "{{ experiment_name_safe }}" diff --git a/sameproject/ops/kubeflow/run_info.jinja b/sameproject/ops/kubeflow/run_info.jinja index 0a970d5f..5cfae7db 100644 --- a/sameproject/ops/kubeflow/run_info.jinja +++ b/sameproject/ops/kubeflow/run_info.jinja @@ -15,7 +15,7 @@ def run_info_fn( import dill import kfp - client = kfp.Client(host="http://ml-pipeline:8888") + client = kfp.Client(host="http://aff7367d8c2254073b6f563f2eb8efdc-b6898d80ac5be12c.elb.us-east-1.amazonaws.com") run_info = client.get_run(run_id=run_id) run_info_dict = { diff --git a/sameproject/ops/ocean/__init__.py b/sameproject/ops/ocean/__init__.py new file mode 100644 index 00000000..3cd4721c --- /dev/null +++ b/sameproject/ops/ocean/__init__.py @@ -0,0 +1,2 @@ +from .render import render +from .deploy import deploy \ No newline at end of file diff --git a/sameproject/ops/ocean/deploy.py b/sameproject/ops/ocean/deploy.py new file mode 100644 index 00000000..6683a1e7 --- /dev/null +++ b/sameproject/ops/ocean/deploy.py @@ -0,0 +1,25 @@ +from sameproject.data.config import SameConfig +from sameproject.ops import helpers +from pathlib import Path +import importlib +import kubernetes + +def create_job(logger, body, job): + try: + logger.debug(f"Creating job {job}") + batch_client = kubernetes.client.BatchV1Api() + obj = batch_client.create_namespaced_job(body["metadata"]["namespace"], job) + logger.info(f"{obj.kind} {obj.metadata.name} created") + except ApiException as e: + logger.debug(f"Exception when calling BatchV1Api->create_namespaced_job: {e}\n") + +def deploy(base_path: Path, root_file: str, config: SameConfig): + return + # with helpers.add_path(str(base_path)): + # root_module = importlib.import_module(root_file) # python module + + # client = boto3.client('ec2') + # return client.create_run_from_pipeline_func( + # root_module.root, + # arguments={}, + # ) diff --git a/sameproject/ops/ocean/options.py b/sameproject/ops/ocean/options.py new file mode 100644 index 00000000..caac07df --- /dev/null +++ b/sameproject/ops/ocean/options.py @@ -0,0 +1,89 @@ +from sameproject.ops.runtime_options import register_option + +register_option( + "algo_verified", + "Boolean specifying if published algorithm was accepted by the data publisher", + backend="ocean", +) + +register_option( + "algo_pushed", + "Boolean specifying if algorithm has already been pushed to GitHub.", + backend="ocean", +) + + +register_option( + "network", + "The network to use for publishing algorithm and getting the dataset.", + backend="ocean", +) + +register_option( + "provider_address", + "Address of compute provider", + backend="ocean", +) + +register_option( + "wallet_private_key", + "Private key of user wallet", + backend="ocean", +) + +register_option( + "dt_did", + "Datatoken DID", + backend="ocean", +) + +register_option( + "dt_pool", + "Pool address for datatoken", + backend="ocean", +) + +register_option( + "algo_tag", + "Tag to refer to algorithm by", + backend="ocean", +) + +register_option( + "algo_version", + "Version of algorithm", + backend="ocean", +) + +register_option( + "algo_url", + "URL where Algorithm is stored", + backend="ocean", +) + +register_option( + "algo_name", + "Name of algorithm", + backend="ocean", +) + +register_option( + "author", + "Author of algorithm", + backend="ocean", +) + +register_option( + "licence", + 
"Algorithm Licence", + backend="ocean", +) + +register_option( + "max_dt_price", + "Maximum price willing to spend on datatokens.", + backend="ocean", +) + + + diff --git a/sameproject/ops/ocean/render.py b/sameproject/ops/ocean/render.py new file mode 100644 index 00000000..c9db0ce1 --- /dev/null +++ b/sameproject/ops/ocean/render.py @@ -0,0 +1,64 @@ +from jinja2 import Environment, FileSystemLoader, select_autoescape +from pathlib import Path +from typing import Tuple +from uuid import uuid4 +from base64 import urlsafe_b64encode +import logging +import os +import time + +from sameproject.data.step import Step +from sameproject.ops import helpers +import sameproject.ops.explode + +ocean_step_template = "step.jinja" + + +def render(compile_path: str, steps: list, same_config: dict) -> Tuple[Path, str]: + """Renders the notebook into a root file and a series of step files according to the target requirements. Returns an absolute path to the root file for deployment.""" + + templateDir = os.path.dirname(os.path.abspath(__file__)) + templateLoader = FileSystemLoader(templateDir) + env = Environment(trim_blocks=True, loader=templateLoader) + + root_file_string = _build_step_file(env, next(iter(steps.values())), same_config) + root_pipeline_name = f"root_pipeline_{uuid4().hex.lower()}" + root_path = Path(compile_path) / f"{root_pipeline_name}.py" + helpers.write_file(root_path, root_file_string) + + # for storing in the docker image + docker_path = same_config['notebook']['path'][:-5] + 'py' + helpers.write_file(docker_path, root_file_string) + os.remove(same_config['notebook']['path']) + return (compile_path, root_file_string) # note: root_file_string replaced root_pipeline_name + +def _build_step_file(env: Environment, step: Step, same_config) -> str: + with open(sameproject.ops.explode.__file__, "r") as f: + explode_code = f.read() + + requirements_file = None + if "requirements_file" in step: + requirements_file = urlsafe_b64encode(bytes(step.requirements_file, "utf-8")).decode() + + memory_limit = same_config.runtime_options.get( + "serialisation_memory_limit", + 512 * 1024 * 1024, # 512MB + ) + + same_env = same_config.runtime_options.get( + "same_env", + "default", + ) + + step_contract = { + "name": step.name, + "same_env": same_env, + "memory_limit": memory_limit, + "unique_name": step.unique_name, + "requirements_file": requirements_file, + "user_code": step.code, + "explode_code": urlsafe_b64encode(bytes(explode_code, "utf-8")).decode(), + "same_yaml": urlsafe_b64encode(bytes(same_config.to_yaml(), "utf-8")).decode(), + } + + return env.get_template(ocean_step_template).render(step_contract) \ No newline at end of file diff --git a/sameproject/ops/ocean/root.jinja b/sameproject/ops/ocean/root.jinja new file mode 100644 index 00000000..e69de29b diff --git a/sameproject/ops/ocean/script.py b/sameproject/ops/ocean/script.py new file mode 100644 index 00000000..7bd54ba6 --- /dev/null +++ b/sameproject/ops/ocean/script.py @@ -0,0 +1,200 @@ +"""Boilerplate Ocean publishing and running c2d""" + +import os +import _init_paths +from ocean_lib.data_provider.data_service_provider import DataServiceProvider +from ocean_lib.common.agreements.service_types import ServiceTypes +from ocean_lib.web3_internal.constants import ZERO_ADDRESS +from ocean_lib.web3_internal.currency import to_wei +from ocean_lib.web3_internal.wallet import Wallet +from ocean_lib.assets import trusted_algorithms +from ocean_lib.services.service import Service +from ocean_lib.models.btoken import BToken #BToken is ERC20 +from 
ocean_lib.ocean.ocean import Ocean +from ocean_lib.config import Config + + +config = Config('config.ini') # Ocean requires a config file with network, metadata, block, and provider info +ocean = Ocean(config) +OCEAN_token = BToken(ocean.web3, ocean.OCEAN_address) +provider_url = DataServiceProvider.get_url(ocean.config) + + +""" +Algorithm publishing + +Requirements: + +- Model script on GitHub +- wallet private key as environment variable +- dataset we want to train on specified +- model metadata (name, date, compute, etc.) +""" + +wallet = Wallet(ocean.web3, os.getenv('TEST_PRIVATE_KEY1'), transaction_timeout=20, block_confirmations=config.block_confirmations) +print(f"wallet.address = '{wallet.address}'") +assert wallet.web3.eth.get_balance(wallet.address) > 0, "need ETH" + + +# Publish ALG datatoken +ALG_datatoken = ocean.create_data_token('ALG1', 'ALG1', wallet, blob=ocean.config.metadata_cache_uri) +ALG_datatoken.mint(wallet.address, to_wei(100), wallet) +print(f"ALG_datatoken.address = '{ALG_datatoken.address}'") + +# Specify metadata and service attributes, for "GPR" algorithm script. +# In same location as Branin test dataset. GPR = Gaussian Process Regression. +ALG_metadata = { + "main": { + "type": "algorithm", + "algorithm": { + "language": "python", + "format": "docker-image", + "version": "0.1", # project-specific + "container": { + "entrypoint": "python $ALGO", + "image": "oceanprotocol/algo_dockers", + "tag": "python-branin" # project-specific + } + }, + "files": [ + { + "url": "https://raw.githubusercontent.com/trentmc/branin/main/gpr.py", # project-specific + "index": 0, + "contentType": "text/text", + } + ], + "name": "gpr", "author": "Trent", "license": "CC0", # project-specific + "dateCreated": "2020-01-28T10:55:11Z" # project-specific + } +} +ALG_service_attributes = { + "main": { + "name": "ALG_dataAssetAccessServiceAgreement", + "creator": wallet.address, + "timeout": 3600 * 24, + "datePublished": "2020-01-28T10:55:11Z", + "cost": 1.0, # + } + } + +# Calc ALG service access descriptor. 
We use the same service provider as DATA +ALG_access_service = Service( + service_endpoint=provider_url, + service_type=ServiceTypes.CLOUD_COMPUTE, + attributes=ALG_service_attributes +) + +# Publish metadata and service info on-chain +ALG_ddo = ocean.assets.create( + metadata=ALG_metadata, # {"main" : {"type" : "algorithm", ..}, ..} + publisher_wallet=wallet, + services=[ALG_access_service], + data_token_address=ALG_datatoken.address) + +trusted_algorithms.add_publisher_trusted_algorithm('DATA_ddo', ALG_ddo.did, config.metadata_cache_uri) # project-specific +ocean.assets.update('DATA_ddo', publisher_wallet=wallet) # project-specific + +""" +Datatoken buying + +Requirements: +- wallet from previous step +- datatoken DID and pool address +""" + +did = 'SPECIFY' +pool_address = 'SPECIFY' + +wallet = Wallet(ocean.web3, private_key=private_key, transaction_timeout=20, block_confirmations=0) +assert wallet is not None, "Wallet error, initialize app again" +# Get asset, datatoken_address +asset = ocean.assets.resolve(did) +data_token_address = f'0x{did[7:]}' + +print('Executing Transaction') +#my wallet +print(f"Environment Wallet Address = '{wallet.address}'") +print(f"Wallet OCEAN = {pretty_ether_and_wei(OCEAN_token.balanceOf(wallet.address))}") +print(f"Wallet ETH = {pretty_ether_and_wei(ocean.web3.eth.get_balance(wallet.address))}") +#Verify that Bob has ETH +assert ocean.web3.eth.get_balance(wallet.address) > 0, "need test ETH" +#Verify that Bob has OCEAN +assert OCEAN_token.balanceOf(wallet.address) > 0, "need test OCEAN" +# print(f"I have {pretty_ether_and_wei(data_token.balanceOf(wallet.address), data_token.symbol())}.") +# assert data_token.balanceOf(wallet.address) >= to_wei(1), "Bob didn't get 1.0 datatokens" +#Bob points to the service object +fee_receiver = ZERO_ADDRESS # could also be market address +#Bob buys 1.0 datatokens - the amount needed to consume the dataset. 
+data_token = ocean.get_data_token(data_token_address) +print('Buying Data Token') +ocean.pool.buy_data_tokens( + pool_address, + amount=to_wei(1), # buy 1.0 datatoken + max_OCEAN_amount=to_wei(10), # pay up to 10.0 OCEAN + from_wallet=wallet +) +print(f"I have {pretty_ether_and_wei(data_token.balanceOf(wallet.address), data_token.symbol())}.") + + +""" +Running C2D +""" + +DATA_did = DATA_ddo.did # for convenience +ALG_did = ALG_ddo.did +DATA_DDO = ocean.assets.resolve(DATA_did) # make sure we operate on the updated and indexed metadata_cache_uri versions +ALG_DDO = ocean.assets.resolve(ALG_did) + +compute_service = DATA_DDO.get_service('compute') +algo_service = ALG_DDO.get_service('access') + +from ocean_lib.web3_internal.constants import ZERO_ADDRESS +from ocean_lib.models.compute_input import ComputeInput + +# order & pay for dataset +dataset_order_requirements = ocean.assets.order( + DATA_did, wallet.address, service_type=compute_service.type +) + +DATA_order_tx_id = ocean.assets.pay_for_service( + ocean.web3, + dataset_order_requirements.amount, + dataset_order_requirements.data_token_address, + DATA_did, + compute_service.index, + ZERO_ADDRESS, + wallet, + dataset_order_requirements.computeAddress, + ) + +# order & pay for algo +algo_order_requirements = ocean.assets.order( + ALG_did, wallet.address, service_type=algo_service.type +) +ALG_order_tx_id = ocean.assets.pay_for_service( + ocean.web3, + algo_order_requirements.amount, + algo_order_requirements.data_token_address, + ALG_did, + algo_service.index, + ZERO_ADDRESS, + wallet, + algo_order_requirements.computeAddress, +) + +compute_inputs = [ComputeInput(DATA_did, DATA_order_tx_id, compute_service.index)] +job_id = ocean.compute.start( + compute_inputs, + wallet, + algorithm_did=ALG_did, + algorithm_tx_id=ALG_order_tx_id, + algorithm_data_token=ALG_datatoken.address +) +print(f"Started compute job with id: {job_id}") + +# for monitoring C2D status +print(ocean.compute.status(DATA_did, job_id, wallet)) + +# retrieving result +result = ocean.compute.result_file(DATA_did, job_id, 0, wallet) # 0 index, means we retrieve the results from the first dataset index + diff --git a/sameproject/ops/ocean/step.jinja b/sameproject/ops/ocean/step.jinja new file mode 100644 index 00000000..851d636d --- /dev/null +++ b/sameproject/ops/ocean/step.jinja @@ -0,0 +1,12 @@ +{% autoescape off %} + +# User code for step, which we run in its own execution frame. 
+# Run the user's notebook code: +def root(): +{% filter indent(width=4) %} + {{ user_code }} +{% endfilter %} + +if __name__ == "__main__": + root() +{% endautoescape %} diff --git a/sameproject/vendor/conda b/sameproject/vendor/conda index d8ddda5e..0adcd595 160000 --- a/sameproject/vendor/conda +++ b/sameproject/vendor/conda @@ -1 +1 @@ -Subproject commit d8ddda5e1df107a7437884353e868678b6e7042b +Subproject commit 0adcd595c97d0c4e3b2645aebd50ded8d771d5eb diff --git a/test/conftest.py b/test/conftest.py index c548d3ce..5be3fd5c 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -23,6 +23,11 @@ "default": False, "help": "include pytorch, tensorflow and sklearn notebooks in backend tests", }], + ["ocean", { + "action": "store_true", + "default": False, + "help": "run ocean backend tests, requires ocean installation", + }], ] diff --git a/test/ops/ocean/test_ocean.py b/test/ops/ocean/test_ocean.py new file mode 100644 index 00000000..287e4fb1 --- /dev/null +++ b/test/ops/ocean/test_ocean.py @@ -0,0 +1,11 @@ +from sameproject.ops.backends import deploy +import test.testdata +import pytest +import yaml + + +@pytest.mark.ocean +@test.testdata.notebooks("oceandata") +def test_ocean_deploy(config, notebook, requirements, validation_fn): + deployment = deploy("ocean", "", "", config) + assert deployment == b'' \ No newline at end of file diff --git a/test/testdata/__init__.py b/test/testdata/__init__.py index e4d42885..f5162105 100644 --- a/test/testdata/__init__.py +++ b/test/testdata/__init__.py @@ -38,6 +38,8 @@ def notebooks(*args) -> Callable: entries.append(entry) if len(entries) == 0: + print(*args) + # print(_registry) raise Exception("Attempted to fetch non-existent testdata groups '{args}'.") return _get_decorator(entries) @@ -331,3 +333,11 @@ def _validate_features_datasets(res): "tensorflow", Path(__file__).parent / "tensorflow/variational_auto_encoder/same.yaml", ) + +# A selection of Ocean notebooks +_register_notebook( + "arithmetic", + "Returns a string.", + "oceandata", + Path(__file__).parent / "oceandata/arithmetic/same.yaml", +) \ No newline at end of file diff --git a/test/testdata/oceandata/arithmetic/arithmetic.ipynb b/test/testdata/oceandata/arithmetic/arithmetic.ipynb new file mode 100644 index 00000000..f5ccf5a8 --- /dev/null +++ b/test/testdata/oceandata/arithmetic/arithmetic.ipynb @@ -0,0 +1,21 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"Hello\")" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/test/testdata/oceandata/arithmetic/requirements.txt b/test/testdata/oceandata/arithmetic/requirements.txt new file mode 100644 index 00000000..e69de29b diff --git a/test/testdata/oceandata/arithmetic/same.yaml b/test/testdata/oceandata/arithmetic/same.yaml new file mode 100644 index 00000000..5e447682 --- /dev/null +++ b/test/testdata/oceandata/arithmetic/same.yaml @@ -0,0 +1,30 @@ +apiVersion: sameproject.ml/v1alpha1 +environments: + default: + image_tag: combinatorml/jupyterlab-tensorflow-opencv:0.9 +metadata: + labels: [] + name: default_config + version: 0.0.0 +notebook: + name: arithmetic + path: arithmetic.ipynb + requirements: requirements.txt +run: + name: default_config run +runtime_options: + algo_verified: False + algo_pushed: True + network: https://rinkeby.infura.io/v3/d163c48816434b0bbb3ac3925d6c6c80 + provider_address: '0x00bd138abd70e2f00903268f3db08f2d25677c9e' + 
wallet_private_key: '0xef4b441145c1d0f3b4bc6d61d29f5c6e502359481152f869247c7a4244d45209' + dt_did: 'did:op:d29293A09B8e5871b9028fc3CE232963050E9f69' + dt_pool: '0x35e256beA68eca220e0A71f14f2e1D5924bcf4a0' + algo_did: 'did:op:46Dcfe856CcBe6C9ebF40753360F18A914F04Ca7' + algo_tag: ari + algo_version: "1.0.0" + algo_name: arithmetic + author: Algovera + licence: MIT + max_dt_price: 20 + algo_url: "https://raw.githubusercontent.com/smejak/test/main/test.py" diff --git a/vendor/conda b/vendor/conda index 0b1312ce..0adcd595 160000 --- a/vendor/conda +++ b/vendor/conda @@ -1 +1 @@ -Subproject commit 0b1312ce65bf0fbb8cbea3750f07c32a2492c57a +Subproject commit 0adcd595c97d0c4e3b2645aebd50ded8d771d5eb