[RFC] Add Job CRUDL(Create, Retrieve, Update, Delete and List) REST APIs to Support Micro-service Architecture #839

@cliu123

Is your feature request related to a problem?

Today, the Job Scheduler (JS) plugin is an extensible plugin. It defines interfaces, and other plugins define their plugin-specific jobs by implementing those interfaces. This works well as-is when the JS plugin runs in the same process as the other plugins on the same node. However, the JS plugin cannot be run as a centralized micro-service.

What solution would you like?

This feature is a first step toward running the job scheduler plugin as a micro-service. Today, the job scheduler plugin defines a ScheduledJobParameter interface and allows extending plugins to define plugin-specific jobs by extending that interface. Essentially, extending plugins create jobs by performing index operations directly on the system indices. As a micro-service, the JS plugin needs to expose and handle REST APIs. The Job CRUDL APIs are a good starting point.

APIs High-level Design

The Job Scheduler CRUDL APIs are designed as an abstraction layer that supports multiple metadata store implementations.

Image
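
As a rough illustration of the abstraction layer described above, the sketch below shows one way a store-agnostic interface could be shaped, with an in-memory implementation standing in for a real metadata store. The names `JobStore`, `InMemoryJobStore`, and `list_jobs` are hypothetical and do not come from the plugin; a real implementation would presumably be written in Java against the plugin's own interfaces.

```python
import uuid
from abc import ABC, abstractmethod
from typing import Dict, List, Optional


class JobStore(ABC):
    """Hypothetical metadata-store interface behind the CRUDL APIs."""

    @abstractmethod
    def create(self, job: dict) -> dict: ...

    @abstractmethod
    def get(self, job_id: str) -> Optional[dict]: ...

    @abstractmethod
    def update(self, job_id: str, job: dict) -> Optional[dict]: ...

    @abstractmethod
    def delete(self, job_id: str) -> bool: ...

    @abstractmethod
    def list_jobs(self, tenant_id: Optional[str] = None) -> List[dict]: ...


class InMemoryJobStore(JobStore):
    """Stand-in store; an index- or database-backed store would implement the same methods."""

    def __init__(self) -> None:
        self._jobs: Dict[str, dict] = {}

    def create(self, job: dict) -> dict:
        job_id = uuid.uuid4().hex  # ID format is an assumption
        stored = {"id": job_id, **job}
        self._jobs[job_id] = stored
        return dict(stored)

    def get(self, job_id: str) -> Optional[dict]:
        job = self._jobs.get(job_id)
        return dict(job) if job is not None else None

    def update(self, job_id: str, job: dict) -> Optional[dict]:
        # Replaces the stored document; merge semantics are left to the API layer.
        if job_id not in self._jobs:
            return None
        self._jobs[job_id] = {**job, "id": job_id}
        return dict(self._jobs[job_id])

    def delete(self, job_id: str) -> bool:
        return self._jobs.pop(job_id, None) is not None

    def list_jobs(self, tenant_id: Optional[str] = None) -> List[dict]:
        return [
            dict(job)
            for job in self._jobs.values()
            if tenant_id is None or job.get("tenant_id") == tenant_id
        ]
```

With this shape, the REST handlers would depend only on `JobStore`, so swapping the backing store would not change the API contract.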

APIs Low-level Design

Base path: /_plugins/_job_scheduler/api

Create Job (Store-Agnostic)

Endpoint:
POST /_plugins/_job_scheduler/api/jobs

Note: job_type corresponds to the plugin-specified job type (e.g., "ml_batch_task_update", "scheduler_sample_extension")

Request:

POST /_plugins/_job_scheduler/api/jobs HTTP/1.1
Content-Type: application/json

{
  "type": "ml_batch_task_update",
  "name": "daily_inference_task_polling",
  "tenant_id": "arn:aws:opensearch:us-west-2:123456789012:domain/test-domain",
  "enabled": true,
  "schedule": {
    "interval": {
      "start_time": "2024-01-01T01:00:00Z",
      "period": 5,
      "unit": "Minutes"
    }
  },
  "jitter": 0.1,
  "lock_duration_seconds": 1800,
  // Plugin-specific parameters
  "parameters": {
    "custom_field1": "value1",
    "custom_field2": "value2"
  }
}
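
For readers who prefer a programmatic client, here is a minimal sketch of how the create request above might be built with Python's standard library. The host `localhost:9200` is taken from the curl examples elsewhere in this RFC, and the helper name `build_create_job_request` is hypothetical. Since the API is only proposed, the request is constructed but not sent.

```python
import json
import urllib.request

# Base path from this RFC; the host mirrors the curl examples and is an assumption.
BASE_URL = "http://localhost:9200/_plugins/_job_scheduler/api"


def build_create_job_request(job: dict) -> urllib.request.Request:
    """Build (but do not send) a POST request for the proposed Create Job API."""
    body = json.dumps(job).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/jobs",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


job = {
    "type": "ml_batch_task_update",
    "name": "daily_inference_task_polling",
    "tenant_id": "arn:aws:opensearch:us-west-2:123456789012:domain/test-domain",
    "enabled": True,
    "schedule": {
        "interval": {
            "start_time": "2024-01-01T01:00:00Z",
            "period": 5,
            "unit": "Minutes",
        }
    },
    "jitter": 0.1,
    "lock_duration_seconds": 1800,
    # Plugin-specific parameters
    "parameters": {"custom_field1": "value1", "custom_field2": "value2"},
}

request = build_create_job_request(job)

# Sending is left commented out because the endpoint does not exist yet:
# with urllib.request.urlopen(request) as response:
#     created = json.load(response)
```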

Example response:

HTTP/1.1 200
Content-type: application/json

{
  "id": "wojfowjfowejfo",
  "type": "ml_batch_task_update",
  "name": "daily_inference_task_polling",
  "tenant_id": "arn:aws:opensearch:us-west-2:123456789012:domain/test-domain",
  "enabled": true,
  "schedule": {
    "interval": {
      "start_time": "2024-01-01T01:00:00Z",
      "period": 5,
      "unit": "Minutes"
    }
  },
  "jitter": 0.1,
  "last_update_time": "2024-01-01T00:00:00Z",
  "lock_duration_seconds": 1800,
  // Plugin-specific parameters
  "parameters": {
    "custom_field1": "value1",
    "custom_field2": "value2"
  }
}
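
To make the `schedule.interval` and `jitter` fields more concrete, the sketch below computes the next run time for an interval schedule and then adds a random delay. It assumes that `start_time` anchors the interval grid and that `jitter` is a fraction of the period used as an upper bound on the delay; this RFC does not define either behavior, so treat both as assumptions. The unit names other than "Minutes" and all function names are hypothetical.

```python
import random
from datetime import datetime, timedelta, timezone

# Only "Minutes" appears in this RFC; the other unit names are assumptions.
UNIT_SECONDS = {"Minutes": 60, "Hours": 3600, "Days": 86400}


def interval_period(schedule: dict) -> timedelta:
    """Length of one interval, e.g. 5 Minutes -> timedelta(minutes=5)."""
    interval = schedule["interval"]
    return timedelta(seconds=interval["period"] * UNIT_SECONDS[interval["unit"]])


def next_interval_run(schedule: dict, now: datetime) -> datetime:
    """First grid point (start_time + k * period, k >= 0) strictly after `now`."""
    raw_start = schedule["interval"]["start_time"]
    start = datetime.fromisoformat(raw_start.replace("Z", "+00:00"))
    period = interval_period(schedule)
    elapsed_periods = (now - start) // period  # floor division yields an int
    return start + max(elapsed_periods + 1, 0) * period


def apply_jitter(run_at: datetime, period: timedelta, jitter: float,
                 rng: random.Random) -> datetime:
    """Delay the run by a random amount in [0, jitter * period)."""
    return run_at + period * jitter * rng.random()


schedule = {
    "interval": {"start_time": "2024-01-01T01:00:00Z", "period": 5, "unit": "Minutes"}
}
now = datetime(2024, 1, 1, 1, 7, tzinfo=timezone.utc)
run_at = next_interval_run(schedule, now)  # 01:10:00 UTC on the 5-minute grid
jittered = apply_jitter(run_at, interval_period(schedule), 0.1, random.Random(0))
```

Under these assumptions, a jitter of 0.1 on a 5-minute period delays each run by at most 30 seconds, which spreads out jobs that would otherwise fire at the same instant.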

Get Job (Store-Agnostic)

Endpoint:
GET /_plugins/_job_scheduler/api/jobs/{id}

Request:

GET /_plugins/_job_scheduler/api/jobs/{id} HTTP/1.1

Example response:

HTTP/1.1 200
Content-type: application/json

{
  "id": "wojfowjfowejfo",
  "tenant_id": "arn:aws:opensearch:us-west-2:123456789012:collection/test-collection",
  "job_type": "ml_batch_task_update",
  "name": "daily-batch-inference",
  "enabled": true,
  "schedule": {
    "cron": {
      "expression": "0 0 2 * * ?",
      "timezone": "America/Los_Angeles"
    }
  },
  "enabled_time": "2024-01-01T10:00:00Z",
  "last_update_time": "2024-01-01T10:00:00Z",
  "lock_duration_seconds": 1800,
  "jitter": 0.05,
  "parameters": {
    "model_id": "text-embedding-model-v2",
    "batch_size": 500,
    "timeout": "30m",
    "input_data_source": "s3://xyz_bucket/input/",
    "output_data_source": "s3://xyz_bucket/output/"
  }
}

Update Job (Store-Agnostic)

Endpoint:

PUT /_plugins/_job_scheduler/api/jobs/{id}
Content-Type: application/json
{
  "enabled": false/true,
  "schedule": { ... },
  "parameters": { ... }
}

Example request and response:

curl -X PUT "localhost:9200/_plugins/_job_scheduler/api/jobs/wojfowjfowejfo" \
  -H "Content-Type: application/json" \
  -d '{
    "enabled": false,
    "parameters": {
      "batch_size": 1000,
      "timeout": "45m"
    }
  }'

# Response:
HTTP/1.1 200
Content-type: application/json

{
  "id": "wojfowjfowejfo",
  "tenant_id": "arn:aws:opensearch:us-west-2:123456789012:collection/test-collection",
  "job_type": "ml_batch_task_update",
  "name": "daily-batch-inference",
  "enabled": false,
  "schedule": {
    "cron": {
      "expression": "0 0 2 * * ?",
      "timezone": "America/Los_Angeles"
    }
  },
  "enabled_time": "2024-01-01T10:00:00Z",
  "last_update_time": "2024-01-01T10:00:00Z",
  "lock_duration_seconds": 1800,
  "jitter": 0.05,
  "parameters": {
    "model_id": "text-embedding-model-v2",
    "batch_size": 1000,
    "timeout": "45m",
    ...
  }
}
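
The example above suggests partial-update semantics: the request sends only `batch_size` and `timeout`, yet `model_id` survives in the response. A minimal sketch of that behavior follows. It assumes that `parameters` is merged key by key while `schedule` is replaced as a whole; the RFC does not state either rule, and the function name `apply_job_update` is hypothetical.

```python
from copy import deepcopy

# Fields listed in the Update Job request template of this RFC.
UPDATABLE_FIELDS = ("enabled", "schedule", "parameters")


def apply_job_update(stored: dict, changes: dict) -> dict:
    """Return a new job document with `changes` applied to `stored`."""
    unknown = set(changes) - set(UPDATABLE_FIELDS)
    if unknown:
        raise ValueError(f"Fields cannot be updated: {sorted(unknown)}")

    updated = deepcopy(stored)
    if "enabled" in changes:
        updated["enabled"] = changes["enabled"]
    if "schedule" in changes:
        # Assumption: replaced wholesale, since interval and cron do not merge.
        updated["schedule"] = deepcopy(changes["schedule"])
    if "parameters" in changes:
        # Assumption: shallow merge, so unspecified parameters are preserved.
        updated.setdefault("parameters", {}).update(changes["parameters"])
    return updated


stored_job = {
    "id": "wojfowjfowejfo",
    "enabled": True,
    "schedule": {"cron": {"expression": "0 0 2 * * ?", "timezone": "America/Los_Angeles"}},
    "parameters": {"model_id": "text-embedding-model-v2", "batch_size": 500, "timeout": "30m"},
}
result = apply_job_update(
    stored_job,
    {"enabled": False, "parameters": {"batch_size": 1000, "timeout": "45m"}},
)
```

If full replacement of `parameters` is preferred instead, that choice should be stated explicitly, because the two behaviors differ in whether a client can remove a parameter.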

Delete Job (Store-Agnostic)

Endpoint:
DELETE /_plugins/_job_scheduler/api/jobs/{id}

Example response:

HTTP/1.1 200
Content-type: application/json

{
  "acknowledged": true,
  "id": "wojfowjfowejfo",
  "message": "Job deleted successfully"
}

List Jobs (Store-Agnostic)

Lists jobs globally or per job type, with filtering and pagination.

Endpoint:

GET /_plugins/_job_scheduler/api/jobs

Example request:

curl -X GET "localhost:9200/_plugins/_job_scheduler/api/jobs?status=enabled&tenant_id=arn:aws:opensearch:us-west-2:123456789012:collection/test-collection&next_token=eyJ0eXBlIjoiZHluYW1vZGIiLCJsYXN0X2V2YWx1YXRlZF9rZXkiOnsiam9iX2lkIjp7InMiOiJqb2ItMTIzIn19fQ=="

Example response (includes store-agnostic pagination):

HTTP/1.1 200
Content-type: application/json

{
  "tenant_id": "arn:aws:opensearch:us-west-2:123456789012:collection/test-collection",
  "total_jobs": 1250,
  "jobs": [...],
  "next_token": null
}
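
One way to keep pagination store-agnostic is to treat `next_token` as an opaque, base64-encoded JSON blob that only the server interprets. The sample token in the request above appears to follow this pattern, decoding to a store type plus a DynamoDB-style last-evaluated key. The sketch below applies the same idea to an in-memory list with an offset cursor; the `page_size` parameter, the `"memory"` store type, and the function names are hypothetical.

```python
import base64
import json
from typing import List, Optional


def encode_token(state: dict) -> str:
    """Serialize store-specific cursor state into an opaque token."""
    raw = json.dumps(state, separators=(",", ":")).encode("utf-8")
    return base64.urlsafe_b64encode(raw).decode("ascii")


def decode_token(token: str) -> dict:
    """Recover the cursor state from a token produced by encode_token."""
    return json.loads(base64.urlsafe_b64decode(token.encode("ascii")))


def list_jobs(jobs: List[dict], page_size: int, next_token: Optional[str] = None,
              status: Optional[str] = None, tenant_id: Optional[str] = None) -> dict:
    """Filter jobs, then return one page plus a token for the following page."""
    matching = [
        job for job in jobs
        if (tenant_id is None or job.get("tenant_id") == tenant_id)
        and (status is None or job.get("enabled") == (status == "enabled"))
    ]
    offset = decode_token(next_token)["offset"] if next_token else 0
    page = matching[offset:offset + page_size]
    end = offset + len(page)
    more = end < len(matching)
    return {
        "total_jobs": len(matching),
        "jobs": page,
        # null (None) signals the last page, as in the example response.
        "next_token": encode_token({"type": "memory", "offset": end}) if more else None,
    }


all_jobs = [{"id": f"job-{i}", "enabled": i % 2 == 0, "tenant_id": "t1"} for i in range(5)]
first = list_jobs(all_jobs, page_size=2, status="enabled")
second = list_jobs(all_jobs, page_size=2, status="enabled", next_token=first["next_token"])
```

Because clients only echo the token back, a different store can change what the token contains without changing the API.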
