Skip to content

Commit 9479cd1

Browse files
committed
Apply scalafmt
1 parent f4028a6 commit 9479cd1

File tree

39 files changed

+244
-359
lines changed

39 files changed

+244
-359
lines changed

amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectionProvider.scala

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -138,17 +138,16 @@ final class AmqpDetailsConnectionProvider private (
138138
factory.setPassword(credentials.password)
139139
}
140140
virtualHost.foreach(factory.setVirtualHost)
141-
sslConfiguration.foreach(sslConfiguration => {
141+
sslConfiguration.foreach { sslConfiguration =>
142142
if (sslConfiguration.protocol.isDefined) {
143143
if (sslConfiguration.trustManager.isDefined)
144144
factory.useSslProtocol(sslConfiguration.protocol.get, sslConfiguration.trustManager.get)
145145
else factory.useSslProtocol(sslConfiguration.protocol.get)
146-
} else if (sslConfiguration.context.isDefined) {
146+
} else if (sslConfiguration.context.isDefined)
147147
factory.useSslProtocol(sslConfiguration.context.get)
148-
} else {
148+
else
149149
factory.useSslProtocol()
150-
}
151-
})
150+
}
152151
requestedHeartbeat.foreach(factory.setRequestedHeartbeat)
153152
connectionTimeout.foreach(factory.setConnectionTimeout)
154153
handshakeTimeout.foreach(factory.setHandshakeTimeout)
@@ -244,9 +243,8 @@ object AmqpCredentials {
244243
final class AmqpSSLConfiguration private (val protocol: Option[String] = None,
245244
val trustManager: Option[TrustManager] = None,
246245
val context: Option[SSLContext] = None) {
247-
if (protocol.isDefined && context.isDefined) {
246+
if (protocol.isDefined && context.isDefined)
248247
throw new IllegalArgumentException("Protocol and context can't be defined in the same AmqpSSLConfiguration.")
249-
}
250248

251249
def withProtocol(protocol: String): AmqpSSLConfiguration =
252250
copy(protocol = Some(protocol))
@@ -419,10 +417,8 @@ final class AmqpCachedConnectionProvider private (val provider: AmqpConnectionPr
419417
throw new ConcurrentModificationException(
420418
"Unexpected concurrent modification while closing the connection.")
421419
}
422-
} else {
423-
if (!state.compareAndSet(c, Connected(cachedConnection, clients - 1)))
424-
releaseRecursive(amqpConnectionProvider, connection)
425-
}
420+
} else if (!state.compareAndSet(c, Connected(cachedConnection, clients - 1)))
421+
releaseRecursive(amqpConnectionProvider, connection)
426422
case Closing => releaseRecursive(amqpConnectionProvider, connection)
427423
}
428424

amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AbstractAmqpAsyncFlowStageLogic.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,8 @@ import scala.concurrent.Promise
153153
if (noAwaitingMessages && exitQueue.isEmpty) {
154154
streamCompletion.success(Done)
155155
super.onUpstreamFinish()
156-
} else {
156+
} else
157157
log.debug("Received upstream finish signal - stage will be closed when all buffered messages are processed")
158-
}
159158

160159
private def publish(message: WriteMessage): DeliveryTag = {
161160
val tag: DeliveryTag = channel.getNextPublishSeqNo
@@ -191,10 +190,9 @@ import scala.concurrent.Promise
191190

192191
override protected def onTimer(timerKey: Any): Unit =
193192
timerKey match {
194-
case tag: DeliveryTag => {
193+
case tag: DeliveryTag =>
195194
log.debug("Received timeout for deliveryTag {}.", tag)
196195
onRejection(tag, multiple = false)
197-
}
198196
case _ => ()
199197
}
200198

@@ -209,4 +207,4 @@ import scala.concurrent.Promise
209207
}
210208

211209
private def isFinished: Boolean = isClosed(in) && noAwaitingMessages && exitQueue.isEmpty
212-
}
210+
}

amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpAsyncFlowStage.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,14 @@ import scala.concurrent.{ Future, Promise }
5858
buffer += (tag -> AwaitingMessage(tag, passThrough))
5959

