|
| 1 | +"""Executes a job locally for testing purposes. |
| 2 | +
|
| 3 | +To run this script use the following command: |
| 4 | +
|
| 5 | +``` |
| 6 | +invoke nbshell \ |
| 7 | + --plain \ |
| 8 | + --file development/run_example_job.py \ |
| 9 | + --env RUN_SSOT_TARGET_JOB=False \ |
| 10 | + --env RUN_SSOT_JOB_DRY_RUN=True |
| 11 | +``` |
| 12 | +
|
| 13 | +Passing environment variables to the script is optional. The script will default to running the data source job with a dry run enabled. |
| 14 | +""" |
| 15 | + |
| 16 | +import json |
| 17 | +import os |
| 18 | + |
| 19 | +from django.core.management import call_command |
| 20 | +from nautobot.core.settings_funcs import is_truthy |
| 21 | +from nautobot.extras.choices import SecretsGroupAccessTypeChoices |
| 22 | +from nautobot.extras.choices import SecretsGroupSecretTypeChoices |
| 23 | +from nautobot.extras.models import ExternalIntegration |
| 24 | +from nautobot.extras.models import Job |
| 25 | +from nautobot.extras.models import Secret |
| 26 | +from nautobot.extras.models import SecretsGroup |
| 27 | +from nautobot.extras.models import SecretsGroupAssociation |
| 28 | + |
| 29 | +_TOKEN = 40 * "a" |
| 30 | +os.environ["NAUTOBOT_DEMO_TOKEN"] = _TOKEN |
| 31 | + |
| 32 | +_NAUTOBOT_DEMO_URL = "https://demo.nautobot.com" |
| 33 | +_DRY_RUN = is_truthy(os.getenv("RUN_SSOT_JOB_DRY_RUN", "True")) |
| 34 | + |
| 35 | +module_name = "nautobot_ssot.jobs.examples" |
| 36 | +is_target_job = is_truthy(os.getenv("RUN_SSOT_TARGET_JOB", "False")) |
| 37 | +job_class_name = "ExampleDataTarget" if is_target_job else "ExampleDataSource" |
| 38 | + |
| 39 | +job = Job.objects.get(module_name=module_name, job_class_name=job_class_name) |
| 40 | +if not job.enabled: |
| 41 | + job.enabled = True |
| 42 | + job.validated_save() |
| 43 | + |
| 44 | +nautobot_demo, created = ExternalIntegration.objects.get_or_create( |
| 45 | + name="Nautobot Demo", |
| 46 | + defaults={ |
| 47 | + "remote_url": _NAUTOBOT_DEMO_URL, |
| 48 | + "verify_ssl": False, |
| 49 | + }, |
| 50 | +) |
| 51 | + |
| 52 | +if created: |
| 53 | + secret = Secret.objects.create( |
| 54 | + name="nautobot-demo-token", |
| 55 | + provider="environment-variable", |
| 56 | + parameters={"variable": "NAUTOBOT_DEMO_TOKEN"}, |
| 57 | + ) |
| 58 | + secrets_group = SecretsGroup.objects.create(name="Nautobot Demo Group") |
| 59 | + SecretsGroupAssociation.objects.create( |
| 60 | + secret=secret, |
| 61 | + secrets_group=secrets_group, |
| 62 | + access_type=SecretsGroupAccessTypeChoices.TYPE_HTTP, |
| 63 | + secret_type=SecretsGroupSecretTypeChoices.TYPE_TOKEN, |
| 64 | + ) |
| 65 | + nautobot_demo.secrets_group = secrets_group |
| 66 | + nautobot_demo.validated_save() |
| 67 | + |
| 68 | +data: dict = { |
| 69 | + "debug": True, |
| 70 | + "dryrun": _DRY_RUN, |
| 71 | + "memory_profiling": False, |
| 72 | +} |
| 73 | + |
| 74 | +if is_target_job: |
| 75 | + data["target"] = str(nautobot_demo.pk) |
| 76 | + data["target_url"] = _NAUTOBOT_DEMO_URL |
| 77 | + data["target_token"] = _TOKEN |
| 78 | +else: |
| 79 | + data["source"] = str(nautobot_demo.pk) |
| 80 | + data["source_url"] = _NAUTOBOT_DEMO_URL |
| 81 | + data["source_token"] = _TOKEN |
| 82 | + |
| 83 | +call_command( |
| 84 | + "runjob", |
| 85 | + f"{module_name}.{job_class_name}", |
| 86 | + data=json.dumps(data), |
| 87 | + username="admin", |
| 88 | + local=True, # Enable to run the job locally (not as a celery task) |
| 89 | +) |
0 commit comments