This repository was archived by the owner on Mar 3, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathCqlRequestAction.scala
More file actions
105 lines (91 loc) · 4.63 KB
/
CqlRequestAction.scala
File metadata and controls
105 lines (91 loc) · 4.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/*
* Copyright (c) 2018 Datastax Inc.
*
* This software can be used solely with DataStax products. Please consult the file LICENSE.md.
*/
package com.datastax.gatling.plugin.request
import java.lang.Boolean
import java.util.UUID
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit.MICROSECONDS
import akka.actor.ActorSystem
import com.datastax.gatling.plugin.DseProtocol
import com.datastax.gatling.plugin.metrics.MetricsLogger
import com.datastax.gatling.plugin.model.DseCqlAttributes
import com.datastax.gatling.plugin.response.CqlResponseHandler
import com.datastax.gatling.plugin.utils._
import io.gatling.commons.stats.KO
import io.gatling.core.action.{Action, ExitableAction}
import io.gatling.core.session.Session
import io.gatling.core.stats.StatsEngine
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
/**
*
* This class is responsible for executing CQL queries asynchronously against the cluster.
*
* It first starts by delegating everything that is driver related to the DSE plugin router. This is in order to
* free the Gatling `injector` actor as fast as possible.
*
* The plugin router (and its actors) execute the driver code in order to locate the best replica that should receive
* each query, through `DseSession.executeAsync()` or `DseSession.executeGraphAsync()`. A driver I/O thread encodes
* the request and sends it over the wire.
*
* Once the response is received, a driver I/O thread (Netty) decodes the response into a Java Object. It then
* completes the `Future` that was returned by `DseSession.executeAsync()`.
*
* Completing that future results in immediately delegating the latency recording work to the plugin router. That
* work includes recording it in HDR histograms through non-blocking data structures, and forwarding the result to
* other Gatling data writers, like the console reporter.
*/
class CqlRequestAction(val name: String,
val next: Action,
val system: ActorSystem,
val statsEngine: StatsEngine,
val protocol: DseProtocol,
val dseAttributes: DseCqlAttributes,
val metricsLogger: MetricsLogger,
val dseExecutorService: ExecutorService,
val gatlingTimingSource: GatlingTimingSource)
extends ExitableAction {
def execute(session: Session): Unit = {
dseExecutorService.submit(new Runnable {
override def run(): Unit = sendQuery(session)
})
}
def sendQuery(session: Session): Unit = {
val stmt = dseAttributes.statement.buildFromSession(session)
stmt.onFailure(err => {
val responseTime = new ServiceTime(0, 0)
val logUuid = UUID.randomUUID.toString
val tagString = if (session.groupHierarchy.nonEmpty) session.groupHierarchy.mkString("/") + "/" + dseAttributes.tag else dseAttributes.tag
statsEngine.logResponse(session, name, responseTime.toGatlingResponseTimings, KO, None,
Some(s"$tagString - Preparing: ${err.take(50)}"), List(responseTime.latencyIn(MICROSECONDS), "PRE", logUuid))
logger.error("[{}] {} - Preparing: {} - Attrs: {}", logUuid, tagString, err, session.attributes.mkString(","))
next ! session.markAsFailed
})
stmt.onSuccess({ stmt =>
val responseTimeBuilder = ResponseTimeBuilder.newResponseTimeBuilder(session, gatlingTimingSource)
// global options
dseAttributes.cl.map(stmt.setConsistencyLevel)
dseAttributes.userOrRole.map(stmt.executingAs)
dseAttributes.readTimeout.map(stmt.setReadTimeoutMillis)
dseAttributes.idempotent.map(stmt.setIdempotent)
dseAttributes.defaultTimestamp.map(stmt.setDefaultTimestamp)
// CQL Only Options
dseAttributes.outGoingPayload.map(x => stmt.setOutgoingPayload(x.asJava))
dseAttributes.serialCl.map(stmt.setSerialConsistencyLevel)
dseAttributes.retryPolicy.map(stmt.setRetryPolicy)
dseAttributes.fetchSize.map(stmt.setFetchSize)
dseAttributes.pagingState.map(stmt.setPagingState)
if (dseAttributes.enableTrace.isDefined && dseAttributes.enableTrace.get) {
stmt.enableTracing
}
val responseHandler = new CqlResponseHandler(next, session, system, statsEngine, responseTimeBuilder, stmt, dseAttributes, metricsLogger)
implicit val sameThreadExecutionContext: ExecutionContextExecutor = ExecutionContext.fromExecutorService(dseExecutorService)
FutureUtils
.toScalaFuture(protocol.session.executeAsync(stmt))
.onComplete(t => DseRequestActor.recordResult(RecordResult(t, responseHandler)))
})
}
}