6060
override def dequeueAwaitingMessages(tag: DeliveryTag, multiple: Boolean): Iterable[AwaitingMessage[T]] =
61-
if (multiple) {
61+
if (multiple)
6262
dequeueWhile((t, _) => t <= tag)
63-
} else {
63+
else {
6464
setReady(tag)
65-
if (isAtHead(tag)) {
65+
if (isAtHead(tag))
6666
dequeueWhile((_, message) => message.ready)
67-
} else {
67+
else
6868
Seq.empty
69-
}
7069
}
7170

7271
private def dequeueWhile(
@@ -88,4 +87,4 @@ import scala.concurrent.{ Future, Promise }
8887

8988
}, streamCompletion.future)
9089
}
91-
}
90+
}

amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpReplyToSinkStage.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,8 @@ private[amqp] final class AmqpReplyToSinkStage(replyToSinkSettings: AmqpReplyToS
8282
elem.immediate,
8383
elem.properties.orNull,
8484
elem.bytes.toArray)
85-
} else if (settings.failIfReplyToMissing) {
85+
} else if (settings.failIfReplyToMissing)
8686
onFailure(new RuntimeException("Reply-to header was not set"))
87-
}
8887

8988
tryPull(in)
9089
}
@@ -94,4 +93,4 @@ private[amqp] final class AmqpReplyToSinkStage(replyToSinkSettings: AmqpReplyToS
9493
}
9594

9695
override def toString: String = "AmqpReplyToSink"
97-
}
96+
}

amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpRpcFlowStage.scala

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
7070
val consumerCallback = getAsyncCallback(handleDelivery)
7171

7272
val commitCallback = getAsyncCallback[AckArguments] {
73-
case AckArguments(deliveryTag, multiple, promise) => {
73+
case AckArguments(deliveryTag, multiple, promise) =>
7474
try {
7575
channel.basicAck(deliveryTag, multiple)
7676
unackedMessages -= 1
@@ -81,10 +81,9 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
8181
} catch {
8282
case e: Throwable => promise.failure(e)
8383
}
84-
}
8584
}
8685
val nackCallback = getAsyncCallback[NackArguments] {
87-
case NackArguments(deliveryTag, multiple, requeue, promise) => {
86+
case NackArguments(deliveryTag, multiple, requeue, promise) =>
8887
try {
8988
channel.basicNack(deliveryTag, multiple, requeue)
9089
unackedMessages -= 1
@@ -95,7 +94,6 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
9594
} catch {
9695
case e: Throwable => promise.failure(e)
9796
}
98-
}
9997
}
10098

