Activity Definition

String-based Activity Execution

Activities can also be called by name, which is useful for cross-language interop or when the activity name is only known at runtime:

// Execute activity by string name - suspend function, awaits result
val result = KWorkflow.executeActivity<String>(
    "activityName",
    KActivityOptions(
        startToCloseTimeout = 30.seconds,
        retryOptions = KRetryOptions(
            initialInterval = 1.seconds,
            maximumAttempts = 3
        )
    ),
    arg1, arg2
)

// Parallel execution - use standard coroutineScope { async {} }
val results = coroutineScope {
    val d1 = async { KWorkflow.executeActivity<String>("activity1", options, arg1) }
    val d2 = async { KWorkflow.executeActivity<String>("activity2", options, arg2) }
    awaitAll(d1, d2)  // Returns List<String>
}

Typed Activities

The typed activity API uses direct method references - no stub creation needed. This approach:

  • Provides full compile-time type safety for arguments and return types
  • Allows different options (timeouts, retry policies) per activity call
  • Works with both Kotlin suspend and Java non-suspend activity interfaces
  • Mirrors the patterns used by the TypeScript and Python SDKs

// Define the activity interface
@ActivityInterface
interface GreetingActivities {
    @ActivityMethod
    suspend fun composeGreeting(greeting: String, name: String): String

    @ActivityMethod
    suspend fun sendEmail(email: Email): SendResult

    @ActivityMethod
    suspend fun log(message: String)
}

// In workflow - direct method reference, no stub needed
val greeting = KWorkflow.executeActivity(
    GreetingActivities::composeGreeting,  // Direct reference to interface method
    KActivityOptions(startToCloseTimeout = 30.seconds),
    "Hello", "World"
)

// Different activity, different options
val result = KWorkflow.executeActivity(
    GreetingActivities::sendEmail,
    KActivityOptions(
        startToCloseTimeout = 2.minutes,
        retryOptions = KRetryOptions(maximumAttempts = 5)
    ),
    email
)

// Unit-returning (void) activities work too
KWorkflow.executeActivity(
    GreetingActivities::log,
    KActivityOptions(startToCloseTimeout = 5.seconds),
    "Processing started"
)
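Since options are passed on every call, a shared baseline can be derived per activity rather than restated. The sketch below assumes the options types behave like Kotlin data classes; `OptionsSketch`, `RetrySketch`, and `emailOptionsFrom` are hypothetical stand-ins, and the real KActivityOptions and KRetryOptions may be shaped differently. The `30.seconds` and `2.minutes` literals come from the standard `kotlin.time` extensions imported here.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds

// Hypothetical stand-ins for illustration only; not the SDK's own types.
data class RetrySketch(
    val initialInterval: Duration = 1.seconds,
    val maximumAttempts: Int = 3
)

data class OptionsSketch(
    val startToCloseTimeout: Duration,
    val retry: RetrySketch = RetrySketch()
)

// Derive the options for a slower, more heavily retried call from a shared baseline.
fun emailOptionsFrom(base: OptionsSketch): OptionsSketch =
    base.copy(
        startToCloseTimeout = 2.minutes,
        retry = base.retry.copy(maximumAttempts = 5)
    )

fun main() {
    val base = OptionsSketch(startToCloseTimeout = 30.seconds)
    val emailOptions = emailOptionsFrom(base)
    println(base)          // the baseline is unchanged because copy() returns a new value
    println(emailOptions)
}
```

Keeping the baseline immutable means one call's overrides cannot leak into another call's options.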

Type Safety

The API takes KFunction references, so the compiler checks argument and return types against the activity method's signature, while method metadata is extracted from the reference at runtime:

// Compile error! Wrong argument types
KWorkflow.executeActivity(
    GreetingActivities::composeGreeting,
    options,
    123, true  // ✗ Type mismatch: expected String, String
)
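A minimal sketch of why a mismatch like the one above is rejected, using only the Kotlin standard library. `GreeterSketch`, `ScheduledCall`, and `describeCall` are hypothetical names for illustration and are not part of the SDK; the interface is non-suspend to keep the sketch self-contained.

```kotlin
import kotlin.reflect.KFunction3

// Hypothetical stand-in for an activity interface (non-suspend for simplicity).
interface GreeterSketch {
    fun composeGreeting(greeting: String, name: String): String
}

// Hypothetical record of what a runtime might schedule: a method name plus arguments.
data class ScheduledCall(val activityName: String, val args: List<Any?>)

// Two-argument overload: a bound reference to an interface method has three
// parameters (receiver + 2 arguments), hence KFunction3. A1 and A2 are fixed
// by the reference, so arguments of any other type fail type inference.
fun <T, A1, A2, R> describeCall(
    activity: KFunction3<T, A1, A2, R>,
    arg1: A1,
    arg2: A2
): ScheduledCall = ScheduledCall(activity.name, listOf(arg1, arg2))

fun main() {
    val call = describeCall(GreeterSketch::composeGreeting, "Hello", "World")
    println(call)

    // Uncommenting the next line is a compile-time error (expected String, String):
    // describeCall(GreeterSketch::composeGreeting, 123, true)
}
```

Reading `activity.name` from a callable reference does not need the full kotlin-reflect library, which is one way the method name can be recovered at runtime.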

Parallel Execution

Use standard coroutineScope { async { } } for concurrent execution:

override suspend fun parallelGreetings(names: List<String>): List<String> = coroutineScope {
    names.map { name ->
        async {
            KWorkflow.executeActivity(
                GreetingActivities::composeGreeting,
                KActivityOptions(startToCloseTimeout = 10.seconds),
                "Hello", name
            )
        }
    }.awaitAll()  // Standard kotlinx.coroutines.awaitAll
}

// Multiple different activities in parallel
val (result1, result2) = coroutineScope {
    val d1 = async { KWorkflow.executeActivity(Activities::operation1, options, arg1) }
    val d2 = async { KWorkflow.executeActivity(Activities::operation2, options, arg2) }
    awaitAll(d1, d2)  // Returns a List, destructured positionally; both results are typed as their common supertype
}

Note: We use standard Kotlin async instead of a custom startActivity method. The workflow's deterministic dispatcher ensures correct replay behavior.

Java Activity Interoperability

Method references work regardless of whether the activity is defined in Kotlin or Java:

// Java activity interface works seamlessly
// @ActivityInterface
// public interface JavaPaymentActivities {
//     PaymentResult processPayment(String orderId, BigDecimal amount);
// }

val result: PaymentResult = KWorkflow.executeActivity(
    JavaPaymentActivities::processPayment,
    KActivityOptions(startToCloseTimeout = 2.minutes),
    orderId, amount
)

Activity Execution API

The KWorkflow object provides type-safe overloads using KFunction types:

object KWorkflow {
    // 1 argument
    suspend fun <T, A1, R> executeActivity(
        activity: KFunction2<T, A1, R>,
        options: KActivityOptions,
        arg1: A1
    ): R

    // 2 arguments
    suspend fun <T, A1, A2, R> executeActivity(
        activity: KFunction3<T, A1, A2, R>,
        options: KActivityOptions,
        arg1: A1, arg2: A2
    ): R

    // ... up to 6 arguments

    // String-based overloads
    suspend inline fun <reified R> executeActivity(
        activityName: String,
        options: KActivityOptions,
        vararg args: Any?
    ): R
}
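The string-based overload is declared `inline` with a `reified` result type, which lets the caller's `R` survive to runtime where it can guide result decoding. The sketch below illustrates that mechanism only: `registrySketch`, `executeByNameSketch`, and `executeSketch` are hypothetical, the in-memory map stands in for a real worker, and the sketch is non-suspend and bounds `R` by `Any` for simplicity.

```kotlin
import kotlin.reflect.KClass

// Hypothetical in-memory registry standing in for activities hosted by a worker.
val registrySketch: Map<String, (List<Any?>) -> Any?> = mapOf(
    "composeGreeting" to { args: List<Any?> -> "${args[0]}, ${args[1]}!" },
    "countChars" to { args: List<Any?> -> (args[0] as String).length }
)

// Non-generic core: receives the expected result class as an ordinary value.
fun executeByNameSketch(name: String, resultType: KClass<*>, args: List<Any?>): Any? {
    val handler = registrySketch[name] ?: error("Unknown activity: $name")
    val result = handler(args)
    // Verify the result against the type the caller asked for.
    require(resultType.javaObjectType.isInstance(result)) {
        "Activity $name did not return ${resultType.javaObjectType.simpleName}"
    }
    return result
}

// Inline wrapper: reified R makes R::class available at the call site.
inline fun <reified R : Any> executeSketch(name: String, vararg args: Any?): R =
    executeByNameSketch(name, R::class, args.toList()) as R

fun main() {
    val greeting: String = executeSketch("composeGreeting", "Hello", "World")
    val length: Int = executeSketch("countChars", "abc")
    println(greeting)
    println(length)
}
```

Without `reified`, `R` would be erased and the wrapper could not tell the core which result type to expect.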

Related


Next: Activity Implementation