name: transactions-scala
summary: ACID multi-document transactions with the Couchbase Scala SDK: cluster.transactions.run, TransactionAttemptContext, ctx.get/insert/replace/remove, SQL++ in transactions, error handling, durability, async transactions
description: ACID multi-document transactions with the Couchbase Scala SDK: cluster.transactions.run, TransactionAttemptContext, ctx.get/insert/replace/remove, SQL++ in transactions, error handling, durability, async transactions
compatibility: Scala SDK 1.x. Requires Couchbase Server 6.6.1+ or Capella.
metadata:
  last_verified: 2026-05
  min_server_version: 7.0
  handoff:
    - condition: user asks about SQL++ queries
      skill: server-querying-scala
    - condition: user asks about CAS, bulk ops, sub-document, or SDK patterns
      skill: sdk-patterns-scala
    - condition: user asks about error handling, retries, or exception types
      skill: error-handling
    - condition: user asks about connection setup or SDK configuration
      skill: server-connection-scala

Transactions — Scala

Transactions are ACID, multi-document, and automatically retried on transient failures. The lambda may run more than once — keep it idempotent and free of side effects.
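
To see why side effects inside the lambda are risky, here is a minimal, SDK-free sketch. The runWithRetries helper and the emailsSent counter are hypothetical stand-ins for illustration only; they are not the SDK's retry algorithm. The sketch simulates a lambda that hits one transient conflict and is re-run.

```scala
import scala.util.{Try, Success, Failure}

object RetryDemo {
  // Hypothetical stand-in for automatic retries: re-run `body` until it
  // succeeds or `maxAttempts` is reached. Not the SDK's actual algorithm.
  def runWithRetries(maxAttempts: Int)(body: Int => Try[Unit]): Try[Unit] = {
    var attempt = 1
    var outcome = body(attempt)
    while (outcome.isFailure && attempt < maxAttempts) {
      attempt += 1
      outcome = body(attempt)
    }
    outcome
  }

  // Runs a lambda that fails on its first attempt and reports how many
  // times the side effect inside it fired.
  def emailsSentAfterOneConflict(): Int = {
    var emailsSent = 0 // external side effect that cannot be rolled back
    runWithRetries(maxAttempts = 3) { attempt =>
      emailsSent += 1 // fires on every attempt, not once per commit
      if (attempt == 1) Failure(new RuntimeException("simulated conflict"))
      else Success(())
    }
    emailsSent
  }

  def main(args: Array[String]): Unit =
    println(s"emails sent: ${emailsSentAfterOneConflict()}") // emails sent: 2
}
```

The simulated transaction commits once, yet the side effect fires twice. Moving such work (emails, counters, external API calls) to after the transaction result is known avoids the duplication.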

Version requirement: Distributed transactions require Couchbase Server 7.0+. The 6.6 developer preview is not recommended for production.

Basic Transaction

import com.couchbase.client.scala.transactions._
import com.couchbase.client.scala.json.JsonObject
import scala.util.{Success, Failure}

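val inventory  = cluster.bucket("travel-sample").scope("inventory")
val collection = inventory.collection("airline")

val result = cluster.transactions.run { ctx =>
  // Insert a new document
  ctx.insert(collection, "doc-a", JsonObject("status" -> "new")).get

  // Read it back within the same transaction
  val docA = ctx.get(collection, "doc-a").get

  // Replace its content
  val updated = docA.contentAs[JsonObject].get.put("status", "processed")
  ctx.replace(docA, updated).get

  // Remove another document (it must already exist)
  val docB = ctx.get(collection, "doc-b").get
  ctx.remove(docB).get

  // The lambda returns a Try[Unit]; Success commits the transaction
  Success(())
}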

result match {
  case Success(_)   => println("Transaction committed")
  case Failure(err) => println(s"Transaction failed: $err")
}

Transfer Pattern

val result = cluster.transactions.run { ctx =>
  val from = ctx.get(collection, "account::alice").get
  val to   = ctx.get(collection, "account::bob").get

  val fromDoc = from.contentAs[JsonObject].get
  val toDoc   = to.contentAs[JsonObject].get

  val amount = 100
  // JsonObject.num returns an Int directly; a failed require throws and fails the transaction
  require(fromDoc.num("balance") >= amount, "Insufficient funds")

  ctx.replace(from, fromDoc.put("balance", fromDoc.num("balance") - amount)).get
  ctx.replace(to,   toDoc.put("balance",   toDoc.num("balance") + amount)).get

  // The lambda returns a Try[Unit]; Success commits the transaction
  Success(())
}
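
Because the lambda can run several times, it can help to keep the balance arithmetic in a pure function that is testable without a cluster. The following is a sketch under that assumption; TransferLogic and transfer are hypothetical names, not part of the SDK.

```scala
import scala.util.{Try, Success, Failure}

object TransferLogic {
  // Hypothetical helper: computes the new (from, to) balances, or fails
  // when the amount is invalid or the source account cannot cover it.
  def transfer(fromBalance: Long, toBalance: Long, amount: Long): Try[(Long, Long)] =
    if (amount <= 0)
      Failure(new IllegalArgumentException("Amount must be positive"))
    else if (fromBalance < amount)
      Failure(new IllegalStateException("Insufficient funds"))
    else
      Success((fromBalance - amount, toBalance + amount))
}
```

Inside the transaction lambda, the result can be unwrapped with .get in the same way as the ctx calls above, so a failed check throws before either ctx.replace runs. For example, TransferLogic.transfer(250, 40, 100) yields Success((150, 140)), while TransferLogic.transfer(50, 40, 100) yields a Failure.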

SQL++ Inside a Transaction

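import com.couchbase.client.scala.query.QueryParameters

val result = cluster.transactions.run { ctx =>
  ctx.query(
    // $1 is a positional parameter; travel-sample stores full country names
    "UPDATE `travel-sample`.`inventory`.`airline` SET status = 'active' WHERE country = $1",
    TransactionQueryOptions().parameters(QueryParameters.Positional("United States"))
  ).map(_ => ()) // discard the query result so the lambda yields Try[Unit]
}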

Error Handling

import com.couchbase.client.scala.transactions.error.TransactionFailedException

result match {
  case Success(_) => println("Committed")
  case Failure(e: TransactionFailedException) =>
    println(s"Transaction failed after retries: ${e.getMessage}")
    println(s"Cause: ${e.getCause}")
  case Failure(e) => println(s"Unexpected error: $e")
}

Durability

import com.couchbase.client.scala.durability.Durability
import com.couchbase.client.scala.transactions.TransactionOptions

val result = cluster.transactions.run(
  { ctx =>
    // Discard the TransactionGetResult so the lambda yields Try[Unit]
    ctx.insert(collection, "order::1", JsonObject("status" -> "pending")).map(_ => ())
  },
  TransactionOptions().durabilityLevel(Durability.PersistToMajority)
)

Async Transactions (Reactor)

import reactor.core.scala.publisher.SMono

val mono: SMono[TransactionResult] = cluster.reactive.transactions.run { ctx =>
  ctx.insert(collection.reactive, "doc-async", JsonObject("x" -> 1))
    .`then`(ctx.get(collection.reactive, "doc-async"))
    .flatMap(doc => ctx.replace(doc, JsonObject("x" -> 2)))
    .`then`()
}

mono.subscribe(
  _ => println("Committed"),
  err => println(s"Failed: $err")
)

Design Rules

  • Keep transactions under 1 second — long transactions increase conflict probability
  • SDK retries automatically on transient conflicts — don't add your own retry loop
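  • A transaction can span collections, scopes, and buckets within one cluster; atomicity does not extend across clusters linked by XDCR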
  • OnUpdate Eventing fires after commit, not within the transaction