10199
val amqpSourceConsumer = new DefaultConsumer(channel) {
@@ -105,7 +103,7 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
105103
body: Array[Byte]): Unit =
106104
consumerCallback.invoke(
107105
new CommittableReadResult {
108-
override val message = ReadResult(ByteString(body), envelope, properties)
106+
override val message: ReadResult = ReadResult(ByteString(body), envelope, properties)
109107

110108
override def ack(multiple: Boolean): Future[Done] = {
111109
val promise = Promise[Done]()
@@ -148,21 +146,19 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
148146
}
149147

150148
def handleDelivery(message: CommittableReadResult): Unit =
151-
if (isAvailable(out)) {
149+
if (isAvailable(out))
152150
pushMessage(message)
153-
} else if (queue.size + 1 > bufferSize) {
151+
else if (queue.size + 1 > bufferSize)
154152
onFailure(new RuntimeException(s"Reached maximum buffer size $bufferSize"))
155-
} else {
153+
else
156154
queue.enqueue(message)
157-
}
158155

159156
setHandler(
160157
out,
161158
new OutHandler {
162159
override def onPull(): Unit =
163-
if (queue.nonEmpty) {
160+
if (queue.nonEmpty)
164161
pushMessage(queue.dequeue())
165-
}
166162

167163
override def onDownstreamFinish(cause: Throwable): Unit = {
168164
setKeepGoing(true)
@@ -207,15 +203,14 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
207203

208204
val expectedResponses: Int = {
209205
val headers = props.getHeaders
210-
if (headers == null) {
206+
if (headers == null)
211207
responsesPerMessage
212-
} else {
208+
else {
213209
val r = headers.get("expectedReplies")
214-
if (r != null) {
210+
if (r != null)
215211
r.asInstanceOf[Int]
216-
} else {
212+
else
217213
responsesPerMessage
218-
}
219214
}
220215
}
221216

@@ -237,4 +232,4 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
237232

238233
override def toString: String = "AmqpRpcFlow"
239234

240-
}
235+
}

amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSourceStage.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
9494
properties: BasicProperties,
9595
body: Array[Byte]): Unit = {
9696
val message = if (ackRequired) {
97-
9897
new CommittableReadResult {
9998
override val message: ReadResult = ReadResult(ByteString(body), envelope, properties)
10099

@@ -155,21 +154,19 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
155154
}
156155

157156
def handleDelivery(message: CommittableReadResult): Unit =
158-
if (isAvailable(out)) {
157+
if (isAvailable(out))
159158
pushMessage(message)
160-
} else if (queue.size + 1 > bufferSize) {
159+
else if (queue.size + 1 > bufferSize)
161160
onFailure(new RuntimeException(s"Reached maximum buffer size $bufferSize"))
162-
} else {
161+
else
163162
queue.enqueue(message)
164-
}
165163

166164
setHandler(
167165
out,
168166
new OutHandler {
169167
override def onPull(): Unit =
170-
if (queue.nonEmpty) {
168+
if (queue.nonEmpty)
171169
pushMessage(queue.dequeue())
172-
}
173170

174171
override def onDownstreamFinish(cause: Throwable): Unit =
175172
if (unackedMessages == 0) super.onDownstreamFinish(cause)

avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquetBase.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,7 @@ trait AbstractAvroParquetBase {
3939

4040
val genFinalFile: Gen[String] = for {
4141
fileName <- Gen.alphaLowerStr
42-
} yield {
43-
folder + "/" + fileName + ".parquet"
44-
}
42+
} yield folder + "/" + fileName + ".parquet"
4543

4644
val genFile: Gen[String] = Gen.oneOf(Seq(Gen.alphaLowerStr.sample.get + ".parquet"))
4745

azure-storage-queue/src/main/scala/org/apache/pekko/stream/connectors/azure/storagequeue/impl/AzureQueueSourceStage.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,8 @@ import scala.collection.mutable.Queue
5454
if (res.isEmpty) {
5555
settings.retrieveRetryTimeout match {
5656
case Some(timeout) =>
57-
if (isAvailable(out)) {
57+
if (isAvailable(out))
5858
scheduleOnce(NotUsed, timeout)
59-
}
6059
case None => complete(out)
6160
}
6261
} else {
@@ -69,11 +68,10 @@ import scala.collection.mutable.Queue
6968
out,
7069
new OutHandler {
7170
override def onPull(): Unit =
72-
if (buffer.nonEmpty) {
71+
if (buffer.nonEmpty)
7372
push(out, buffer.dequeue())
74-
} else {
73+
else
7574
retrieveMessages()
76-
}
7775
})
7876
}
7977
}

cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/CqlSessionProvider.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,14 @@ class DefaultSessionProvider(system: ActorSystem, config: Config) extends CqlSes
5454
*/
5555
private def usePekkoDiscovery(config: Config): Boolean = config.getString("service-discovery.name").nonEmpty
5656

57-
override def connect()(implicit ec: ExecutionContext): Future[CqlSession] = {
58-
if (usePekkoDiscovery(config)) {
57+
override def connect()(implicit ec: ExecutionContext): Future[CqlSession] =
58+
if (usePekkoDiscovery(config))
5959
PekkoDiscoverySessionProvider.connect(system, config)
60-
} else {
60+
else {
6161
val driverConfig = CqlSessionProvider.driverConfig(system, config)
6262
val driverConfigLoader = DriverConfigLoaderFromConfig.fromConfig(driverConfig)
6363
CqlSession.builder().withConfigLoader(driverConfigLoader).buildAsync().asScala
6464
}
65-
}
6665
}
6766

6867
object CqlSessionProvider {

cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/DriverConfigLoaderFromConfig.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,7 @@ class DriverConfigLoaderFromConfig(config: Config) extends DriverConfigLoader {
4141

4242
private val driverConfig: DriverConfig = new TypesafeDriverConfig(config)
4343

44-
override def getInitialConfig: DriverConfig = {
45-
driverConfig
46-
}
44+
override def getInitialConfig: DriverConfig = driverConfig
4745

4846
override def onDriverInit(context: DriverContext): Unit = ()
4947

0 commit comments

Comments
 (0)