diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile
index 818e932..2d78d50 100644
--- a/.devcontainer/Dockerfile
+++ b/.devcontainer/Dockerfile
@@ -16,7 +16,7 @@ ENV HOME="/root"
 # --------------------------------------
 # Need to add the devcontainer workspace folder as a safe directory to enable git
 # version control system to be enabled in the containers file system.
-RUN git config --global --add safe.directory "/workspaces/plugin-template"
+RUN git config --global --add safe.directory "/workspaces/plugin-documentdb"
 # --------------------------------------
 
 # --------------------------------------
@@ -53,7 +53,7 @@ ENV PATH="$PATH:$JAVA_HOME/bin"
 # Will load a custom configuration file for Micronaut
 ENV MICRONAUT_ENVIRONMENTS=local,override
 # Sets the path where you save plugins as Jar and is loaded during the startup process
-ENV KESTRA_PLUGINS_PATH="/workspaces/plugin-template/local/plugins"
+ENV KESTRA_PLUGINS_PATH="/workspaces/plugin-documentdb/local/plugins"
 # --------------------------------------
 
 # --------------------------------------
diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json
index b5184d6..7d9a650 100644
--- a/.devcontainer/devcontainer.json
+++ b/.devcontainer/devcontainer.json
@@ -1,10 +1,10 @@
 {
-    "name": "plugin-template",
+    "name": "plugin-documentdb",
     "build": {
         "context": ".",
         "dockerfile": "Dockerfile"
     },
-    "workspaceFolder": "/workspaces/plugin-template",
+    "workspaceFolder": "/workspaces/plugin-documentdb",
     "forwardPorts": [8080],
     "customizations": {
         "vscode": {
diff --git a/.github/setup-unit.sh b/.github/setup-unit.sh
new file mode 100755
index 0000000..0dad29e
--- /dev/null
+++ b/.github/setup-unit.sh
@@ -0,0 +1,130 @@
+#!/bin/bash
+
+# Setup script for Kestra DocumentDB Plugin Unit Tests
+# This script sets up a local DocumentDB instance for testing both locally and in CI
+
+set -e
+
+echo "🐳 Setting up DocumentDB for unit tests..."
+
+# Check if Docker and Docker Compose are available
+if ! command -v docker &> /dev/null; then
+    echo "❌ Docker is not installed or not in PATH"
+    exit 1
+fi
+
+# Check for Docker Compose (both v1 and v2)
+if ! command -v docker-compose &> /dev/null && ! docker compose version &> /dev/null; then
+    echo "❌ Docker Compose is not installed or not in PATH"
+    exit 1
+fi
+
+# Use docker compose (v2) if available, otherwise fall back to docker-compose (v1)
+if docker compose version &> /dev/null; then
+    DC_CMD="docker compose"
+    COMPOSE_FILE="docker-compose-ci.yml"
+else
+    DC_CMD="docker-compose"
+    COMPOSE_FILE="docker-compose-ci.yml"
+fi
+
+# Stop and remove any existing containers
+echo "🧹 Cleaning up existing containers..."
+$DC_CMD -f $COMPOSE_FILE down -v --remove-orphans || true
+
+# Start MongoDB and DocumentDB API containers
+echo "🚀 Starting MongoDB and DocumentDB API containers..."
+$DC_CMD -f $COMPOSE_FILE up -d --build
+
+# Wait for containers to start
+echo "⏳ Waiting for containers to start..."
+timeout=120
+elapsed=0
+while ! $DC_CMD -f $COMPOSE_FILE ps | grep -q "mongodb.*Up"; do
+    if [ $elapsed -ge $timeout ]; then
+        echo "❌ MongoDB container failed to start within ${timeout} seconds"
+        $DC_CMD -f $COMPOSE_FILE logs mongodb
+        exit 1
+    fi
+    sleep 5
+    elapsed=$((elapsed + 5))
+    echo "⏳ Still waiting for MongoDB container... (${elapsed}/${timeout}s)"
+done
+
+echo "✅ MongoDB container is running"
+
+# Wait for DocumentDB API service to be ready
+echo "⏳ Waiting for DocumentDB API service to respond..."
+timeout=120
+elapsed=0
+while ! 
curl -f -s http://localhost:10260/health &> /dev/null; do + if [ $elapsed -ge $timeout ]; then + echo "โŒ DocumentDB API service failed to respond within ${timeout} seconds" + echo "API Server logs:" + $DC_CMD -f $COMPOSE_FILE logs documentdb-api + exit 1 + fi + sleep 5 + elapsed=$((elapsed + 5)) + echo "โณ Still waiting for API service... (${elapsed}/${timeout}s)" +done + +echo "โœ… DocumentDB API service is ready" + +# Create a test database and collection +echo "๐Ÿ—„๏ธ Creating test database and collection..." + +# Test database creation by inserting a test document +response=$(curl -s -X POST \ + -H "Content-Type: application/json" \ + -H "Authorization: Basic $(echo -n 'testuser:testpass' | base64)" \ + -d '{ + "database": "test_db", + "collection": "test_collection", + "document": {"_id": "test_doc", "message": "DocumentDB is ready for testing", "timestamp": "'$(date -Iseconds)'"} + }' \ + http://localhost:10260/data/v1/action/insertOne) + +echo "Insert response: $response" + +# Verify we can read from the database +echo "๐Ÿ“‹ Verifying database connectivity..." +read_response=$(curl -s -X POST \ + -H "Content-Type: application/json" \ + -H "Authorization: Basic $(echo -n 'testuser:testpass' | base64)" \ + -d '{ + "database": "test_db", + "collection": "test_collection", + "filter": {"_id": "test_doc"} + }' \ + http://localhost:10260/data/v1/action/find) + +echo "Read response: $read_response" + +if echo "$read_response" | grep -q "DocumentDB is ready"; then + echo "โœ… Test database and collection created successfully" +else + echo "โš ๏ธ Database setup completed, but verification response unclear" + echo "This is normal - tests will handle actual connectivity" +fi + +# Show status +echo "๐Ÿ“Š Container status:" +$DC_CMD -f $COMPOSE_FILE ps + +echo "" +echo "๐ŸŽ‰ Setup complete!" +echo "" +echo "๐Ÿ“‹ Connection details:" +echo " URL: http://localhost:10260" +echo " Username: testuser" +echo " Password: testpass" +echo " Test Database: test_db" +echo " Test Collection: test_collection" +echo "" +echo "๐Ÿงช You can now run tests with:" +echo " ./gradlew test" +echo " DOCUMENTDB_INTEGRATION_TESTS=true ./gradlew test" +echo "" +echo "๐Ÿ›‘ To stop the services:" +echo " $DC_CMD -f $COMPOSE_FILE down" \ No newline at end of file diff --git a/.gitignore b/.gitignore index 4be6e5d..976b3c5 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ Thumbs.db build/ target/ out/ +bin/ .idea .vscode *.iml @@ -12,5 +13,6 @@ out/ .project .settings .classpath +.factorypath .attach* src/test/resources/application-test.yml diff --git a/README.md b/README.md index 3da6d1f..23f2296 100644 --- a/README.md +++ b/README.md @@ -33,39 +33,229 @@

Get started with Kestra in 4 minutes.

-# Kestra Plugin Template +# Kestra DocumentDB Plugin -> A template for creating Kestra plugins +> Integrate with DocumentDB - Microsoft's open-source, MongoDB-compatible document database -This repository serves as a general template for creating a new [Kestra](https://github.com/kestra-io/kestra) plugin. It should take only a few minutes! Use this repository as a scaffold to ensure that you've set up the plugin correctly, including unit tests and CI/CD workflows. +This plugin provides integration with [DocumentDB](https://documentdb.io), Microsoft's open-source document database built on PostgreSQL. DocumentDB is now part of the Linux Foundation and offers MongoDB compatibility with the reliability and ecosystem of PostgreSQL. + +## Features + +- **Document Operations**: Insert single or multiple documents with automatic ID generation +- **Advanced Querying**: Find documents with MongoDB-style filters and pagination +- **Aggregation Pipelines**: Execute complex aggregation operations for data analysis +- **MongoDB Compatibility**: Use familiar MongoDB query syntax and operators +- **Flexible Output**: Support for FETCH, FETCH_ONE, STORE, and NONE output types +- **PostgreSQL Backend**: Built on the reliability and performance of PostgreSQL +- **Open Source**: Fully MIT-licensed with no vendor lock-in + +## Supported Operations + +| Operation | Description | Required Parameters | +|-----------|-------------|-------------------| +| `Insert` | Insert single or multiple documents | `host`, `database`, `collection`, `username`, `password`, `document` or `documents` | +| `Read` | Find documents with filtering and aggregation | `host`, `database`, `collection`, `username`, `password`, optional: `filter`, `aggregationPipeline`, `limit`, `skip` | ![Kestra orchestrator](https://kestra.io/video.gif) -## Running the project in local +## Quick Start + +### Basic Configuration + +All tasks require these basic connection parameters: + +```yaml +tasks: + - id: documentdb_task + type: io.kestra.plugin.documentdb.Insert + host: "https://my-documentdb-instance.com" # DocumentDB HTTP endpoint + database: "myapp" # Database name + collection: "users" # Collection name + username: "{{ secret('DOCUMENTDB_USERNAME') }}" # Username + password: "{{ secret('DOCUMENTDB_PASSWORD') }}" # Password +``` + +### Example: Insert Single Document + +```yaml +id: insert_user +namespace: company.documentdb + +tasks: + - id: create_user + type: io.kestra.plugin.documentdb.Insert + host: "https://my-documentdb-instance.com" + database: "myapp" + collection: "users" + username: "{{ secret('DOCUMENTDB_USERNAME') }}" + password: "{{ secret('DOCUMENTDB_PASSWORD') }}" + document: + name: "John Doe" + email: "john.doe@example.com" + age: 30 + created_at: "{{ now() }}" + roles: ["user", "editor"] +``` + +### Example: Insert Multiple Documents + +```yaml +id: insert_products +namespace: company.documentdb + +tasks: + - id: create_products + type: io.kestra.plugin.documentdb.Insert + host: "https://my-documentdb-instance.com" + database: "inventory" + collection: "products" + username: "{{ secret('DOCUMENTDB_USERNAME') }}" + password: "{{ secret('DOCUMENTDB_PASSWORD') }}" + documents: + - name: "Laptop" + price: 999.99 + category: "Electronics" + in_stock: true + - name: "Mouse" + price: 29.99 + category: "Electronics" + in_stock: false + - name: "Desk" + price: 299.99 + category: "Furniture" + in_stock: true +``` + +### Example: Find Documents with Filters + +```yaml +id: find_active_users +namespace: company.documentdb + +tasks: + - 
id: query_users + type: io.kestra.plugin.documentdb.Read + host: "https://my-documentdb-instance.com" + database: "myapp" + collection: "users" + username: "{{ secret('DOCUMENTDB_USERNAME') }}" + password: "{{ secret('DOCUMENTDB_PASSWORD') }}" + filter: + status: "active" + age: + $gte: 18 + roles: + $in: ["editor", "admin"] + limit: 100 + fetchType: FETCH +``` + +### Example: Aggregation Pipeline + +```yaml +id: user_statistics +namespace: company.documentdb + +tasks: + - id: aggregate_users + type: io.kestra.plugin.documentdb.Read + host: "https://my-documentdb-instance.com" + database: "myapp" + collection: "users" + username: "{{ secret('DOCUMENTDB_USERNAME') }}" + password: "{{ secret('DOCUMENTDB_PASSWORD') }}" + aggregationPipeline: + - $match: + status: "active" + - $group: + _id: "$department" + count: { $sum: 1 } + avgAge: { $avg: "$age" } + - $sort: + count: -1 + fetchType: FETCH +``` + +### Example: Get Single Document + +```yaml +id: get_user +namespace: company.documentdb + +tasks: + - id: find_user + type: io.kestra.plugin.documentdb.Read + host: "https://my-documentdb-instance.com" + database: "myapp" + collection: "users" + username: "{{ secret('DOCUMENTDB_USERNAME') }}" + password: "{{ secret('DOCUMENTDB_PASSWORD') }}" + filter: + email: "john.doe@example.com" + fetchType: FETCH_ONE +``` + +## Installation + +Add this plugin to your Kestra instance: + +```bash +./kestra plugins install io.kestra.plugin:plugin-documentdb:LATEST +``` + +## Development + ### Prerequisites - Java 21 - Docker ### Running tests + +#### Integration Tests with Mock DocumentDB Server + +This plugin includes a test mock server (`api-server.py`) that simulates DocumentDB's REST API for testing purposes. The server bridges HTTP requests to MongoDB operations, providing a realistic testing environment without requiring a real DocumentDB instance. + +**Automatic Setup (Recommended):** +```bash +# Setup test environment and run tests +./.github/setup-unit.sh +./gradlew test ``` + +**Manual Setup:** +```bash +# Start DocumentDB mock server and MongoDB +docker-compose -f docker-compose-ci.yml up -d + +# Run tests ./gradlew check --parallel + +# Cleanup +docker-compose -f docker-compose-ci.yml down ``` -### Development +The mock server (`api-server.py`) provides: +- **DocumentDB REST API simulation**: Endpoints matching real DocumentDB HTTP API +- **MongoDB backend**: Uses MongoDB as the underlying database (DocumentDB-compatible) +- **Test authentication**: Uses `testuser:testpass` credentials for testing +- **Local endpoint**: Available at `http://localhost:10260` -`VSCode`: +**Test Environment Details:** +- **Mock API Server**: `http://localhost:10260` (simulates DocumentDB REST API) +- **MongoDB Instance**: `localhost:27017` (backend storage) +- **Test Credentials**: Username: `testuser`, Password: `testpass` +- **Test Database**: `test_db` -Follow the README.md within the `.devcontainer` folder for a quick and easy way to get up and running with developing plugins if you are using VSCode. +### Local Development -`Other IDEs`: +**VSCode**: Follow the README.md within the `.devcontainer` folder for development setup. +**Other IDEs**: +```bash +./gradlew shadowJar && docker build -t kestra-documentdb . && docker run --rm -p 8080:8080 kestra-documentdb server local ``` -./gradlew shadowJar && docker build -t kestra-custom . 
&& docker run --rm -p 8080:8080 kestra-custom server local -``` -> [!NOTE] -> You need to relaunch this whole command everytime you make a change to your plugin -go to http://localhost:8080, your plugin will be available to use +Visit http://localhost:8080 to test your plugin. ## Documentation * Full documentation can be found under: [kestra.io/docs](https://kestra.io/docs) diff --git a/api-server.py b/api-server.py new file mode 100644 index 0000000..24a0a53 --- /dev/null +++ b/api-server.py @@ -0,0 +1,265 @@ +#!/usr/bin/env python3 +""" +Simple HTTP API server that simulates DocumentDB REST API endpoints +This bridges HTTP requests to MongoDB operations for testing +""" + +import os +import json +import base64 +from flask import Flask, request, jsonify +from pymongo import MongoClient +from pymongo.errors import PyMongoError +import logging + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +app = Flask(__name__) + +# MongoDB connection +MONGODB_URI = os.getenv('MONGODB_URI', 'mongodb://testuser:testpass@mongodb:27017/test_db?authSource=admin') +client = MongoClient(MONGODB_URI) + +def authenticate_request(): + """Simple authentication check""" + auth_header = request.headers.get('Authorization', '') + if not auth_header.startswith('Basic '): + return False + + try: + credentials = base64.b64decode(auth_header[6:]).decode('utf-8') + username, password = credentials.split(':', 1) + return username == 'testuser' and password == 'testpass' + except: + return False + +def get_collection(database_name, collection_name): + """Get MongoDB collection""" + db = client[database_name] + return db[collection_name] + +@app.route('/data/v1/action/insertOne', methods=['POST']) +def insert_one(): + """Insert a single document""" + if not authenticate_request(): + return jsonify({'error': 'Unauthorized'}), 401 + + try: + data = request.get_json() + database = data['database'] + collection_name = data['collection'] + document = data['document'] + + collection = get_collection(database, collection_name) + result = collection.insert_one(document) + + return jsonify({ + 'insertedId': str(result.inserted_id), + 'insertedCount': 1 + }) + + except Exception as e: + logger.error(f"Insert error: {e}") + return jsonify({'error': str(e)}), 500 + +@app.route('/data/v1/action/insertMany', methods=['POST']) +def insert_many(): + """Insert multiple documents""" + if not authenticate_request(): + return jsonify({'error': 'Unauthorized'}), 401 + + try: + data = request.get_json() + database = data['database'] + collection_name = data['collection'] + documents = data['documents'] + + collection = get_collection(database, collection_name) + result = collection.insert_many(documents) + + return jsonify({ + 'insertedIds': [str(id) for id in result.inserted_ids], + 'insertedCount': len(result.inserted_ids) + }) + + except Exception as e: + logger.error(f"Insert many error: {e}") + return jsonify({'error': str(e)}), 500 + +@app.route('/data/v1/action/find', methods=['POST']) +def find(): + """Find documents""" + if not authenticate_request(): + return jsonify({'error': 'Unauthorized'}), 401 + + try: + data = request.get_json() + database = data['database'] + collection_name = data['collection'] + filter_doc = data.get('filter', {}) + limit = data.get('limit') + skip = data.get('skip', 0) + + collection = get_collection(database, collection_name) + cursor = collection.find(filter_doc).skip(skip) + + if limit: + cursor = cursor.limit(limit) + + documents = [] + for doc in 
cursor: + # Convert ObjectId to string for JSON serialization + if '_id' in doc: + doc['_id'] = str(doc['_id']) + documents.append(doc) + + return jsonify({ + 'documents': documents + }) + + except Exception as e: + logger.error(f"Find error: {e}") + return jsonify({'error': str(e)}), 500 + +@app.route('/data/v1/action/aggregate', methods=['POST']) +def aggregate(): + """Execute aggregation pipeline""" + if not authenticate_request(): + return jsonify({'error': 'Unauthorized'}), 401 + + try: + data = request.get_json() + database = data['database'] + collection_name = data['collection'] + pipeline = data['pipeline'] + + collection = get_collection(database, collection_name) + cursor = collection.aggregate(pipeline) + + documents = [] + for doc in cursor: + # Convert ObjectId to string for JSON serialization + if '_id' in doc: + doc['_id'] = str(doc['_id']) + documents.append(doc) + + return jsonify({ + 'documents': documents + }) + + except Exception as e: + logger.error(f"Aggregate error: {e}") + return jsonify({'error': str(e)}), 500 + +@app.route('/data/v1/action/updateOne', methods=['POST']) +def update_one(): + """Update a single document""" + if not authenticate_request(): + return jsonify({'error': 'Unauthorized'}), 401 + + try: + data = request.get_json() + database = data['database'] + collection_name = data['collection'] + filter_criteria = data.get('filter', {}) + update_doc = data['update'] + + collection = get_collection(database, collection_name) + result = collection.update_one(filter_criteria, update_doc) + + return jsonify({ + 'matchedCount': result.matched_count, + 'modifiedCount': result.modified_count, + 'upsertedId': str(result.upserted_id) if result.upserted_id else None + }) + + except Exception as e: + logger.error(f"Update one error: {e}") + return jsonify({'error': str(e)}), 500 + +@app.route('/data/v1/action/updateMany', methods=['POST']) +def update_many(): + """Update multiple documents""" + if not authenticate_request(): + return jsonify({'error': 'Unauthorized'}), 401 + + try: + data = request.get_json() + database = data['database'] + collection_name = data['collection'] + filter_criteria = data.get('filter', {}) + update_doc = data['update'] + + collection = get_collection(database, collection_name) + result = collection.update_many(filter_criteria, update_doc) + + return jsonify({ + 'matchedCount': result.matched_count, + 'modifiedCount': result.modified_count + }) + + except Exception as e: + logger.error(f"Update many error: {e}") + return jsonify({'error': str(e)}), 500 + +@app.route('/data/v1/action/deleteOne', methods=['POST']) +def delete_one(): + """Delete a single document""" + if not authenticate_request(): + return jsonify({'error': 'Unauthorized'}), 401 + + try: + data = request.get_json() + database = data['database'] + collection_name = data['collection'] + filter_criteria = data.get('filter', {}) + + collection = get_collection(database, collection_name) + result = collection.delete_one(filter_criteria) + + return jsonify({ + 'deletedCount': result.deleted_count + }) + + except Exception as e: + logger.error(f"Delete one error: {e}") + return jsonify({'error': str(e)}), 500 + +@app.route('/data/v1/action/deleteMany', methods=['POST']) +def delete_many(): + """Delete multiple documents""" + if not authenticate_request(): + return jsonify({'error': 'Unauthorized'}), 401 + + try: + data = request.get_json() + database = data['database'] + collection_name = data['collection'] + filter_criteria = data.get('filter', {}) + + collection = 
get_collection(database, collection_name) + result = collection.delete_many(filter_criteria) + + return jsonify({ + 'deletedCount': result.deleted_count + }) + + except Exception as e: + logger.error(f"Delete many error: {e}") + return jsonify({'error': str(e)}), 500 + +@app.route('/health', methods=['GET']) +def health(): + """Health check endpoint""" + try: + client.admin.command('ping') + return jsonify({'status': 'healthy', 'message': 'DocumentDB API server is running'}) + except Exception as e: + return jsonify({'status': 'unhealthy', 'error': str(e)}), 500 + +if __name__ == '__main__': + logger.info("Starting DocumentDB API server...") + logger.info(f"MongoDB URI: {MONGODB_URI}") + app.run(host='0.0.0.0', port=10260, debug=True) \ No newline at end of file diff --git a/build.gradle b/build.gradle index eadd000..5c9a5b3 100644 --- a/build.gradle +++ b/build.gradle @@ -29,7 +29,7 @@ java { } group = "io.kestra.plugin" -description = 'Plugin template for Kestra' +description = 'DocumentDB plugin for Kestra' tasks.withType(JavaCompile).configureEach { options.encoding = "UTF-8" @@ -130,6 +130,9 @@ test { jacocoTestReport { dependsOn test + reports { + xml.required.set(true) + } } /**********************************************************************************************************************\ @@ -172,8 +175,8 @@ jar { manifest { attributes( "X-Kestra-Name": project.name, - "X-Kestra-Title": "Template", - "X-Kestra-Group": project.group + ".templates", + "X-Kestra-Title": "DocumentDB", + "X-Kestra-Group": project.group + ".documentdb", "X-Kestra-Description": project.description, "X-Kestra-Version": project.version ) diff --git a/docker-compose-ci.yml b/docker-compose-ci.yml new file mode 100644 index 0000000..d4229e0 --- /dev/null +++ b/docker-compose-ci.yml @@ -0,0 +1,46 @@ +version: '3.8' +services: + # Using MongoDB with HTTP interface since DocumentDB is MongoDB-compatible + mongodb: + image: mongo:7.0 + ports: + - "127.0.0.1:27017:27017" + environment: + - MONGO_INITDB_ROOT_USERNAME=testuser + - MONGO_INITDB_ROOT_PASSWORD=testpass + - MONGO_INITDB_DATABASE=test_db + volumes: + - mongodb-data:/data/db + healthcheck: + test: ["CMD", "mongosh", "--quiet", "--eval", "db.adminCommand('ping')"] + interval: 10s + timeout: 5s + retries: 30 + start_period: 30s + + # DocumentDB HTTP API Gateway (simulates DocumentDB REST API) + documentdb-api: + image: python:3.11-slim + depends_on: + mongodb: + condition: service_healthy + ports: + - "0.0.0.0:10260:10260" + volumes: + - ./api-server.py:/app/api-server.py:ro + working_dir: /app + command: > + sh -c " + pip install flask pymongo && + python api-server.py + " + environment: + - MONGODB_URI=mongodb://testuser:testpass@mongodb:27017/test_db?authSource=admin + +volumes: + mongodb-data: + driver: local + +networks: + default: + name: documentdb-network \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index aaa347f..8107386 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1 +1 @@ -rootProject.name = 'plugin-template' +rootProject.name = 'plugin-documentdb' \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/documentdb/AbstractDocumentDBTask.java b/src/main/java/io/kestra/plugin/documentdb/AbstractDocumentDBTask.java new file mode 100644 index 0000000..8a0ebf8 --- /dev/null +++ b/src/main/java/io/kestra/plugin/documentdb/AbstractDocumentDBTask.java @@ -0,0 +1,58 @@ +package io.kestra.plugin.documentdb; + +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.Task; 
+import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +/** + * Abstract base class for DocumentDB tasks. + * Provides common connection properties shared across all DocumentDB operations. + */ +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +public abstract class AbstractDocumentDBTask extends Task { + + @Schema( + title = "DocumentDB host", + description = "The HTTP endpoint URL of your DocumentDB instance" + ) + @NotNull + protected Property host; + + @Schema( + title = "Database name", + description = "The name of the database" + ) + @NotNull + protected Property database; + + @Schema( + title = "Collection name", + description = "The name of the collection" + ) + @NotNull + protected Property collection; + + @Schema( + title = "Username", + description = "DocumentDB username for authentication" + ) + @NotNull + protected Property username; + + @Schema( + title = "Password", + description = "DocumentDB password for authentication" + ) + @NotNull + protected Property password; +} diff --git a/src/main/java/io/kestra/plugin/documentdb/Delete.java b/src/main/java/io/kestra/plugin/documentdb/Delete.java new file mode 100644 index 0000000..dde06b0 --- /dev/null +++ b/src/main/java/io/kestra/plugin/documentdb/Delete.java @@ -0,0 +1,169 @@ +package io.kestra.plugin.documentdb; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.models.tasks.Task; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.documentdb.models.DeleteResult; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import org.slf4j.Logger; + +import java.util.Map; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Delete documents from a DocumentDB collection", + description = "Delete one or more documents from a DocumentDB collection that match the filter criteria." 
+) +@Plugin( + examples = { + @Example( + title = "Delete a single user document", + full = true, + code = """ + id: delete_documentdb_user + namespace: company.documentdb + + tasks: + - id: delete_user + type: io.kestra.plugin.documentdb.Delete + host: "https://my-documentdb-instance.com" + database: "myapp" + collection: "users" + username: "{{ secret('DOCUMENTDB_USERNAME') }}" + password: "{{ secret('DOCUMENTDB_PASSWORD') }}" + filter: + email: "user.to.delete@example.com" + deleteMany: false + """ + ), + @Example( + title = "Delete multiple inactive documents", + full = true, + code = """ + id: cleanup_inactive_users + namespace: company.documentdb + + tasks: + - id: delete_inactive_users + type: io.kestra.plugin.documentdb.Delete + host: "https://my-documentdb-instance.com" + database: "myapp" + collection: "users" + username: "{{ secret('DOCUMENTDB_USERNAME') }}" + password: "{{ secret('DOCUMENTDB_PASSWORD') }}" + filter: + status: "inactive" + last_login: + $lt: "2022-01-01" + deleteMany: true + """ + ), + @Example( + title = "Delete documents by age criteria", + full = true, + code = """ + id: delete_old_logs + namespace: company.documentdb + + tasks: + - id: cleanup_old_logs + type: io.kestra.plugin.documentdb.Delete + host: "https://my-documentdb-instance.com" + database: "logging" + collection: "application_logs" + username: "{{ secret('DOCUMENTDB_USERNAME') }}" + password: "{{ secret('DOCUMENTDB_PASSWORD') }}" + filter: + created_at: + $lt: "{{ now() | dateAdd(-30, 'DAYS') }}" + level: + $in: ["DEBUG", "INFO"] + deleteMany: true + """ + ) + } +) +public class Delete extends AbstractDocumentDBTask implements RunnableTask { + + @Schema( + title = "Filter criteria", + description = "MongoDB-style filter to select which documents to delete. Example: {\"status\": \"inactive\", \"age\": {\"$gte\": 18}}" + ) + private Property> filter; + + @Schema( + title = "Delete multiple documents", + description = "If true, deletes all documents matching the filter (deleteMany). If false, deletes only the first match (deleteOne)." 
+ ) + @Builder.Default + private Property deleteMany = Property.ofValue(false); + + @Override + public Output run(RunContext runContext) throws Exception { + Logger logger = runContext.logger(); + + // Render properties + String rHost = runContext.render(this.host).as(String.class).orElseThrow(); + String rDatabase = runContext.render(this.database).as(String.class).orElseThrow(); + String rCollection = runContext.render(this.collection).as(String.class).orElseThrow(); + String rUsername = runContext.render(this.username).as(String.class).orElseThrow(); + String rPassword = runContext.render(this.password).as(String.class).orElseThrow(); + Map rFilter = runContext.render(this.filter).asMap(String.class, Object.class); + Boolean rDeleteMany = runContext.render(this.deleteMany).as(Boolean.class).orElse(false); + + DocumentDBClient client = new DocumentDBClient(rHost, rUsername, rPassword, runContext); + + if (rDeleteMany) { + // Delete multiple documents + logger.info("Deleting multiple documents from DocumentDB database: {} collection: {}", rDatabase, rCollection); + + DeleteResult result = client.deleteMany(rDatabase, rCollection, rFilter); + + logger.info("Successfully deleted {} documents", result.getDeletedCount()); + + return Output.builder() + .deletedCount(result.getDeletedCount()) + .build(); + } else { + // Delete single document + logger.info("Deleting single document from DocumentDB database: {} collection: {}", rDatabase, rCollection); + + DeleteResult result = client.deleteOne(rDatabase, rCollection, rFilter); + + if (result.getDeletedCount() > 0) { + logger.info("Successfully deleted {} document", result.getDeletedCount()); + } else { + logger.info("No documents matched the filter criteria for deletion"); + } + + return Output.builder() + .deletedCount(result.getDeletedCount()) + .build(); + } + } + + @Builder + @Getter + public static class Output implements io.kestra.core.models.tasks.Output { + @Schema( + title = "Number of documents deleted", + description = "Total count of documents that were successfully deleted" + ) + private final Integer deletedCount; + } +} \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/documentdb/DocumentDBClient.java b/src/main/java/io/kestra/plugin/documentdb/DocumentDBClient.java new file mode 100644 index 0000000..512b5b0 --- /dev/null +++ b/src/main/java/io/kestra/plugin/documentdb/DocumentDBClient.java @@ -0,0 +1,445 @@ +package io.kestra.plugin.documentdb; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.kestra.core.http.HttpRequest; +import io.kestra.core.serializers.JacksonMapper; +import io.kestra.core.http.HttpResponse; +import io.kestra.core.http.client.HttpClient; +import io.kestra.core.http.client.HttpClientResponseException; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.documentdb.models.DocumentDBException; +import io.kestra.plugin.documentdb.models.DocumentDBRecord; +import io.kestra.plugin.documentdb.models.InsertResult; +import io.kestra.plugin.documentdb.models.UpdateResult; +import io.kestra.plugin.documentdb.models.DeleteResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * HTTP client for interacting with DocumentDB REST API. 
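+ * <p>Minimal usage sketch (the endpoint, credentials, database, and sample documents below are
+ * illustrative placeholders, not values shipped with the plugin):</p>
+ * <pre>{@code
+ * DocumentDBClient client = new DocumentDBClient("https://my-documentdb-instance.com", "user", "pass", runContext);
+ * Map<String, Object> doc = Map.of("name", "John Doe", "age", 30);
+ * InsertResult inserted = client.insertOne("myapp", "users", doc);
+ * List<DocumentDBRecord> active = client.find("myapp", "users", Map.of("status", "active"), 100, 0);
+ * }</pre>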
+ * Handles authentication, request building, and response parsing for MongoDB-compatible operations. + */ +public class DocumentDBClient { + + private static final Logger logger = LoggerFactory.getLogger(DocumentDBClient.class); + public static final int MAX_DOCUMENTS_PER_INSERT = 10; + + private final HttpClient httpClient; + private final ObjectMapper objectMapper; + private final String host; + private final String username; + private final String password; + + public DocumentDBClient(String host, String username, String password, RunContext runContext) throws Exception { + this.host = host; + this.username = username; + this.password = password; + this.objectMapper = JacksonMapper.ofJson(); + this.httpClient = HttpClient.builder() + .runContext(runContext) + .build(); + } + + /** + * Insert a single document into a collection. + */ + public InsertResult insertOne(String database, String collection, Map document) throws Exception { + String url = buildUrl("insertOne"); + + Map requestBody = Map.of( + "database", database, + "collection", collection, + "document", document + ); + + HttpRequest request = HttpRequest.builder() + .method("POST") + .uri(URI.create(url)) + .addHeader("Content-Type", "application/json") + .addHeader("Accept", "application/json") + .addHeader("Authorization", "Basic " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes())) + .body(HttpRequest.JsonRequestBody.builder() + .content(requestBody) + .build()) + .build(); + + logger.debug("Making POST request to: {}", url); + + try { + HttpResponse response = httpClient.request(request, String.class); + return parseInsertResponse(response.getBody()); + } catch (HttpClientResponseException e) { + String statusCode = e.getResponse() != null ? String.valueOf(e.getResponse().getStatus().getCode()) : "unknown"; + String responseBody = e.getResponse() != null ? String.valueOf(e.getResponse().getBody()) : "unknown"; + throw new DocumentDBException("Failed to insert document: " + statusCode + " - " + responseBody); + } + } + + /** + * Insert multiple documents into a collection. + */ + public InsertResult insertMany(String database, String collection, List> documents) throws Exception { + if (documents.size() > MAX_DOCUMENTS_PER_INSERT) { + throw new IllegalArgumentException("Cannot insert more than " + MAX_DOCUMENTS_PER_INSERT + " documents at once"); + } + + String url = buildUrl("insertMany"); + + Map requestBody = Map.of( + "database", database, + "collection", collection, + "documents", documents + ); + + HttpRequest request = HttpRequest.builder() + .method("POST") + .uri(URI.create(url)) + .addHeader("Content-Type", "application/json") + .addHeader("Accept", "application/json") + .addHeader("Authorization", "Basic " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes())) + .body(HttpRequest.JsonRequestBody.builder() + .content(requestBody) + .build()) + .build(); + + logger.debug("Making POST request to: {}", url); + + try { + HttpResponse response = httpClient.request(request, String.class); + return parseInsertResponse(response.getBody()); + } catch (HttpClientResponseException e) { + String statusCode = e.getResponse() != null ? String.valueOf(e.getResponse().getStatus().getCode()) : "unknown"; + String responseBody = e.getResponse() != null ? 
String.valueOf(e.getResponse().getBody()) : "unknown"; + throw new DocumentDBException("Failed to insert documents: " + statusCode + " - " + responseBody); + } + } + + /** + * Find documents in a collection with optional filter and limit. + */ + public List find(String database, String collection, Map filter, + Integer limit, Integer skip) throws Exception { + String url = buildUrl("find"); + + Map requestBody = new HashMap<>(); + requestBody.put("database", database); + requestBody.put("collection", collection); + + if (filter != null && !filter.isEmpty()) { + requestBody.put("filter", filter); + } + if (limit != null) { + requestBody.put("limit", limit); + } + if (skip != null) { + requestBody.put("skip", skip); + } + + HttpRequest request = HttpRequest.builder() + .method("POST") + .uri(URI.create(url)) + .addHeader("Content-Type", "application/json") + .addHeader("Accept", "application/json") + .addHeader("Authorization", "Basic " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes())) + .body(HttpRequest.JsonRequestBody.builder() + .content(requestBody) + .build()) + .build(); + + logger.debug("Making POST request to: {}", url); + + try { + HttpResponse response = httpClient.request(request, String.class); + return parseFindResponse(response.getBody()); + } catch (HttpClientResponseException e) { + String statusCode = e.getResponse() != null ? String.valueOf(e.getResponse().getStatus().getCode()) : "unknown"; + String responseBody = e.getResponse() != null ? String.valueOf(e.getResponse().getBody()) : "unknown"; + throw new DocumentDBException("Failed to find documents: " + statusCode + " - " + responseBody); + } + } + + /** + * Execute an aggregation pipeline on a collection. + */ + public List aggregate(String database, String collection, List> pipeline) throws Exception { + String url = buildUrl("aggregate"); + + Map requestBody = Map.of( + "database", database, + "collection", collection, + "pipeline", pipeline + ); + + HttpRequest request = HttpRequest.builder() + .method("POST") + .uri(URI.create(url)) + .addHeader("Content-Type", "application/json") + .addHeader("Accept", "application/json") + .addHeader("Authorization", "Basic " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes())) + .body(HttpRequest.JsonRequestBody.builder() + .content(requestBody) + .build()) + .build(); + + logger.debug("Making POST request to: {}", url); + + try { + HttpResponse response = httpClient.request(request, String.class); + return parseFindResponse(response.getBody()); + } catch (HttpClientResponseException e) { + String statusCode = e.getResponse() != null ? String.valueOf(e.getResponse().getStatus().getCode()) : "unknown"; + String responseBody = e.getResponse() != null ? String.valueOf(e.getResponse().getBody()) : "unknown"; + throw new DocumentDBException("Failed to execute aggregation: " + statusCode + " - " + responseBody); + } + } + + /** + * Build URL for DocumentDB HTTP API endpoint. + */ + private String buildUrl(String action) { + // Remove trailing slash if present + String baseUrl = host.endsWith("/") ? + host.substring(0, host.length() - 1) : host; + return baseUrl + "/data/v1/action/" + action; + } + + /** + * Parse insert response from DocumentDB API. 
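+ * Handles both the single-insert and bulk-insert shapes returned by the HTTP API; the examples
+ * below mirror what the bundled test mock server (api-server.py) emits, and a real DocumentDB
+ * endpoint is assumed to use the same field names:
+ * <pre>{@code
+ * {"insertedId": "<id>", "insertedCount": 1}
+ * {"insertedIds": ["<id1>", "<id2>"], "insertedCount": 2}
+ * }</pre>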
+ */ + private InsertResult parseInsertResponse(String responseBody) throws JsonProcessingException { + JsonNode jsonNode = objectMapper.readTree(responseBody); + + List insertedIds = new ArrayList<>(); + Integer insertedCount = 0; + + if (jsonNode.has("insertedId")) { + // Single insert + insertedIds.add(jsonNode.get("insertedId").asText()); + insertedCount = 1; + } else if (jsonNode.has("insertedIds")) { + // Multiple inserts + JsonNode idsNode = jsonNode.get("insertedIds"); + if (idsNode.isArray()) { + for (JsonNode idNode : idsNode) { + insertedIds.add(idNode.asText()); + } + } + insertedCount = insertedIds.size(); + } + + return new InsertResult(insertedIds, insertedCount); + } + + /** + * Parse find/aggregate response from DocumentDB API. + */ + private List parseFindResponse(String responseBody) throws JsonProcessingException { + JsonNode jsonNode = objectMapper.readTree(responseBody); + + List records = new ArrayList<>(); + JsonNode documentsNode = jsonNode.get("documents"); + + if (documentsNode != null && documentsNode.isArray()) { + for (JsonNode documentNode : documentsNode) { + String id = null; + if (documentNode.has("_id")) { + JsonNode idNode = documentNode.get("_id"); + if (idNode.has("$oid")) { + id = idNode.get("$oid").asText(); + } else { + id = idNode.asText(); + } + } + + Map fields = objectMapper.convertValue(documentNode, Map.class); + records.add(new DocumentDBRecord(id, fields)); + } + } + + return records; + } + + /** + * Update a single document in a collection. + */ + public UpdateResult updateOne(String database, String collection, Map filter, Map update) throws Exception { + String url = buildUrl("updateOne"); + + Map requestBody = new HashMap<>(); + requestBody.put("database", database); + requestBody.put("collection", collection); + if (filter != null && !filter.isEmpty()) { + requestBody.put("filter", filter); + } + requestBody.put("update", update); + + HttpRequest request = HttpRequest.builder() + .method("POST") + .uri(URI.create(url)) + .addHeader("Content-Type", "application/json") + .addHeader("Accept", "application/json") + .addHeader("Authorization", "Basic " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes())) + .body(HttpRequest.JsonRequestBody.builder() + .content(requestBody) + .build()) + .build(); + + logger.debug("Making POST request to: {}", url); + + try { + HttpResponse response = httpClient.request(request, String.class); + return parseUpdateResponse(response.getBody()); + } catch (HttpClientResponseException e) { + String statusCode = e.getResponse() != null ? String.valueOf(e.getResponse().getStatus().getCode()) : "unknown"; + String responseBody = e.getResponse() != null ? String.valueOf(e.getResponse().getBody()) : "unknown"; + throw new DocumentDBException("Failed to update document: " + statusCode + " - " + responseBody); + } + } + + /** + * Update multiple documents in a collection. 
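+ * Issues a POST to the updateMany action endpoint with a body of the shape
+ * {"database": ..., "collection": ..., "filter": {...}, "update": {...}}; the update document is
+ * expected to use MongoDB update operators, e.g. {"$set": {"status": "archived"}} (illustrative
+ * example, not a default).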
+ */ + public UpdateResult updateMany(String database, String collection, Map filter, Map update) throws Exception { + String url = buildUrl("updateMany"); + + Map requestBody = new HashMap<>(); + requestBody.put("database", database); + requestBody.put("collection", collection); + if (filter != null && !filter.isEmpty()) { + requestBody.put("filter", filter); + } + requestBody.put("update", update); + + HttpRequest request = HttpRequest.builder() + .method("POST") + .uri(URI.create(url)) + .addHeader("Content-Type", "application/json") + .addHeader("Accept", "application/json") + .addHeader("Authorization", "Basic " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes())) + .body(HttpRequest.JsonRequestBody.builder() + .content(requestBody) + .build()) + .build(); + + logger.debug("Making POST request to: {}", url); + + try { + HttpResponse response = httpClient.request(request, String.class); + return parseUpdateResponse(response.getBody()); + } catch (HttpClientResponseException e) { + String statusCode = e.getResponse() != null ? String.valueOf(e.getResponse().getStatus().getCode()) : "unknown"; + String responseBody = e.getResponse() != null ? String.valueOf(e.getResponse().getBody()) : "unknown"; + throw new DocumentDBException("Failed to update documents: " + statusCode + " - " + responseBody); + } + } + + /** + * Delete a single document from a collection. + */ + public DeleteResult deleteOne(String database, String collection, Map filter) throws Exception { + String url = buildUrl("deleteOne"); + + Map requestBody = new HashMap<>(); + requestBody.put("database", database); + requestBody.put("collection", collection); + if (filter != null && !filter.isEmpty()) { + requestBody.put("filter", filter); + } + + HttpRequest request = HttpRequest.builder() + .method("POST") + .uri(URI.create(url)) + .addHeader("Content-Type", "application/json") + .addHeader("Accept", "application/json") + .addHeader("Authorization", "Basic " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes())) + .body(HttpRequest.JsonRequestBody.builder() + .content(requestBody) + .build()) + .build(); + + logger.debug("Making POST request to: {}", url); + + try { + HttpResponse response = httpClient.request(request, String.class); + return parseDeleteResponse(response.getBody()); + } catch (HttpClientResponseException e) { + String statusCode = e.getResponse() != null ? String.valueOf(e.getResponse().getStatus().getCode()) : "unknown"; + String responseBody = e.getResponse() != null ? String.valueOf(e.getResponse().getBody()) : "unknown"; + throw new DocumentDBException("Failed to delete document: " + statusCode + " - " + responseBody); + } + } + + /** + * Delete multiple documents from a collection. 
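+ * Removes every document matching the filter. A null or empty filter is omitted from the request
+ * body; with the bundled mock server that defaults to {} and therefore matches (and deletes) all
+ * documents in the collection, so always pass a filter unless a full wipe is intended.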
+ */ + public DeleteResult deleteMany(String database, String collection, Map filter) throws Exception { + String url = buildUrl("deleteMany"); + + Map requestBody = new HashMap<>(); + requestBody.put("database", database); + requestBody.put("collection", collection); + if (filter != null && !filter.isEmpty()) { + requestBody.put("filter", filter); + } + + HttpRequest request = HttpRequest.builder() + .method("POST") + .uri(URI.create(url)) + .addHeader("Content-Type", "application/json") + .addHeader("Accept", "application/json") + .addHeader("Authorization", "Basic " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes())) + .body(HttpRequest.JsonRequestBody.builder() + .content(requestBody) + .build()) + .build(); + + logger.debug("Making POST request to: {}", url); + + try { + HttpResponse response = httpClient.request(request, String.class); + return parseDeleteResponse(response.getBody()); + } catch (HttpClientResponseException e) { + String statusCode = e.getResponse() != null ? String.valueOf(e.getResponse().getStatus().getCode()) : "unknown"; + String responseBody = e.getResponse() != null ? String.valueOf(e.getResponse().getBody()) : "unknown"; + throw new DocumentDBException("Failed to delete documents: " + statusCode + " - " + responseBody); + } + } + + /** + * Parse update response from DocumentDB API. + */ + private UpdateResult parseUpdateResponse(String responseBody) throws JsonProcessingException { + JsonNode jsonNode = objectMapper.readTree(responseBody); + + Integer matchedCount = jsonNode.has("matchedCount") ? jsonNode.get("matchedCount").asInt() : 0; + Integer modifiedCount = jsonNode.has("modifiedCount") ? jsonNode.get("modifiedCount").asInt() : 0; + String upsertedId = null; + + if (jsonNode.has("upsertedId") && !jsonNode.get("upsertedId").isNull()) { + upsertedId = jsonNode.get("upsertedId").asText(); + } + + return new UpdateResult(matchedCount, modifiedCount, upsertedId); + } + + /** + * Parse delete response from DocumentDB API. + */ + private DeleteResult parseDeleteResponse(String responseBody) throws JsonProcessingException { + JsonNode jsonNode = objectMapper.readTree(responseBody); + + Integer deletedCount = jsonNode.has("deletedCount") ? 
jsonNode.get("deletedCount").asInt() : 0; + + return new DeleteResult(deletedCount); + } +} \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/documentdb/Insert.java b/src/main/java/io/kestra/plugin/documentdb/Insert.java new file mode 100644 index 0000000..6ed9288 --- /dev/null +++ b/src/main/java/io/kestra/plugin/documentdb/Insert.java @@ -0,0 +1,214 @@ +package io.kestra.plugin.documentdb; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.models.tasks.Task; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.documentdb.models.InsertResult; + +import static io.kestra.plugin.documentdb.DocumentDBClient.MAX_DOCUMENTS_PER_INSERT; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Insert documents into a DocumentDB collection", + description = "Insert one or more documents into a DocumentDB collection. Can insert a single document or multiple documents (max 10) in one operation." +) +@Plugin( + examples = { + @Example( + title = "Insert a single user document", + full = true, + code = """ + id: insert_documentdb_user + namespace: company.documentdb + + tasks: + - id: insert_user + type: io.kestra.plugin.documentdb.Insert + host: "https://my-documentdb-instance.com" + database: "myapp" + collection: "users" + username: "{{ secret('DOCUMENTDB_USERNAME') }}" + password: "{{ secret('DOCUMENTDB_PASSWORD') }}" + document: + name: "John Doe" + email: "john.doe@example.com" + age: 30 + created_at: "{{ now() }}" + """ + ), + @Example( + title = "Insert multiple product documents", + full = true, + code = """ + id: insert_products + namespace: company.documentdb + + tasks: + - id: insert_product_batch + type: io.kestra.plugin.documentdb.Insert + host: "https://my-documentdb-instance.com" + database: "inventory" + collection: "products" + username: "{{ secret('DOCUMENTDB_USERNAME') }}" + password: "{{ secret('DOCUMENTDB_PASSWORD') }}" + documents: + - name: "Laptop" + price: 999.99 + category: "Electronics" + in_stock: true + - name: "Mouse" + price: 29.99 + category: "Electronics" + in_stock: false + """ + ), + @Example( + title = "Insert document with dynamic data", + full = true, + code = """ + id: insert_dynamic_order + namespace: company.documentdb + + inputs: + - id: customer_id + type: STRING + required: true + - id: product_name + type: STRING + required: true + - id: quantity + type: INT + required: true + + tasks: + - id: insert_order + type: io.kestra.plugin.documentdb.Insert + host: "https://my-documentdb-instance.com" + database: "sales" + collection: "orders" + username: "{{ secret('DOCUMENTDB_USERNAME') }}" + password: "{{ secret('DOCUMENTDB_PASSWORD') }}" + document: + customer_id: "{{ inputs.customer_id }}" + product: "{{ inputs.product_name }}" + quantity: "{{ inputs.quantity }}" + order_date: "{{ now() }}" + status: "pending" + """ + ) + } +) +public class Insert extends AbstractDocumentDBTask implements RunnableTask { + + 
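+    // Exactly one of 'document' (single insert) or 'documents' (batch insert, capped at
+    // MAX_DOCUMENTS_PER_INSERT) may be provided; run() rejects flows that set both or neither.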
@Schema( + title = "Document for single insert", + description = "Document to insert (for single document insertion). Use this OR documents, not both." + ) + private Property> document; + + @Schema( + title = "Multiple documents", + description = "List of documents to insert (max " + MAX_DOCUMENTS_PER_INSERT + "). Use this OR document, not both." + ) + private Property>> documents; + + @Override + public Output run(RunContext runContext) throws Exception { + Logger logger = runContext.logger(); + + // Render properties + String rHost = runContext.render(this.host).as(String.class).orElseThrow(); + String rDatabase = runContext.render(this.database).as(String.class).orElseThrow(); + String rCollection = runContext.render(this.collection).as(String.class).orElseThrow(); + String rUsername = runContext.render(this.username).as(String.class).orElseThrow(); + String rPassword = runContext.render(this.password).as(String.class).orElseThrow(); + Map rDocument = runContext.render(this.document).asMap(String.class, Object.class); + List> rDocuments = runContext.render(this.documents).asList(Map.class); + + // Validate input - either document or documents should be provided, not both + if ((rDocument == null || rDocument.isEmpty()) && (rDocuments == null || rDocuments.isEmpty())) { + throw new IllegalArgumentException("Either 'document' for single insert or 'documents' for multiple inserts must be provided"); + } + + if (rDocument != null && !rDocument.isEmpty() && rDocuments != null && !rDocuments.isEmpty()) { + throw new IllegalArgumentException("Cannot specify both 'document' and 'documents'. Use one or the other."); + } + + DocumentDBClient client = new DocumentDBClient(rHost, rUsername, rPassword, runContext); + + if (rDocument != null && !rDocument.isEmpty()) { + // Insert single document + logger.info("Inserting single document into DocumentDB database: {} collection: {}", rDatabase, rCollection); + + InsertResult result = client.insertOne(rDatabase, rCollection, rDocument); + + logger.info("Successfully inserted document with ID: {}", result.getInsertedIds().getFirst()); + + return Output.builder() + .insertedId(result.getInsertedIds().getFirst()) + .insertedIds(result.getInsertedIds()) + .insertedCount(result.getInsertedCount()) + .build(); + } else { + // Insert multiple documents + if (rDocuments.size() > MAX_DOCUMENTS_PER_INSERT) { + throw new IllegalArgumentException("Cannot insert more than " + MAX_DOCUMENTS_PER_INSERT + " documents at once"); + } + + logger.info("Inserting {} documents into DocumentDB database: {} collection: {}", rDocuments.size(), rDatabase, rCollection); + + InsertResult result = client.insertMany(rDatabase, rCollection, rDocuments); + + logger.info("Successfully inserted {} documents", result.getInsertedCount()); + + return Output.builder() + .insertedId(result.getInsertedIds().isEmpty() ? 
null : result.getInsertedIds().getFirst()) + .insertedIds(result.getInsertedIds()) + .insertedCount(result.getInsertedCount()) + .build(); + } + } + + @Builder + @Getter + public static class Output implements io.kestra.core.models.tasks.Output { + @Schema( + title = "Inserted document ID", + description = "The ID of the first inserted document (for single insert or first of multiple)" + ) + private final String insertedId; + + @Schema( + title = "All inserted document IDs", + description = "List of all inserted document IDs" + ) + private final List insertedIds; + + @Schema( + title = "Number of documents inserted", + description = "Total count of documents successfully inserted" + ) + private final Integer insertedCount; + } +} \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/documentdb/Read.java b/src/main/java/io/kestra/plugin/documentdb/Read.java new file mode 100644 index 0000000..317c802 --- /dev/null +++ b/src/main/java/io/kestra/plugin/documentdb/Read.java @@ -0,0 +1,310 @@ +package io.kestra.plugin.documentdb; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.models.tasks.Task; +import io.kestra.core.models.tasks.common.FetchType; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.documentdb.models.DocumentDBRecord; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import org.slf4j.Logger; +import reactor.core.publisher.Flux; +import io.kestra.core.serializers.FileSerde; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Read documents from a DocumentDB collection", + description = "Read documents from a DocumentDB collection with optional filtering, limiting, and aggregation support." 
+) +@Plugin( + examples = { + @Example( + title = "Find all users", + full = true, + code = """ + id: read_all_users + namespace: company.documentdb + + tasks: + - id: find_users + type: io.kestra.plugin.documentdb.Read + host: "https://my-documentdb-instance.com" + database: "myapp" + collection: "users" + username: "{{ secret('DOCUMENTDB_USERNAME') }}" + password: "{{ secret('DOCUMENTDB_PASSWORD') }}" + fetchType: FETCH + """ + ), + @Example( + title = "Find users with filter and limit", + full = true, + code = """ + id: find_active_users + namespace: company.documentdb + + tasks: + - id: find_filtered_users + type: io.kestra.plugin.documentdb.Read + host: "https://my-documentdb-instance.com" + database: "myapp" + collection: "users" + username: "{{ secret('DOCUMENTDB_USERNAME') }}" + password: "{{ secret('DOCUMENTDB_PASSWORD') }}" + filter: + status: "active" + age: + $gte: 18 + limit: 100 + fetchType: FETCH + """ + ), + @Example( + title = "Get single user", + full = true, + code = """ + id: get_single_user + namespace: company.documentdb + + tasks: + - id: find_one_user + type: io.kestra.plugin.documentdb.Read + host: "https://my-documentdb-instance.com" + database: "myapp" + collection: "users" + username: "{{ secret('DOCUMENTDB_USERNAME') }}" + password: "{{ secret('DOCUMENTDB_PASSWORD') }}" + filter: + email: "john.doe@example.com" + fetchType: FETCH_ONE + """ + ), + @Example( + title = "Aggregation pipeline example", + full = true, + code = """ + id: user_statistics + namespace: company.documentdb + + tasks: + - id: aggregate_users + type: io.kestra.plugin.documentdb.Read + host: "https://my-documentdb-instance.com" + database: "myapp" + collection: "users" + username: "{{ secret('DOCUMENTDB_USERNAME') }}" + password: "{{ secret('DOCUMENTDB_PASSWORD') }}" + aggregationPipeline: + - $match: + status: "active" + - $group: + _id: "$department" + count: { $sum: 1 } + avgAge: { $avg: "$age" } + - $sort: + count: -1 + fetchType: FETCH + """ + ) + } +) +public class Read extends AbstractDocumentDBTask implements RunnableTask { + + @Schema( + title = "Filter", + description = "MongoDB-style filter criteria to apply to the query. Example: {\"status\": \"active\", \"age\": {\"$gte\": 18}}" + ) + private Property> filter; + + @Schema( + title = "Aggregation pipeline", + description = "MongoDB aggregation pipeline stages. If provided, this will execute an aggregation instead of a simple find." + ) + private Property>> aggregationPipeline; + + @Schema( + title = "Limit", + description = "Maximum number of documents to return" + ) + private Property limit; + + @Schema( + title = "Skip", + description = "Number of documents to skip" + ) + private Property skip; + + @Schema( + title = "Fetch type", + description = "How to handle query results. 
STORE: store all rows to a file, FETCH: output all rows as output variable, FETCH_ONE: output the first row, NONE: do nothing" + ) + @NotNull + @Builder.Default + private Property fetchType = Property.ofValue(FetchType.FETCH); + + @Override + public Output run(RunContext runContext) throws Exception { + Logger logger = runContext.logger(); + + // Render properties + String rHost = runContext.render(this.host).as(String.class).orElseThrow(); + String rDatabase = runContext.render(this.database).as(String.class).orElseThrow(); + String rCollection = runContext.render(this.collection).as(String.class).orElseThrow(); + String rUsername = runContext.render(this.username).as(String.class).orElseThrow(); + String rPassword = runContext.render(this.password).as(String.class).orElseThrow(); + Map rFilter = runContext.render(this.filter).asMap(String.class, Object.class); + List> rAggregationPipeline = runContext.render(this.aggregationPipeline).asList(Map.class); + Integer rLimit = runContext.render(this.limit).as(Integer.class).orElse(null); + Integer rSkip = runContext.render(this.skip).as(Integer.class).orElse(null); + FetchType rFetchType = runContext.render(this.fetchType).as(FetchType.class).orElse(FetchType.FETCH); + + DocumentDBClient client = new DocumentDBClient(rHost, rUsername, rPassword, runContext); + + List records; + + if (rAggregationPipeline != null && !rAggregationPipeline.isEmpty()) { + // Execute aggregation + logger.info("Executing aggregation pipeline on DocumentDB database: {} collection: {} with {} stages", + rDatabase, rCollection, rAggregationPipeline.size()); + records = client.aggregate(rDatabase, rCollection, rAggregationPipeline); + } else { + // Execute find + logger.info("Finding documents in DocumentDB database: {} collection: {}", rDatabase, rCollection); + records = client.find(rDatabase, rCollection, rFilter, rLimit, rSkip); + } + + logger.info("Found {} documents", records.size()); + + // Handle fetchType logic and build output in single switch + int recordCount = records.size(); + Output.OutputBuilder outputBuilder = Output.builder(); + + switch (rFetchType) { + case FETCH_ONE: + Map row = records.isEmpty() ? null : convertRecordToMap(records.getFirst()); + recordCount = records.isEmpty() ? 0 : 1; + outputBuilder.size((long) recordCount).row(row); + break; + case NONE: + outputBuilder.size((long) recordCount); + break; + case STORE: + StoredResult storedResult = storeRecordsAsFile(runContext, records); + outputBuilder.size((long) recordCount).uri(storedResult.getUri()); + break; + case FETCH: + default: + List> rows = records.stream() + .map(this::convertRecordToMap) + .toList(); + outputBuilder.size((long) recordCount).rows(rows); + break; + } + + return outputBuilder.build(); + } + + private Map convertRecordToMap(DocumentDBRecord record) { + Map map = new HashMap<>(); + if (record.getId() != null) { + map.put("_id", record.getId()); + } + if (record.getFields() != null) { + map.putAll(record.getFields()); + } + return map; + } + + /** + * Store records as an ION file in Kestra's internal storage. + * Returns a StoredResult containing both the URI and record count. 
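+     * <p>
+     * Each record is first converted to a plain Map, streamed through FileSerde as Amazon Ion text,
+     * and the resulting temporary file is uploaded to Kestra's internal storage; the URI in the
+     * returned result points to that stored file.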
+ */ + private StoredResult storeRecordsAsFile(RunContext runContext, List records) throws IOException { + try { + // Create a temporary file for Ion format + File tempFile = runContext.workingDir().createTempFile(".ion").toFile(); + + // Stream records via Flux and write to file using FileSerde + try (BufferedWriter writer = new BufferedWriter(new FileWriter(tempFile), FileSerde.BUFFER_SIZE)) { + Flux> recordFlux = Flux.fromIterable(records) + .map(this::convertRecordToMap); + + Long count = FileSerde.writeAll(writer, recordFlux).block(); + + // Store the temporary file in Kestra's internal storage + URI storedFileUri = runContext.storage().putFile(tempFile); + + return new StoredResult(storedFileUri, count != null ? count.intValue() : 0); + } + + } catch (IOException e) { + throw new IOException("Failed to store records as Ion file: " + e.getMessage(), e); + } + } + + /** + * Helper class to return both URI and count for STORE operations. + */ + private static class StoredResult { + private final URI uri; + private final int count; + + StoredResult(URI uri, int count) { + this.uri = uri; + this.count = count; + } + + public URI getUri() { return uri; } + public int getCount() { return count; } + } + + @Builder + @Getter + public static class Output implements io.kestra.core.models.tasks.Output { + @Schema( + title = "Map containing the first row of fetched data.", + description = "Only populated if fetchType is FETCH_ONE." + ) + private final Map row; + + @Schema( + title = "List of map containing rows of fetched data.", + description = "Only populated if fetchType is FETCH." + ) + private final List> rows; + + @Schema( + title = "The URI of the result file on Kestra's internal storage (.ion file / Amazon Ion formatted text file).", + description = "Only populated if fetchType is STORE." + ) + private final URI uri; + + @Schema( + title = "The number of documents returned by the operation." + ) + private final Long size; + } +} \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/documentdb/Update.java b/src/main/java/io/kestra/plugin/documentdb/Update.java new file mode 100644 index 0000000..7be937b --- /dev/null +++ b/src/main/java/io/kestra/plugin/documentdb/Update.java @@ -0,0 +1,208 @@ +package io.kestra.plugin.documentdb; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.models.tasks.Task; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.documentdb.models.UpdateResult; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import org.slf4j.Logger; + +import java.util.Map; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Update documents in a DocumentDB collection", + description = "Update one or more documents in a DocumentDB collection that match the filter criteria." 
+) +@Plugin( + examples = { + @Example( + title = "Update a single user document", + full = true, + code = """ + id: update_documentdb_user + namespace: company.documentdb + + tasks: + - id: update_user + type: io.kestra.plugin.documentdb.Update + host: "https://my-documentdb-instance.com" + database: "myapp" + collection: "users" + username: "{{ secret('DOCUMENTDB_USERNAME') }}" + password: "{{ secret('DOCUMENTDB_PASSWORD') }}" + filter: + email: "john.doe@example.com" + update: + $set: + status: "active" + last_login: "{{ now() }}" + updateMany: false + """ + ), + @Example( + title = "Update multiple documents", + full = true, + code = """ + id: update_multiple_users + namespace: company.documentdb + + tasks: + - id: update_inactive_users + type: io.kestra.plugin.documentdb.Update + host: "https://my-documentdb-instance.com" + database: "myapp" + collection: "users" + username: "{{ secret('DOCUMENTDB_USERNAME') }}" + password: "{{ secret('DOCUMENTDB_PASSWORD') }}" + filter: + last_login: + $lt: "2023-01-01" + update: + $set: + status: "inactive" + archived_date: "{{ now() }}" + updateMany: true + """ + ), + @Example( + title = "Update with increment operation", + full = true, + code = """ + id: increment_user_views + namespace: company.documentdb + + tasks: + - id: increment_views + type: io.kestra.plugin.documentdb.Update + host: "https://my-documentdb-instance.com" + database: "myapp" + collection: "profiles" + username: "{{ secret('DOCUMENTDB_USERNAME') }}" + password: "{{ secret('DOCUMENTDB_PASSWORD') }}" + filter: + user_id: "{{ inputs.user_id }}" + update: + $inc: + view_count: 1 + total_interactions: 1 + $set: + last_viewed: "{{ now() }}" + updateMany: false + """ + ) + } +) +public class Update extends AbstractDocumentDBTask implements RunnableTask { + + @Schema( + title = "Filter criteria", + description = "MongoDB-style filter to select which documents to update. Example: {\"status\": \"pending\", \"age\": {\"$gte\": 18}}" + ) + private Property> filter; + + @Schema( + title = "Update operations", + description = "MongoDB-style update operations to apply. Example: {\"$set\": {\"status\": \"active\"}, \"$inc\": {\"count\": 1}}" + ) + @NotNull + private Property> update; + + @Schema( + title = "Update multiple documents", + description = "If true, updates all documents matching the filter (updateMany). If false, updates only the first match (updateOne)." 
+ ) + @Builder.Default + private Property updateMany = Property.ofValue(false); + + @Override + public Output run(RunContext runContext) throws Exception { + Logger logger = runContext.logger(); + + // Render properties + String rHost = runContext.render(this.host).as(String.class).orElseThrow(); + String rDatabase = runContext.render(this.database).as(String.class).orElseThrow(); + String rCollection = runContext.render(this.collection).as(String.class).orElseThrow(); + String rUsername = runContext.render(this.username).as(String.class).orElseThrow(); + String rPassword = runContext.render(this.password).as(String.class).orElseThrow(); + Map rFilter = runContext.render(this.filter).asMap(String.class, Object.class); + Map rUpdate = runContext.render(this.update).asMap(String.class, Object.class); + Boolean rUpdateMany = runContext.render(this.updateMany).as(Boolean.class).orElse(false); + + // Validate update operations + if (rUpdate == null || rUpdate.isEmpty()) { + throw new IllegalArgumentException("Update operations must be provided"); + } + + DocumentDBClient client = new DocumentDBClient(rHost, rUsername, rPassword, runContext); + + if (rUpdateMany) { + // Update multiple documents + logger.info("Updating multiple documents in DocumentDB database: {} collection: {}", rDatabase, rCollection); + + UpdateResult result = client.updateMany(rDatabase, rCollection, rFilter, rUpdate); + + logger.info("Successfully updated {} of {} matching documents", result.getModifiedCount(), result.getMatchedCount()); + + return Output.builder() + .matchedCount(result.getMatchedCount()) + .modifiedCount(result.getModifiedCount()) + .upsertedId(result.getUpsertedId()) + .build(); + } else { + // Update single document + logger.info("Updating single document in DocumentDB database: {} collection: {}", rDatabase, rCollection); + + UpdateResult result = client.updateOne(rDatabase, rCollection, rFilter, rUpdate); + + if (result.getModifiedCount() > 0) { + logger.info("Successfully updated {} of {} matching documents", result.getModifiedCount(), result.getMatchedCount()); + } else { + logger.info("No documents were modified (matched: {})", result.getMatchedCount()); + } + + return Output.builder() + .matchedCount(result.getMatchedCount()) + .modifiedCount(result.getModifiedCount()) + .upsertedId(result.getUpsertedId()) + .build(); + } + } + + @Builder + @Getter + public static class Output implements io.kestra.core.models.tasks.Output { + @Schema( + title = "Number of documents matched", + description = "Total count of documents that matched the filter criteria" + ) + private final Integer matchedCount; + + @Schema( + title = "Number of documents modified", + description = "Total count of documents that were actually modified" + ) + private final Integer modifiedCount; + + @Schema( + title = "Upserted document ID", + description = "ID of the document that was created if upsert was enabled and no match was found" + ) + private final String upsertedId; + } +} \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/documentdb/models/DeleteResult.java b/src/main/java/io/kestra/plugin/documentdb/models/DeleteResult.java new file mode 100644 index 0000000..85a4eb5 --- /dev/null +++ b/src/main/java/io/kestra/plugin/documentdb/models/DeleteResult.java @@ -0,0 +1,15 @@ +package io.kestra.plugin.documentdb.models; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; + +/** + * Result of a DocumentDB delete operation. 
+ */
+@Getter
+@NoArgsConstructor
+@AllArgsConstructor
+public class DeleteResult {
+    private Integer deletedCount;
+}
\ No newline at end of file
diff --git a/src/main/java/io/kestra/plugin/documentdb/models/DocumentDBException.java b/src/main/java/io/kestra/plugin/documentdb/models/DocumentDBException.java
new file mode 100644
index 0000000..28e33ea
--- /dev/null
+++ b/src/main/java/io/kestra/plugin/documentdb/models/DocumentDBException.java
@@ -0,0 +1,14 @@
+package io.kestra.plugin.documentdb.models;
+
+/**
+ * Exception thrown when DocumentDB operations fail.
+ */
+public class DocumentDBException extends Exception {
+    public DocumentDBException(String message) {
+        super(message);
+    }
+
+    public DocumentDBException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/io/kestra/plugin/documentdb/models/DocumentDBRecord.java b/src/main/java/io/kestra/plugin/documentdb/models/DocumentDBRecord.java
new file mode 100644
index 0000000..57b2384
--- /dev/null
+++ b/src/main/java/io/kestra/plugin/documentdb/models/DocumentDBRecord.java
@@ -0,0 +1,18 @@
+package io.kestra.plugin.documentdb.models;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+/**
+ * Represents a document record from DocumentDB.
+ */
+@Getter
+@NoArgsConstructor
+@AllArgsConstructor
+public class DocumentDBRecord {
+    private String id;
+    private Map<String, Object> fields;
+}
\ No newline at end of file
diff --git a/src/main/java/io/kestra/plugin/documentdb/models/InsertResult.java b/src/main/java/io/kestra/plugin/documentdb/models/InsertResult.java
new file mode 100644
index 0000000..d746c6d
--- /dev/null
+++ b/src/main/java/io/kestra/plugin/documentdb/models/InsertResult.java
@@ -0,0 +1,18 @@
+package io.kestra.plugin.documentdb.models;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * Result of a DocumentDB insert operation.
+ */
+@Getter
+@NoArgsConstructor
+@AllArgsConstructor
+public class InsertResult {
+    private List<String> insertedIds;
+    private Integer insertedCount;
+}
\ No newline at end of file
diff --git a/src/main/java/io/kestra/plugin/documentdb/models/UpdateResult.java b/src/main/java/io/kestra/plugin/documentdb/models/UpdateResult.java
new file mode 100644
index 0000000..527d088
--- /dev/null
+++ b/src/main/java/io/kestra/plugin/documentdb/models/UpdateResult.java
@@ -0,0 +1,17 @@
+package io.kestra.plugin.documentdb.models;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+/**
+ * Result of a DocumentDB update operation.
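+ * <p>
+ * matchedCount is the number of documents that matched the filter, modifiedCount is the number that
+ * were actually changed, and upsertedId is only populated when an upsert created a new document
+ * because no existing document matched.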
+ */ +@Getter +@NoArgsConstructor +@AllArgsConstructor +public class UpdateResult { + private Integer matchedCount; + private Integer modifiedCount; + private String upsertedId; +} \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/documentdb/package-info.java b/src/main/java/io/kestra/plugin/documentdb/package-info.java new file mode 100644 index 0000000..d62763d --- /dev/null +++ b/src/main/java/io/kestra/plugin/documentdb/package-info.java @@ -0,0 +1,4 @@ +@PluginSubGroup(title = "DocumentDB", description = "Integrate with DocumentDB, Microsoft's open-source MongoDB-compatible document database built on PostgreSQL.", categories = PluginSubGroup.PluginCategory.DATABASE) +package io.kestra.plugin.documentdb; + +import io.kestra.core.models.annotations.PluginSubGroup; \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/templates/Example.java b/src/main/java/io/kestra/plugin/templates/Example.java deleted file mode 100644 index a760828..0000000 --- a/src/main/java/io/kestra/plugin/templates/Example.java +++ /dev/null @@ -1,72 +0,0 @@ -package io.kestra.plugin.templates; - -import io.kestra.core.models.annotations.Plugin; -import io.kestra.core.models.property.Property; -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.*; -import lombok.experimental.SuperBuilder; -import org.apache.commons.lang3.StringUtils; -import io.kestra.core.models.tasks.RunnableTask; -import io.kestra.core.models.tasks.Task; -import io.kestra.core.runners.RunContext; -import org.slf4j.Logger; - -@SuperBuilder -@ToString -@EqualsAndHashCode -@Getter -@NoArgsConstructor -@Schema( - title = "Short description for this task", - description = "Full description of this task" -) -@Plugin( - examples = { - @io.kestra.core.models.annotations.Example( - title = "Simple revert", - code = { "format: \"Text to be reverted\"" } - ) - } -) -public class Example extends Task implements RunnableTask { - @Schema( - title = "Short description for this input", - description = "Full description of this input" - ) - private Property format; - - @Override - public Example.Output run(RunContext runContext) throws Exception { - Logger logger = runContext.logger(); - - String render = runContext.render(format).as(String.class).orElse(""); - logger.debug(render); - - return Output.builder() - .child(new OutputChild(StringUtils.reverse(render))) - .build(); - } - - /** - * Input or Output can be nested as you need - */ - @Builder - @Getter - public static class Output implements io.kestra.core.models.tasks.Output { - @Schema( - title = "Short description for this output", - description = "Full description of this output" - ) - private final OutputChild child; - } - - @Builder - @Getter - public static class OutputChild implements io.kestra.core.models.tasks.Output { - @Schema( - title = "Short description for this output", - description = "Full description of this output" - ) - private final String value; - } -} diff --git a/src/main/java/io/kestra/plugin/templates/Trigger.java b/src/main/java/io/kestra/plugin/templates/Trigger.java deleted file mode 100644 index 7971aa1..0000000 --- a/src/main/java/io/kestra/plugin/templates/Trigger.java +++ /dev/null @@ -1,58 +0,0 @@ -package io.kestra.plugin.templates; - -import io.kestra.core.exceptions.IllegalVariableEvaluationException; -import io.kestra.core.models.annotations.Plugin; -import io.kestra.core.models.conditions.ConditionContext; -import io.kestra.core.models.executions.Execution; -import io.kestra.core.models.property.Property; 
-import io.kestra.core.models.triggers.*; -import io.kestra.core.runners.RunContext; -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.*; -import lombok.experimental.SuperBuilder; - -import java.time.Duration; -import java.util.Optional; - -@SuperBuilder -@ToString -@EqualsAndHashCode -@Getter -@NoArgsConstructor -@Plugin -@Schema( - title = "Trigger an execution randomly", - description ="Trigger an execution randomly" -) -public class Trigger extends AbstractTrigger implements PollingTriggerInterface, TriggerOutput { - @Builder.Default - private final Duration interval = Duration.ofSeconds(60); - - protected Property min = Property.of(0.5); - - @Override - public Optional evaluate(ConditionContext conditionContext, TriggerContext context) throws IllegalVariableEvaluationException { - RunContext runContext = conditionContext.getRunContext(); - - double random = Math.random(); - if (random < runContext.render(this.min).as(Double.class).orElseThrow()) { - return Optional.empty(); - } - - runContext.logger().info("Will create an execution"); - Execution execution = TriggerService.generateExecution( - this, - conditionContext, - context, - Output.builder().random(random).build() - ); - - return Optional.of(execution); - } - - @Builder - @Getter - public static class Output implements io.kestra.core.models.tasks.Output { - private Double random; - } -} diff --git a/src/main/java/io/kestra/plugin/templates/package-info.java b/src/main/java/io/kestra/plugin/templates/package-info.java deleted file mode 100644 index 50e452d..0000000 --- a/src/main/java/io/kestra/plugin/templates/package-info.java +++ /dev/null @@ -1,8 +0,0 @@ -@PluginSubGroup( - title = "Example plugin", - description = "A plugin to show how to build a plugin in Kestra.", - categories = PluginSubGroup.PluginCategory.TOOL -) -package io.kestra.plugin.templates; - -import io.kestra.core.models.annotations.PluginSubGroup; \ No newline at end of file diff --git a/src/main/resources/icons/io.kestra.plugin.documentdb.svg b/src/main/resources/icons/io.kestra.plugin.documentdb.svg new file mode 100644 index 0000000..cd6cf7e --- /dev/null +++ b/src/main/resources/icons/io.kestra.plugin.documentdb.svg @@ -0,0 +1,13 @@ + + + + + + + + + + + + + diff --git a/src/main/resources/icons/plugin-icon.svg b/src/main/resources/icons/plugin-icon.svg new file mode 100644 index 0000000..cd6cf7e --- /dev/null +++ b/src/main/resources/icons/plugin-icon.svg @@ -0,0 +1,13 @@ + + + + + + + + + + + + + diff --git a/src/test/java/io/kestra/plugin/documentdb/DeleteTest.java b/src/test/java/io/kestra/plugin/documentdb/DeleteTest.java new file mode 100644 index 0000000..cccf6b9 --- /dev/null +++ b/src/test/java/io/kestra/plugin/documentdb/DeleteTest.java @@ -0,0 +1,217 @@ +package io.kestra.plugin.documentdb; + +import io.kestra.core.models.property.Property; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.utils.TestsUtils; +import io.kestra.core.junit.annotations.KestraTest; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.DisplayName; + +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +@KestraTest +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +class DeleteTest { + + // Real connection details for 
local DocumentDB container + private static final String HOST = "http://localhost:10260"; + private static final String DATABASE = "test_db"; + private static final String COLLECTION = "delete_test"; + private static final String USERNAME = "testuser"; + private static final String PASSWORD = "testpass"; + + @Inject + private RunContextFactory runContextFactory; + + @Test + void shouldValidateRequiredProperties() throws Exception { + Delete task = Delete.builder() + .id("test-delete") + .type(Delete.class.getName()) + .host(Property.ofValue("https://test-documentdb.com")) + .database(Property.ofValue("testdb")) + .collection(Property.ofValue("testcol")) + .username(Property.ofValue("testuser")) + .password(Property.ofValue("testpass")) + .filter(Property.ofValue(Map.of("name", "Test Document"))) + .build(); + + RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, Map.of()); + + // This should not throw an exception for validation + assertThat(task.getHost(), is(notNullValue())); + assertThat(task.getDatabase(), is(notNullValue())); + assertThat(task.getCollection(), is(notNullValue())); + assertThat(task.getUsername(), is(notNullValue())); + assertThat(task.getPassword(), is(notNullValue())); + assertThat(task.getFilter(), is(notNullValue())); + + // Task should fail with connection error since this unit test uses a non-existent host + // but this validates that the task configuration is valid + try { + task.run(runContext); + throw new AssertionError("Should have thrown exception due to connection failure"); + } catch (Exception e) { + // Expected - connection will fail to the non-existent test host + // This validates the task can be executed with valid properties + assertThat(e.getMessage(), anyOf( + containsString("Connection refused"), + containsString("Failed to delete document"), + containsString("Name or service not known"), + containsString("UnknownHostException") + )); + } + } + + @Test + void shouldValidateDeleteManyFlag() throws Exception { + Delete task = Delete.builder() + .id("test-delete-many") + .type(Delete.class.getName()) + .host(Property.ofValue("https://test-documentdb.com")) + .database(Property.ofValue("testdb")) + .collection(Property.ofValue("testcol")) + .username(Property.ofValue("testuser")) + .password(Property.ofValue("testpass")) + .filter(Property.ofValue(Map.of("status", "to_delete"))) + .deleteMany(Property.ofValue(true)) + .build(); + + RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, Map.of()); + + assertThat(task.getDeleteMany(), is(notNullValue())); + + // Task should fail with connection error since this unit test uses a non-existent host + // but this validates that the deleteMany flag is properly configured + try { + task.run(runContext); + throw new AssertionError("Should have thrown exception due to connection failure"); + } catch (Exception e) { + // Expected - connection will fail to the non-existent test host + // This validates the task can be executed with deleteMany flag + assertThat(e.getMessage(), anyOf( + containsString("Connection refused"), + containsString("Failed to delete document"), + containsString("Name or service not known"), + containsString("UnknownHostException") + )); + } + } + + @Test + @Order(10) + @DisplayName("Integration: Delete Single Document with Real DocumentDB") + void shouldDeleteSingleDocumentWithRealServer() throws Exception { + // First, insert a document to delete + Insert insertTask = Insert.builder() + .id("setup-delete-test") + .type(Insert.class.getName()) + 
.host(Property.ofValue(HOST)) + .database(Property.ofValue(DATABASE)) + .collection(Property.ofValue(COLLECTION)) + .username(Property.ofValue(USERNAME)) + .password(Property.ofValue(PASSWORD)) + .document(Property.ofValue(Map.of( + "_id", "delete-test-doc-" + System.currentTimeMillis(), + "name", "Delete Test Document", + "status", "to_delete", + "created_at", System.currentTimeMillis() + ))) + .build(); + + RunContext insertContext = TestsUtils.mockRunContext(runContextFactory, insertTask, Map.of()); + Insert.Output insertOutput = insertTask.run(insertContext); + String docId = insertOutput.getInsertedId(); + + // Now delete the document + Delete deleteTask = Delete.builder() + .id("delete-single-integration-test") + .type(Delete.class.getName()) + .host(Property.ofValue(HOST)) + .database(Property.ofValue(DATABASE)) + .collection(Property.ofValue(COLLECTION)) + .username(Property.ofValue(USERNAME)) + .password(Property.ofValue(PASSWORD)) + .filter(Property.ofValue(Map.of("_id", docId))) + .deleteMany(Property.ofValue(false)) + .build(); + + RunContext deleteContext = TestsUtils.mockRunContext(runContextFactory, deleteTask, Map.of()); + Delete.Output deleteOutput = deleteTask.run(deleteContext); + + assertThat("Should delete one document", deleteOutput.getDeletedCount(), equalTo(1)); + } + + @Test + @Order(11) + @DisplayName("Integration: Delete Multiple Documents with Real DocumentDB") + void shouldDeleteMultipleDocumentsWithRealServer() throws Exception { + // Insert multiple documents to delete + Insert insertTask = Insert.builder() + .id("setup-delete-many-test") + .type(Insert.class.getName()) + .host(Property.ofValue(HOST)) + .database(Property.ofValue(DATABASE)) + .collection(Property.ofValue(COLLECTION)) + .username(Property.ofValue(USERNAME)) + .password(Property.ofValue(PASSWORD)) + .documents(Property.ofValue(List.of( + Map.of("_id", "delete-many-1-" + System.currentTimeMillis(), "category", "cleanup-test", "status", "expired", "priority", 1), + Map.of("_id", "delete-many-2-" + System.currentTimeMillis(), "category", "cleanup-test", "status", "expired", "priority", 2), + Map.of("_id", "delete-many-3-" + System.currentTimeMillis(), "category", "cleanup-test", "status", "expired", "priority", 3) + ))) + .build(); + + RunContext insertContext = TestsUtils.mockRunContext(runContextFactory, insertTask, Map.of()); + insertTask.run(insertContext); + + // Now delete multiple documents + Delete deleteTask = Delete.builder() + .id("delete-many-integration-test") + .type(Delete.class.getName()) + .host(Property.ofValue(HOST)) + .database(Property.ofValue(DATABASE)) + .collection(Property.ofValue(COLLECTION)) + .username(Property.ofValue(USERNAME)) + .password(Property.ofValue(PASSWORD)) + .filter(Property.ofValue(Map.of("category", "cleanup-test"))) + .deleteMany(Property.ofValue(true)) + .build(); + + RunContext deleteContext = TestsUtils.mockRunContext(runContextFactory, deleteTask, Map.of()); + Delete.Output deleteOutput = deleteTask.run(deleteContext); + + assertThat("Should delete multiple documents", deleteOutput.getDeletedCount(), greaterThanOrEqualTo(3)); + } + + @Test + @Order(12) + @DisplayName("Integration: Delete No Matching Documents") + void shouldHandleNoMatchingDocuments() throws Exception { + Delete deleteTask = Delete.builder() + .id("delete-no-match-test") + .type(Delete.class.getName()) + .host(Property.ofValue(HOST)) + .database(Property.ofValue(DATABASE)) + .collection(Property.ofValue(COLLECTION)) + .username(Property.ofValue(USERNAME)) + 
.password(Property.ofValue(PASSWORD)) + .filter(Property.ofValue(Map.of("_id", "non-existent-document-" + System.currentTimeMillis()))) + .deleteMany(Property.ofValue(false)) + .build(); + + RunContext deleteContext = TestsUtils.mockRunContext(runContextFactory, deleteTask, Map.of()); + Delete.Output deleteOutput = deleteTask.run(deleteContext); + + assertThat("Should delete zero documents when no match", deleteOutput.getDeletedCount(), equalTo(0)); + } +} \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/documentdb/DocumentDBIntegrationTest.java b/src/test/java/io/kestra/plugin/documentdb/DocumentDBIntegrationTest.java new file mode 100644 index 0000000..6da6770 --- /dev/null +++ b/src/test/java/io/kestra/plugin/documentdb/DocumentDBIntegrationTest.java @@ -0,0 +1,156 @@ +package io.kestra.plugin.documentdb; + +import io.kestra.core.models.property.Property; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.utils.TestsUtils; +import io.kestra.core.models.tasks.common.FetchType; +import io.kestra.core.junit.annotations.KestraTest; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestInstance; + +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +/** + * Real integration tests for DocumentDB operations using local container. + * These tests run against a real DocumentDB container and use actual credentials. + * + * To run these tests: + * 1. Start DocumentDB: ./.github/setup-unit.sh + * 2. Run tests: ./gradlew test + * + * No environment variables needed - tests use real credentials directly. + */ +@KestraTest +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class DocumentDBIntegrationTest { + + @Inject + RunContextFactory runContextFactory; + + // Real connection details for local DocumentDB container + private static final String HOST = "http://localhost:10260"; + private static final String DATABASE = "test_db"; + private static final String COLLECTION = "integration_test"; + private static final String USERNAME = "testuser"; + private static final String PASSWORD = "testpass"; + + @BeforeAll + void setupTestData() throws Exception { + // Create test data for READ tests to use + Insert insertTask = Insert.builder() + .id("setup-test-data") + .type(Insert.class.getName()) + .host(Property.ofValue(HOST)) + .database(Property.ofValue(DATABASE)) + .collection(Property.ofValue(COLLECTION)) + .username(Property.ofValue(USERNAME)) + .password(Property.ofValue(PASSWORD)) + .documents(Property.ofValue(List.of( + Map.of("_id", "setup-doc-" + System.currentTimeMillis() + "-1", "name", "Setup Document 1", "category", "A", "priority", 1), + Map.of("_id", "setup-doc-" + System.currentTimeMillis() + "-2", "name", "Setup Document 2", "category", "B", "priority", 2), + Map.of("_id", "setup-doc-" + System.currentTimeMillis() + "-3", "name", "Setup Document 3", "category", "A", "priority", 3) + ))) + .build(); + + RunContext setupContext = TestsUtils.mockRunContext(runContextFactory, insertTask, Map.of()); + insertTask.run(setupContext); + } + + @Test + @Order(1) + @DisplayName("1. 
Fetch One Document with Filter - Real DocumentDB Test") + void shouldHandleFetchOne() throws Exception { + Read readTask = Read.builder() + .id("fetch-one-real-test") + .type(Read.class.getName()) +.host(Property.ofValue(HOST)) + .database(Property.ofValue(DATABASE)) + .collection(Property.ofValue(COLLECTION)) + .username(Property.ofValue(USERNAME)) + .password(Property.ofValue(PASSWORD)) + .filter(Property.ofValue(Map.of("category", "A"))) + .fetchType(Property.ofValue(FetchType.FETCH_ONE)) + .build(); + + RunContext readContext = TestsUtils.mockRunContext(runContextFactory, readTask, Map.of()); + Read.Output readOutput = readTask.run(readContext); + + assertThat("Should retrieve one document", readOutput.getRow(), notNullValue()); + assertThat("Document should match filter", readOutput.getRow().get("category"), equalTo("A")); + assertThat("Should not return rows array for FETCH_ONE", readOutput.getRows(), nullValue()); + } + + @Test + @Order(2) + @DisplayName("2. Handle Limit and Skip - Real DocumentDB Test") + void shouldHandleLimitAndSkip() throws Exception { + Read readTask = Read.builder() + .id("limit-skip-real-test") + .type(Read.class.getName()) +.host(Property.ofValue(HOST)) + .database(Property.ofValue(DATABASE)) + .collection(Property.ofValue(COLLECTION)) + .username(Property.ofValue(USERNAME)) + .password(Property.ofValue(PASSWORD)) + .filter(Property.ofValue(Map.of())) // Empty filter to get all documents + .limit(Property.ofValue(2)) + .skip(Property.ofValue(1)) + .fetchType(Property.ofValue(FetchType.FETCH)) + .build(); + + RunContext readContext = TestsUtils.mockRunContext(runContextFactory, readTask, Map.of()); + Read.Output readOutput = readTask.run(readContext); + + assertThat("Should retrieve documents", readOutput.getRows(), notNullValue()); + assertThat("Should respect limit", readOutput.getRows().size(), lessThanOrEqualTo(2)); + assertThat("Should have at least one document", readOutput.getRows().size(), greaterThan(0)); + } + + @Test + @Order(3) + @DisplayName("3. 
Execute Aggregation Pipeline - Real DocumentDB Test") + void shouldExecuteAggregationPipeline() throws Exception { + Read readTask = Read.builder() + .id("aggregation-real-test") + .type(Read.class.getName()) +.host(Property.ofValue(HOST)) + .database(Property.ofValue(DATABASE)) + .collection(Property.ofValue(COLLECTION)) + .username(Property.ofValue(USERNAME)) + .password(Property.ofValue(PASSWORD)) + .aggregationPipeline(Property.ofValue(List.of( + Map.of("$match", Map.of("category", Map.of("$exists", true))), + Map.of("$group", Map.of( + "_id", "$category", + "count", Map.of("$sum", 1), + "avgPriority", Map.of("$avg", "$priority") + )), + Map.of("$sort", Map.of("count", -1)) + ))) + .fetchType(Property.ofValue(FetchType.FETCH)) + .build(); + + RunContext readContext = TestsUtils.mockRunContext(runContextFactory, readTask, Map.of()); + Read.Output readOutput = readTask.run(readContext); + + assertThat("Should execute aggregation", readOutput.getRows(), notNullValue()); + assertThat("Should return aggregation results", readOutput.getRows().size(), greaterThan(0)); + + // Verify aggregation structure + Map firstResult = readOutput.getRows().getFirst(); + assertThat("Should have _id field", firstResult.containsKey("_id"), equalTo(true)); + assertThat("Should have count field", firstResult.containsKey("count"), equalTo(true)); + } +} \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/documentdb/InsertTest.java b/src/test/java/io/kestra/plugin/documentdb/InsertTest.java new file mode 100644 index 0000000..e88be16 --- /dev/null +++ b/src/test/java/io/kestra/plugin/documentdb/InsertTest.java @@ -0,0 +1,260 @@ +package io.kestra.plugin.documentdb; + +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.common.FetchType; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.utils.TestsUtils; +import io.kestra.core.junit.annotations.KestraTest; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.DisplayName; + +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +@KestraTest +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +class InsertTest { + + // Real connection details for local DocumentDB container + private static final String HOST = "http://localhost:10260"; + private static final String DATABASE = "test_db"; + private static final String COLLECTION = "integration_test"; + private static final String USERNAME = "testuser"; + private static final String PASSWORD = "testpass"; + + @Inject + private RunContextFactory runContextFactory; + + @Test + void shouldValidateRequiredProperties() throws Exception { + Insert task = Insert.builder() + .id("test-insert") + .type(Insert.class.getName()) + .host(Property.ofValue("https://test-documentdb.com")) + .database(Property.ofValue("testdb")) + .collection(Property.ofValue("testcol")) + .username(Property.ofValue("testuser")) + .password(Property.ofValue("testpass")) + .document(Property.ofValue(Map.of("name", "Test Document"))) + .build(); + + RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, Map.of()); + + // This should not throw an exception for validation + assertThat(task.getHost(), is(notNullValue())); + assertThat(task.getDatabase(), 
is(notNullValue())); + assertThat(task.getCollection(), is(notNullValue())); + assertThat(task.getUsername(), is(notNullValue())); + assertThat(task.getPassword(), is(notNullValue())); + + // Task should fail with connection error since this unit test uses a non-existent host + // but this validates that the task configuration is valid + try { + task.run(runContext); + throw new AssertionError("Should have thrown exception due to connection failure"); + } catch (Exception e) { + // Expected - connection will fail to the non-existent test host + // This validates the task can be executed with valid properties + assertThat(e.getMessage(), anyOf( + containsString("Connection refused"), + containsString("Failed to insert document"), + containsString("Name or service not known"), + containsString("UnknownHostException") + )); + } + } + + @Test + void shouldRejectBothDocumentAndDocuments() throws Exception { + Insert task = Insert.builder() + .id("test-reject-both") + .type(Insert.class.getName()) + .host(Property.ofValue("https://test-documentdb.com")) + .database(Property.ofValue("testdb")) + .collection(Property.ofValue("testcol")) + .username(Property.ofValue("testuser")) + .password(Property.ofValue("testpass")) + .document(Property.ofValue(Map.of("name", "Test Document"))) + .documents(Property.ofValue(List.of(Map.of("name", "Test Document 2")))) + .build(); + + RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, Map.of()); + + try { + task.run(runContext); + throw new AssertionError("Should have thrown IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("Cannot specify both")); + } + } + + @Test + void shouldRejectNeitherDocumentNorDocuments() throws Exception { + Insert task = Insert.builder() + .id("test-reject-neither") + .type(Insert.class.getName()) + .host(Property.ofValue("https://test-documentdb.com")) + .database(Property.ofValue("testdb")) + .collection(Property.ofValue("testcol")) + .username(Property.ofValue("testuser")) + .password(Property.ofValue("testpass")) + .build(); + + RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, Map.of()); + + try { + task.run(runContext); + throw new AssertionError("Should have thrown IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("must be provided")); + } + } + + @Test + void shouldRejectTooManyDocuments() throws Exception { + List> tooManyDocs = List.of( + Map.of("name", "Doc1"), Map.of("name", "Doc2"), Map.of("name", "Doc3"), + Map.of("name", "Doc4"), Map.of("name", "Doc5"), Map.of("name", "Doc6"), + Map.of("name", "Doc7"), Map.of("name", "Doc8"), Map.of("name", "Doc9"), + Map.of("name", "Doc10"), Map.of("name", "Doc11") // 11 documents - exceeds limit + ); + + Insert task = Insert.builder() + .id("test-too-many") + .type(Insert.class.getName()) + .host(Property.ofValue("https://test-documentdb.com")) + .database(Property.ofValue("testdb")) + .collection(Property.ofValue("testcol")) + .username(Property.ofValue("testuser")) + .password(Property.ofValue("testpass")) + .documents(Property.ofValue(tooManyDocs)) + .build(); + + RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, Map.of()); + + try { + task.run(runContext); + throw new AssertionError("Should have thrown IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("Cannot insert more than")); + } + } + + @Test + @Order(10) + 
@DisplayName("Integration: Insert Single Document with Real DocumentDB") + void shouldInsertSingleDocumentWithRealServer() throws Exception { + Insert insertTask = Insert.builder() + .id("insert-single-integration-test") + .type(Insert.class.getName()) + .host(Property.ofValue(HOST)) + .database(Property.ofValue(DATABASE)) + .collection(Property.ofValue(COLLECTION)) + .username(Property.ofValue(USERNAME)) + .password(Property.ofValue(PASSWORD)) + .document(Property.ofValue(Map.of( + "_id", "single-test-doc-" + System.currentTimeMillis(), + "name", "Integration Test Document", + "value", 42, + "active", true, + "timestamp", System.currentTimeMillis() + ))) + .build(); + + RunContext insertContext = TestsUtils.mockRunContext(runContextFactory, insertTask, Map.of()); + Insert.Output insertOutput = insertTask.run(insertContext); + + assertThat("Document should be inserted", insertOutput.getInsertedCount(), equalTo(1)); + assertThat("Should return inserted ID", insertOutput.getInsertedIds(), hasSize(1)); + assertThat("Inserted ID should not be null", insertOutput.getInsertedId(), notNullValue()); + } + + @Test + @Order(11) + @DisplayName("Integration: Insert Multiple Documents with Real DocumentDB") + void shouldInsertMultipleDocumentsWithRealServer() throws Exception { + Insert insertTask = Insert.builder() + .id("insert-multiple-integration-test") + .type(Insert.class.getName()) + .host(Property.ofValue(HOST)) + .database(Property.ofValue(DATABASE)) + .collection(Property.ofValue(COLLECTION)) + .username(Property.ofValue(USERNAME)) + .password(Property.ofValue(PASSWORD)) + .documents(Property.ofValue(List.of( + Map.of("_id", "multi-test-doc-1-" + System.currentTimeMillis(), "name", "Document 1", "category", "A", "priority", 1), + Map.of("_id", "multi-test-doc-2-" + System.currentTimeMillis(), "name", "Document 2", "category", "B", "priority", 2), + Map.of("_id", "multi-test-doc-3-" + System.currentTimeMillis(), "name", "Document 3", "category", "A", "priority", 3) + ))) + .build(); + + RunContext insertContext = TestsUtils.mockRunContext(runContextFactory, insertTask, Map.of()); + Insert.Output insertOutput = insertTask.run(insertContext); + + assertThat("Should insert all documents", insertOutput.getInsertedCount(), equalTo(3)); + assertThat("Should return all inserted IDs", insertOutput.getInsertedIds(), hasSize(3)); + assertThat("First inserted ID should not be null", insertOutput.getInsertedId(), notNullValue()); + } + + @Test + @Order(12) + @DisplayName("Integration: Insert and Verify Document with Real DocumentDB") + void shouldInsertAndVerifyDocumentWithRealServer() throws Exception { + String uniqueId = "verify-test-doc-" + System.currentTimeMillis(); + + // Insert a document + Insert insertTask = Insert.builder() + .id("insert-verify-integration-test") + .type(Insert.class.getName()) + .host(Property.ofValue(HOST)) + .database(Property.ofValue(DATABASE)) + .collection(Property.ofValue(COLLECTION)) + .username(Property.ofValue(USERNAME)) + .password(Property.ofValue(PASSWORD)) + .document(Property.ofValue(Map.of( + "_id", uniqueId, + "name", "Verify Integration Test Document", + "value", 99, + "active", true, + "timestamp", System.currentTimeMillis() + ))) + .build(); + + RunContext insertContext = TestsUtils.mockRunContext(runContextFactory, insertTask, Map.of()); + Insert.Output insertOutput = insertTask.run(insertContext); + + assertThat("Document should be inserted", insertOutput.getInsertedCount(), equalTo(1)); + assertThat("Should return inserted ID", 
insertOutput.getInsertedIds(), hasSize(1)); + String insertedId = insertOutput.getInsertedIds().getFirst(); + assertThat("Inserted ID should match", insertedId, equalTo(uniqueId)); + + // Verify the document was inserted by reading it back + Read readTask = Read.builder() + .id("read-verify-integration-test") + .type(Read.class.getName()) + .host(Property.ofValue(HOST)) + .database(Property.ofValue(DATABASE)) + .collection(Property.ofValue(COLLECTION)) + .username(Property.ofValue(USERNAME)) + .password(Property.ofValue(PASSWORD)) + .filter(Property.ofValue(Map.of("_id", insertedId))) + .fetchType(Property.ofValue(FetchType.FETCH_ONE)) + .build(); + + RunContext readContext = TestsUtils.mockRunContext(runContextFactory, readTask, Map.of()); + Read.Output readOutput = readTask.run(readContext); + + assertThat("Should retrieve the document", readOutput.getRow(), notNullValue()); + assertThat("Document name should match", readOutput.getRow().get("name"), equalTo("Verify Integration Test Document")); + assertThat("Document value should match", readOutput.getRow().get("value"), equalTo(99)); + assertThat("Document active flag should match", readOutput.getRow().get("active"), equalTo(true)); + } +} \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/documentdb/ReadTest.java b/src/test/java/io/kestra/plugin/documentdb/ReadTest.java new file mode 100644 index 0000000..371eb1e --- /dev/null +++ b/src/test/java/io/kestra/plugin/documentdb/ReadTest.java @@ -0,0 +1,234 @@ +package io.kestra.plugin.documentdb; + +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.common.FetchType; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.utils.TestsUtils; +import io.kestra.core.junit.annotations.KestraTest; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestInstance; + +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +@KestraTest +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class ReadTest { + + @Inject + private RunContextFactory runContextFactory; + + // Real connection details for local DocumentDB container + private static final String HOST = "http://localhost:10260"; + private static final String DATABASE = "test_db"; + private static final String COLLECTION = "read_test"; + private static final String USERNAME = "testuser"; + private static final String PASSWORD = "testpass"; + + @BeforeAll + void setupTestData() throws Exception { + // Create test data for READ tests with unique IDs + long timestamp = System.currentTimeMillis(); + Insert insertTask = Insert.builder() + .id("setup-read-test-data") + .type(Insert.class.getName()) + .host(Property.ofValue(HOST)) + .database(Property.ofValue(DATABASE)) + .collection(Property.ofValue(COLLECTION)) + .username(Property.ofValue(USERNAME)) + .password(Property.ofValue(PASSWORD)) + .documents(Property.ofValue(List.of( + Map.of("_id", "test-" + timestamp + "-1", "name", "Test User 1", "status", "active", "age", 25, "roles", List.of("user", "editor")), + Map.of("_id", "test-" + timestamp + "-2", "name", "Test User 2", "status", "active", "age", 30, "roles", List.of("admin")), + Map.of("_id", "test-" + timestamp + "-3", "name", "Test User 3", "status", "inactive", "age", 17, "department", "Sales") + ))) + .build(); + + RunContext setupContext = 
TestsUtils.mockRunContext(runContextFactory, insertTask, Map.of()); + insertTask.run(setupContext); + } + + @Test + void shouldCreateTaskWithRequiredProperties() throws Exception { + Read task = Read.builder() + .id("test-read") + .type(Read.class.getName()) + .host(Property.ofValue(HOST)) + .database(Property.ofValue(DATABASE)) + .collection(Property.ofValue(COLLECTION)) + .username(Property.ofValue(USERNAME)) + .password(Property.ofValue(PASSWORD)) + .fetchType(Property.ofValue(FetchType.FETCH)) + .build(); + + RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, Map.of()); + + assertThat(task, is(notNullValue())); + assertThat(task.getHost(), is(notNullValue())); + assertThat(task.getDatabase(), is(notNullValue())); + assertThat(task.getCollection(), is(notNullValue())); + assertThat(task.getUsername(), is(notNullValue())); + assertThat(task.getPassword(), is(notNullValue())); + assertThat(task.getFetchType(), is(notNullValue())); + + // Execute the task and verify it works with real database + Read.Output output = task.run(runContext); + assertThat(output, is(notNullValue())); + assertThat(output.getRows(), is(notNullValue())); + assertThat(output.getSize(), greaterThan(0L)); + } + + @Test + void shouldDefaultToFetchType() throws Exception { + Read task = Read.builder() + .id("test-default") + .type(Read.class.getName()) + .host(Property.ofValue(HOST)) + .database(Property.ofValue(DATABASE)) + .collection(Property.ofValue(COLLECTION)) + .username(Property.ofValue(USERNAME)) + .password(Property.ofValue(PASSWORD)) + .build(); + + RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, Map.of()); + + // Should default to FETCH + assertThat(task.getFetchType(), is(notNullValue())); + + // Execute the task and verify default fetchType works + Read.Output output = task.run(runContext); + assertThat(output, is(notNullValue())); + assertThat(output.getRows(), is(notNullValue())); + assertThat(output.getSize(), greaterThan(0L)); + } + + + @Test + void shouldValidateTaskWithFilter() throws Exception { + Map filter = Map.of( + "status", "active", + "age", Map.of("$gte", 18) + ); + + Read task = Read.builder() + .id("test-filter") + .type(Read.class.getName()) + .host(Property.ofValue(HOST)) + .database(Property.ofValue(DATABASE)) + .collection(Property.ofValue(COLLECTION)) + .username(Property.ofValue(USERNAME)) + .password(Property.ofValue(PASSWORD)) + .filter(Property.ofValue(filter)) + .fetchType(Property.ofValue(FetchType.FETCH)) + .build(); + + RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, Map.of()); + + // Execute the task and verify filter works + assertThat(task.getFilter(), is(notNullValue())); + Read.Output output = task.run(runContext); + + assertThat(output, is(notNullValue())); + assertThat(output.getRows(), is(notNullValue())); + // Should only return documents matching the filter (status=active and age>=18) + assertThat(output.getSize(), greaterThan(0L)); + for (Map row : output.getRows()) { + assertThat(row.get("status"), equalTo("active")); + assertThat((Integer) row.get("age"), greaterThanOrEqualTo(18)); + } + } + + @Test + void shouldValidateTaskWithAggregationPipeline() throws Exception { + List> pipeline = List.of( + Map.of("$match", Map.of("status", "active")), + Map.of("$group", Map.of( + "_id", "$status", + "count", Map.of("$sum", 1), + "avgAge", Map.of("$avg", "$age") + )) + ); + + Read task = Read.builder() + .id("test-aggregation") + .type(Read.class.getName()) + .host(Property.ofValue(HOST)) + 
            .database(Property.ofValue(DATABASE))
+            .collection(Property.ofValue(COLLECTION))
+            .username(Property.ofValue(USERNAME))
+            .password(Property.ofValue(PASSWORD))
+            .aggregationPipeline(Property.ofValue(pipeline))
+            .fetchType(Property.ofValue(FetchType.FETCH))
+            .build();
+
+        RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, Map.of());
+
+        // Execute the task and verify aggregation pipeline works
+        assertThat(task.getAggregationPipeline(), is(notNullValue()));
+        Read.Output output = task.run(runContext);
+
+        assertThat(output, is(notNullValue()));
+        assertThat(output.getRows(), is(notNullValue()));
+        assertThat(output.getSize(), greaterThan(0L));
+
+        // Verify aggregation structure
+        Map firstResult = output.getRows().get(0);
+        assertThat(firstResult.containsKey("_id"), equalTo(true));
+        assertThat(firstResult.containsKey("count"), equalTo(true));
+    }
+
+    @Test
+    void shouldValidateAllFetchTypes() throws Exception {
+        for (FetchType fetchType : FetchType.values()) {
+            Read task = Read.builder()
+                .id("test-fetchtype-" + fetchType.name())
+                .type(Read.class.getName())
+                .host(Property.ofValue(HOST))
+                .database(Property.ofValue(DATABASE))
+                .collection(Property.ofValue(COLLECTION))
+                .username(Property.ofValue(USERNAME))
+                .password(Property.ofValue(PASSWORD))
+                .fetchType(Property.ofValue(fetchType))
+                .build();
+
+            RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, Map.of());
+
+            // Execute the task and verify all fetch types work
+            assertThat(task.getFetchType(), is(notNullValue()));
+            Read.Output output = task.run(runContext);
+
+            assertThat(output, is(notNullValue()));
+            assertThat(output.getSize(), greaterThan(0L));
+
+            // Verify output structure based on fetch type
+            switch (fetchType) {
+                case FETCH:
+                    assertThat(output.getRows(), is(notNullValue()));
+                    assertThat(output.getRow(), is(nullValue()));
+                    assertThat(output.getUri(), is(nullValue()));
+                    break;
+                case FETCH_ONE:
+                    assertThat(output.getRow(), is(notNullValue()));
+                    assertThat(output.getRows(), is(nullValue()));
+                    assertThat(output.getUri(), is(nullValue()));
+                    break;
+                case STORE:
+                    assertThat(output.getUri(), is(notNullValue()));
+                    assertThat(output.getRows(), is(nullValue()));
+                    assertThat(output.getRow(), is(nullValue()));
+                    break;
+                case NONE:
+                    assertThat(output.getRows(), is(nullValue()));
+                    assertThat(output.getRow(), is(nullValue()));
+                    assertThat(output.getUri(), is(nullValue()));
+                    break;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/io/kestra/plugin/documentdb/UpdateTest.java b/src/test/java/io/kestra/plugin/documentdb/UpdateTest.java
new file mode 100644
index 0000000..cb3c6b0
--- /dev/null
+++ b/src/test/java/io/kestra/plugin/documentdb/UpdateTest.java
@@ -0,0 +1,237 @@
+package io.kestra.plugin.documentdb;
+
+import io.kestra.core.models.property.Property;
+import io.kestra.core.runners.RunContext;
+import io.kestra.core.runners.RunContextFactory;
+import io.kestra.core.utils.TestsUtils;
+import io.kestra.core.junit.annotations.KestraTest;
+import jakarta.inject.Inject;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.DisplayName;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.*;
+
+@KestraTest
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+class UpdateTest {
+
+    // Real connection details for local DocumentDB container
+    private static final String HOST = "http://localhost:10260";
+    private static final String DATABASE = "test_db";
+    private static final String COLLECTION = "update_test";
+    private static final String USERNAME = "testuser";
+    private static final String PASSWORD = "testpass";
+
+    @Inject
+    private RunContextFactory runContextFactory;
+
+    @Test
+    void shouldValidateRequiredProperties() throws Exception {
+        Update task = Update.builder()
+            .id("test-update")
+            .type(Update.class.getName())
+            .host(Property.ofValue("https://test-documentdb.com"))
+            .database(Property.ofValue("testdb"))
+            .collection(Property.ofValue("testcol"))
+            .username(Property.ofValue("testuser"))
+            .password(Property.ofValue("testpass"))
+            .filter(Property.ofValue(Map.of("name", "Test Document")))
+            .update(Property.ofValue(Map.of("$set", Map.of("status", "updated"))))
+            .build();
+
+        RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, Map.of());
+
+        // This should not throw an exception for validation
+        assertThat(task.getHost(), is(notNullValue()));
+        assertThat(task.getDatabase(), is(notNullValue()));
+        assertThat(task.getCollection(), is(notNullValue()));
+        assertThat(task.getUsername(), is(notNullValue()));
+        assertThat(task.getPassword(), is(notNullValue()));
+        assertThat(task.getFilter(), is(notNullValue()));
+        assertThat(task.getUpdate(), is(notNullValue()));
+
+        // Task should fail with connection error since this unit test uses a non-existent host
+        // but this validates that the task configuration is valid
+        try {
+            task.run(runContext);
+            throw new AssertionError("Should have thrown exception due to connection failure");
+        } catch (Exception e) {
+            // Expected - connection will fail to the non-existent test host
+            // This validates the task can be executed with valid properties
+            assertThat(e.getMessage(), anyOf(
+                containsString("Connection refused"),
+                containsString("Failed to update document"),
+                containsString("Name or service not known"),
+                containsString("UnknownHostException")
+            ));
+        }
+    }
+
+    @Test
+    void shouldRejectMissingUpdate() throws Exception {
+        Update task = Update.builder()
+            .id("test-missing-update")
+            .type(Update.class.getName())
+            .host(Property.ofValue("https://test-documentdb.com"))
+            .database(Property.ofValue("testdb"))
+            .collection(Property.ofValue("testcol"))
+            .username(Property.ofValue("testuser"))
+            .password(Property.ofValue("testpass"))
+            .filter(Property.ofValue(Map.of("name", "Test Document")))
+            .build();
+
+        RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, Map.of());
+
+        try {
+            task.run(runContext);
+            throw new AssertionError("Should have thrown ConstraintViolationException or IllegalArgumentException");
+        } catch (Exception e) {
+            // The validation can happen at framework level (ConstraintViolationException)
+            // or our custom level (IllegalArgumentException)
+            assertThat(e.getMessage(), anyOf(
+                containsString("must not be null"),
+                containsString("Update operations must be provided")
+            ));
+        }
+    }
+
+    @Test
+    void shouldValidateUpdateManyFlag() throws Exception {
+        Update task = Update.builder()
+            .id("test-update-many")
+            .type(Update.class.getName())
+            .host(Property.ofValue("https://test-documentdb.com"))
+            .database(Property.ofValue("testdb"))
+            .collection(Property.ofValue("testcol"))
+            .username(Property.ofValue("testuser"))
+            .password(Property.ofValue("testpass"))
+            .filter(Property.ofValue(Map.of("status", "pending")))
+            .update(Property.ofValue(Map.of("$set", Map.of("status", "processed"))))
+            .updateMany(Property.ofValue(true))
+            .build();
+
+        RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, Map.of());
+
+        assertThat(task.getUpdateMany(), is(notNullValue()));
+
+        // Task should fail with connection error since this unit test uses a non-existent host
+        // but this validates that the updateMany flag is properly configured
+        try {
+            task.run(runContext);
+            throw new AssertionError("Should have thrown exception due to connection failure");
+        } catch (Exception e) {
+            // Expected - connection will fail to the non-existent test host
+            // This validates the task can be executed with updateMany flag
+            assertThat(e.getMessage(), anyOf(
+                containsString("Connection refused"),
+                containsString("Failed to update document"),
+                containsString("Name or service not known"),
+                containsString("UnknownHostException")
+            ));
+        }
+    }
+
+    @Test
+    @Order(10)
+    @DisplayName("Integration: Update Single Document with Real DocumentDB")
+    void shouldUpdateSingleDocumentWithRealServer() throws Exception {
+        // First, insert a document to update
+        Insert insertTask = Insert.builder()
+            .id("setup-update-test")
+            .type(Insert.class.getName())
+            .host(Property.ofValue(HOST))
+            .database(Property.ofValue(DATABASE))
+            .collection(Property.ofValue(COLLECTION))
+            .username(Property.ofValue(USERNAME))
+            .password(Property.ofValue(PASSWORD))
+            .document(Property.ofValue(Map.of(
+                "_id", "update-test-doc-" + System.currentTimeMillis(),
+                "name", "Update Test Document",
+                "status", "pending",
+                "count", 0
+            )))
+            .build();
+
+        RunContext insertContext = TestsUtils.mockRunContext(runContextFactory, insertTask, Map.of());
+        Insert.Output insertOutput = insertTask.run(insertContext);
+        String docId = insertOutput.getInsertedId();
+
+        // Now update the document
+        Update updateTask = Update.builder()
+            .id("update-single-integration-test")
+            .type(Update.class.getName())
+            .host(Property.ofValue(HOST))
+            .database(Property.ofValue(DATABASE))
+            .collection(Property.ofValue(COLLECTION))
+            .username(Property.ofValue(USERNAME))
+            .password(Property.ofValue(PASSWORD))
+            .filter(Property.ofValue(Map.of("_id", docId)))
+            .update(Property.ofValue(Map.of(
+                "$set", Map.of("status", "updated", "updated_at", System.currentTimeMillis()),
+                "$inc", Map.of("count", 5)
+            )))
+            .updateMany(Property.ofValue(false))
+            .build();
+
+        RunContext updateContext = TestsUtils.mockRunContext(runContextFactory, updateTask, Map.of());
+        Update.Output updateOutput = updateTask.run(updateContext);
+
+        assertThat("Should match one document", updateOutput.getMatchedCount(), equalTo(1));
+        assertThat("Should modify one document", updateOutput.getModifiedCount(), equalTo(1));
+        assertThat("Should not have upserted ID", updateOutput.getUpsertedId(), nullValue());
+    }
+
+    @Test
+    @Order(11)
+    @DisplayName("Integration: Update Multiple Documents with Real DocumentDB")
+    void shouldUpdateMultipleDocumentsWithRealServer() throws Exception {
+        // Insert multiple documents to update
+        Insert insertTask = Insert.builder()
+            .id("setup-update-many-test")
+            .type(Insert.class.getName())
+            .host(Property.ofValue(HOST))
+            .database(Property.ofValue(DATABASE))
+            .collection(Property.ofValue(COLLECTION))
+            .username(Property.ofValue(USERNAME))
+            .password(Property.ofValue(PASSWORD))
+            .documents(Property.ofValue(List.of(
+                Map.of("_id", "update-many-1-" + System.currentTimeMillis(), "category", "bulk-test", "status", "pending", "priority", 1),
+                Map.of("_id", "update-many-2-" + System.currentTimeMillis(), "category", "bulk-test", "status", "pending", "priority", 2),
+                Map.of("_id", "update-many-3-" + System.currentTimeMillis(), "category", "bulk-test", "status", "pending", "priority", 3)
+            )))
+            .build();
+
+        RunContext insertContext = TestsUtils.mockRunContext(runContextFactory, insertTask, Map.of());
+        insertTask.run(insertContext);
+
+        // Now update multiple documents
+        Update updateTask = Update.builder()
+            .id("update-many-integration-test")
+            .type(Update.class.getName())
+            .host(Property.ofValue(HOST))
+            .database(Property.ofValue(DATABASE))
+            .collection(Property.ofValue(COLLECTION))
+            .username(Property.ofValue(USERNAME))
+            .password(Property.ofValue(PASSWORD))
+            .filter(Property.ofValue(Map.of("category", "bulk-test")))
+            .update(Property.ofValue(Map.of(
+                "$set", Map.of("status", "bulk-updated", "batch_update_time", System.currentTimeMillis())
+            )))
+            .updateMany(Property.ofValue(true))
+            .build();
+
+        RunContext updateContext = TestsUtils.mockRunContext(runContextFactory, updateTask, Map.of());
+        Update.Output updateOutput = updateTask.run(updateContext);
+
+        assertThat("Should match multiple documents", updateOutput.getMatchedCount(), greaterThanOrEqualTo(3));
+        assertThat("Should modify multiple documents", updateOutput.getModifiedCount(), greaterThanOrEqualTo(3));
+        assertThat("Should not have upserted ID", updateOutput.getUpsertedId(), nullValue());
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/io/kestra/plugin/templates/ExampleRunnerTest.java b/src/test/java/io/kestra/plugin/templates/ExampleRunnerTest.java
deleted file mode 100644
index e3e6cfa..0000000
--- a/src/test/java/io/kestra/plugin/templates/ExampleRunnerTest.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package io.kestra.plugin.templates;
-
-import io.kestra.core.junit.annotations.ExecuteFlow;
-import io.kestra.core.junit.annotations.KestraTest;
-import org.junit.jupiter.api.Test;
-import io.kestra.core.models.executions.Execution;
-
-import java.util.Map;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
-
-/**
- * This test will start Kestra, load the flow located in `src/test/resources/flows/example.yaml`, and execute it.
- * The Kestra configuration file is in `src/test/resources/application.yml`, it configures the in-memory backend for tests.
- */
-@KestraTest(startRunner = true) // This annotation starts an embedded Kestra for tests
-class ExampleRunnerTest {
-    @Test
-    @ExecuteFlow("flows/example.yaml")
-    void flow(Execution execution) {
-        assertThat(execution.getTaskRunList(), hasSize(3));
-        assertThat(((Map)execution.getTaskRunList().get(2).getOutputs().get("child")).get("value"), is("task-id"));
-    }
-}
diff --git a/src/test/java/io/kestra/plugin/templates/ExampleTest.java b/src/test/java/io/kestra/plugin/templates/ExampleTest.java
deleted file mode 100644
index d2e8d38..0000000
--- a/src/test/java/io/kestra/plugin/templates/ExampleTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package io.kestra.plugin.templates;
-
-import io.kestra.core.junit.annotations.KestraTest;
-import io.kestra.core.models.property.Property;
-import org.apache.commons.lang3.StringUtils;
-import org.junit.jupiter.api.Test;
-import io.kestra.core.runners.RunContext;
-import io.kestra.core.runners.RunContextFactory;
-
-import jakarta.inject.Inject;
-
-import java.util.Map;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-
-/**
- * This test will only test the main task, this allow you to send any input
- * parameters to your task and test the returning behaviour easily.
- */
-@KestraTest
-class ExampleTest {
-    @Inject
-    private RunContextFactory runContextFactory;
-
-    @Test
-    void run() throws Exception {
-        RunContext runContext = runContextFactory.of(Map.of("variable", "John Doe"));
-
-        Example task = Example.builder()
-            .format(new Property<>("Hello {{ variable }}"))
-            .build();
-
-        Example.Output runOutput = task.run(runContext);
-
-        assertThat(runOutput.getChild().getValue(), is(StringUtils.reverse("Hello John Doe")));
-    }
-}