Skip to content

Commit 484a69b

Browse files
committed
Cleanup code by applying inspections
1 parent 408896b commit 484a69b

File tree

478 files changed

+3035
-3527
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

478 files changed

+3035
-3527
lines changed

amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectionProvider.scala

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -138,17 +138,16 @@ final class AmqpDetailsConnectionProvider private (
138138
factory.setPassword(credentials.password)
139139
}
140140
virtualHost.foreach(factory.setVirtualHost)
141-
sslConfiguration.foreach(sslConfiguration => {
141+
sslConfiguration.foreach { sslConfiguration =>
142142
if (sslConfiguration.protocol.isDefined) {
143143
if (sslConfiguration.trustManager.isDefined)
144144
factory.useSslProtocol(sslConfiguration.protocol.get, sslConfiguration.trustManager.get)
145145
else factory.useSslProtocol(sslConfiguration.protocol.get)
146-
} else if (sslConfiguration.context.isDefined) {
146+
} else if (sslConfiguration.context.isDefined)
147147
factory.useSslProtocol(sslConfiguration.context.get)
148-
} else {
148+
else
149149
factory.useSslProtocol()
150-
}
151-
})
150+
}
152151
requestedHeartbeat.foreach(factory.setRequestedHeartbeat)
153152
connectionTimeout.foreach(factory.setConnectionTimeout)
154153
handshakeTimeout.foreach(factory.setHandshakeTimeout)
@@ -244,9 +243,8 @@ object AmqpCredentials {
244243
final class AmqpSSLConfiguration private (val protocol: Option[String] = None,
245244
val trustManager: Option[TrustManager] = None,
246245
val context: Option[SSLContext] = None) {
247-
if (protocol.isDefined && context.isDefined) {
246+
if (protocol.isDefined && context.isDefined)
248247
throw new IllegalArgumentException("Protocol and context can't be defined in the same AmqpSSLConfiguration.")
249-
}
250248

251249
def withProtocol(protocol: String): AmqpSSLConfiguration =
252250
copy(protocol = Some(protocol))
@@ -419,10 +417,8 @@ final class AmqpCachedConnectionProvider private (val provider: AmqpConnectionPr
419417
throw new ConcurrentModificationException(
420418
"Unexpected concurrent modification while closing the connection.")
421419
}
422-
} else {
423-
if (!state.compareAndSet(c, Connected(cachedConnection, clients - 1)))
424-
releaseRecursive(amqpConnectionProvider, connection)
425-
}
420+
} else if (!state.compareAndSet(c, Connected(cachedConnection, clients - 1)))
421+
releaseRecursive(amqpConnectionProvider, connection)
426422
case Closing => releaseRecursive(amqpConnectionProvider, connection)
427423
}
428424

amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectorSettings.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ final class TemporaryQueueSourceSettings private (
152152
}
153153

154154
object TemporaryQueueSourceSettings {
155-
def apply(connectionProvider: AmqpConnectionProvider, exchange: String) =
155+
def apply(connectionProvider: AmqpConnectionProvider, exchange: String): TemporaryQueueSourceSettings =
156156
new TemporaryQueueSourceSettings(connectionProvider, exchange)
157157

158158
/**

amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AbstractAmqpAsyncFlowStageLogic.scala

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,7 @@ import scala.concurrent.Promise
7272
val callback = getAsyncCallback[(DeliveryTag, Boolean)] {
7373
case (tag: DeliveryTag, multiple: Boolean) => confirmCallback(tag, multiple)
7474
}
75-
new ConfirmCallback { // cant use function literal because it doesn't work with 2.11
76-
override def handle(tag: DeliveryTag, multiple: Boolean): Unit = callback.invoke((tag, multiple))
77-
}
75+
(tag: DeliveryTag, multiple: Boolean) => callback.invoke((tag, multiple))
7876
}
7977

8078
private def onConfirmation(tag: DeliveryTag, multiple: Boolean): Unit = {
@@ -155,9 +153,8 @@ import scala.concurrent.Promise
155153
if (noAwaitingMessages && exitQueue.isEmpty) {
156154
streamCompletion.success(Done)
157155
super.onUpstreamFinish()
158-
} else {
156+
} else
159157
log.debug("Received upstream finish signal - stage will be closed when all buffered messages are processed")
160-
}
161158

162159
private def publish(message: WriteMessage): DeliveryTag = {
163160
val tag: DeliveryTag = channel.getNextPublishSeqNo
@@ -193,10 +190,9 @@ import scala.concurrent.Promise
193190

194191
override protected def onTimer(timerKey: Any): Unit =
195192
timerKey match {
196-
case tag: DeliveryTag => {
193+
case tag: DeliveryTag =>
197194
log.debug("Received timeout for deliveryTag {}.", tag)
198195
onRejection(tag, multiple = false)
199-
}
200196
case _ => ()
201197
}
202198

amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpAsyncFlowStage.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,14 @@ import scala.concurrent.{ Future, Promise }
5858
buffer += (tag -> AwaitingMessage(tag, passThrough))
5959

6060
override def dequeueAwaitingMessages(tag: DeliveryTag, multiple: Boolean): Iterable[AwaitingMessage[T]] =
61-
if (multiple) {
61+
if (multiple)
6262
dequeueWhile((t, _) => t <= tag)
63-
} else {
63+
else {
6464
setReady(tag)
65-
if (isAtHead(tag)) {
65+
if (isAtHead(tag))
6666
dequeueWhile((_, message) => message.ready)
67-
} else {
67+
else
6868
Seq.empty
69-
}
7069
}
7170

7271
private def dequeueWhile(

amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpReplyToSinkStage.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import scala.concurrent.{ Future, Promise }
3131
private[amqp] final class AmqpReplyToSinkStage(replyToSinkSettings: AmqpReplyToSinkSettings)
3232
extends GraphStageWithMaterializedValue[SinkShape[WriteMessage], Future[Done]] { stage =>
3333

34-
val in = Inlet[WriteMessage]("AmqpReplyToSink.in")
34+
val in: Inlet[WriteMessage] = Inlet[WriteMessage]("AmqpReplyToSink.in")
3535

3636
override def shape: SinkShape[WriteMessage] = SinkShape.of(in)
3737

@@ -82,9 +82,8 @@ private[amqp] final class AmqpReplyToSinkStage(replyToSinkSettings: AmqpReplyToS
8282
elem.immediate,
8383
elem.properties.orNull,
8484
elem.bytes.toArray)
85-
} else if (settings.failIfReplyToMissing) {
85+
} else if (settings.failIfReplyToMissing)
8686
onFailure(new RuntimeException("Reply-to header was not set"))
87-
}
8887

8988
tryPull(in)
9089
}

amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpRpcFlowStage.scala

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
4242
extends GraphStageWithMaterializedValue[FlowShape[WriteMessage, CommittableReadResult], Future[String]] {
4343
stage =>
4444

45-
val in = Inlet[WriteMessage]("AmqpRpcFlow.in")
46-
val out = Outlet[CommittableReadResult]("AmqpRpcFlow.out")
45+
val in: Inlet[WriteMessage] = Inlet[WriteMessage]("AmqpRpcFlow.in")
46+
val out: Outlet[CommittableReadResult] = Outlet[CommittableReadResult]("AmqpRpcFlow.out")
4747

4848
override def shape: FlowShape[WriteMessage, CommittableReadResult] = FlowShape.of(in, out)
4949

@@ -70,7 +70,7 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
7070
val consumerCallback = getAsyncCallback(handleDelivery)
7171

7272
val commitCallback = getAsyncCallback[AckArguments] {
73-
case AckArguments(deliveryTag, multiple, promise) => {
73+
case AckArguments(deliveryTag, multiple, promise) =>
7474
try {
7575
channel.basicAck(deliveryTag, multiple)
7676
unackedMessages -= 1
@@ -81,10 +81,9 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
8181
} catch {
8282
case e: Throwable => promise.failure(e)
8383
}
84-
}
8584
}
8685
val nackCallback = getAsyncCallback[NackArguments] {
87-
case NackArguments(deliveryTag, multiple, requeue, promise) => {
86+
case NackArguments(deliveryTag, multiple, requeue, promise) =>
8887
try {
8988
channel.basicNack(deliveryTag, multiple, requeue)
9089
unackedMessages -= 1
@@ -95,7 +94,6 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
9594
} catch {
9695
case e: Throwable => promise.failure(e)
9796
}
98-
}
9997
}
10098

10199
val amqpSourceConsumer = new DefaultConsumer(channel) {
@@ -105,7 +103,7 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
105103
body: Array[Byte]): Unit =
106104
consumerCallback.invoke(
107105
new CommittableReadResult {
108-
override val message = ReadResult(ByteString(body), envelope, properties)
106+
override val message: ReadResult = ReadResult(ByteString(body), envelope, properties)
109107

110108
override def ack(multiple: Boolean): Future[Done] = {
111109
val promise = Promise[Done]()
@@ -148,21 +146,19 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
148146
}
149147

150148
def handleDelivery(message: CommittableReadResult): Unit =
151-
if (isAvailable(out)) {
149+
if (isAvailable(out))
152150
pushMessage(message)
153-
} else if (queue.size + 1 > bufferSize) {
151+
else if (queue.size + 1 > bufferSize)
154152
onFailure(new RuntimeException(s"Reached maximum buffer size $bufferSize"))
155-
} else {
153+
else
156154
queue.enqueue(message)
157-
}
158155

159156
setHandler(
160157
out,
161158
new OutHandler {
162159
override def onPull(): Unit =
163-
if (queue.nonEmpty) {
160+
if (queue.nonEmpty)
164161
pushMessage(queue.dequeue())
165-
}
166162

167163
override def onDownstreamFinish(cause: Throwable): Unit = {
168164
setKeepGoing(true)
@@ -207,15 +203,14 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
207203

208204
val expectedResponses: Int = {
209205
val headers = props.getHeaders
210-
if (headers == null) {
206+
if (headers == null)
211207
responsesPerMessage
212-
} else {
208+
else {
213209
val r = headers.get("expectedReplies")
214-
if (r != null) {
210+
if (r != null)
215211
r.asInstanceOf[Int]
216-
} else {
212+
else
217213
responsesPerMessage
218-
}
219214
}
220215
}
221216

amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSourceStage.scala

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,8 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
9494
properties: BasicProperties,
9595
body: Array[Byte]): Unit = {
9696
val message = if (ackRequired) {
97-
9897
new CommittableReadResult {
99-
override val message = ReadResult(ByteString(body), envelope, properties)
98+
override val message: ReadResult = ReadResult(ByteString(body), envelope, properties)
10099

101100
override def ack(multiple: Boolean): Future[Done] = {
102101
val promise = Promise[Done]()
@@ -155,21 +154,19 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
155154
}
156155

157156
def handleDelivery(message: CommittableReadResult): Unit =
158-
if (isAvailable(out)) {
157+
if (isAvailable(out))
159158
pushMessage(message)
160-
} else if (queue.size + 1 > bufferSize) {
159+
else if (queue.size + 1 > bufferSize)
161160
onFailure(new RuntimeException(s"Reached maximum buffer size $bufferSize"))
162-
} else {
161+
else
163162
queue.enqueue(message)
164-
}
165163

166164
setHandler(
167165
out,
168166
new OutHandler {
169167
override def onPull(): Unit =
170-
if (queue.nonEmpty) {
168+
if (queue.nonEmpty)
171169
pushMessage(queue.dequeue())
172-
}
173170

174171
override def onDownstreamFinish(cause: Throwable): Unit =
175172
if (unackedMessages == 0) super.onDownstreamFinish(cause)

amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class AmqpDocsSpec extends AmqpSpec {
3535

3636
override implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds)
3737

38-
val businessLogic: CommittableReadResult => Future[CommittableReadResult] = Future.successful(_)
38+
val businessLogic: CommittableReadResult => Future[CommittableReadResult] = Future.successful
3939

4040
"The AMQP Connectors" should {
4141

@@ -158,7 +158,7 @@ class AmqpDocsSpec extends AmqpSpec {
158158
val mergingFlow = mergedSources
159159
.viaMat(KillSwitches.single)(Keep.right)
160160
.to(Sink.fold(Set.empty[Int]) {
161-
case (seen, (branch, element)) =>
161+
case (seen, (branch, _)) =>
162162
if (seen.size == fanoutSize) completion.trySuccess(Done)
163163
seen + branch
164164
})

amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/AmqpProxyConnection.scala

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,21 @@ import com.rabbitmq.client._
2929
* otherwise undefined
3030
*/
3131
class AmqpProxyConnection(protected val delegate: Connection) extends Connection {
32-
override def getAddress: InetAddress = delegate.getAddress()
32+
override def getAddress: InetAddress = delegate.getAddress
3333

34-
override def getPort: Int = delegate.getPort()
34+
override def getPort: Int = delegate.getPort
3535

36-
override def getChannelMax: Int = delegate.getChannelMax()
36+
override def getChannelMax: Int = delegate.getChannelMax
3737

38-
override def getFrameMax: Int = delegate.getFrameMax()
38+
override def getFrameMax: Int = delegate.getFrameMax
3939

40-
override def getHeartbeat: Int = delegate.getHeartbeat()
40+
override def getHeartbeat: Int = delegate.getHeartbeat
4141

42-
override def getClientProperties: util.Map[String, AnyRef] = delegate.getClientProperties()
42+
override def getClientProperties: util.Map[String, AnyRef] = delegate.getClientProperties
4343

44-
override def getClientProvidedName: String = delegate.getClientProvidedName()
44+
override def getClientProvidedName: String = delegate.getClientProvidedName
4545

46-
override def getServerProperties: util.Map[String, AnyRef] = delegate.getServerProperties()
46+
override def getServerProperties: util.Map[String, AnyRef] = delegate.getServerProperties
4747

4848
override def createChannel(): Channel = delegate.createChannel()
4949

@@ -76,9 +76,9 @@ class AmqpProxyConnection(protected val delegate: Connection) extends Connection
7676

7777
override def clearBlockedListeners(): Unit = delegate.clearBlockedListeners()
7878

79-
override def getExceptionHandler: ExceptionHandler = delegate.getExceptionHandler()
79+
override def getExceptionHandler: ExceptionHandler = delegate.getExceptionHandler
8080

81-
override def getId: String = delegate.getId()
81+
override def getId: String = delegate.getId
8282

8383
override def setId(s: String): Unit = delegate.setId(s)
8484

@@ -88,9 +88,9 @@ class AmqpProxyConnection(protected val delegate: Connection) extends Connection
8888
override def removeShutdownListener(shutdownListener: ShutdownListener): Unit =
8989
delegate.removeShutdownListener(shutdownListener)
9090

91-
override def getCloseReason: ShutdownSignalException = delegate.getCloseReason()
91+
override def getCloseReason: ShutdownSignalException = delegate.getCloseReason
9292

9393
override def notifyListeners(): Unit = delegate.notifyListeners()
9494

95-
override def isOpen: Boolean = delegate.isOpen()
95+
override def isOpen: Boolean = delegate.isOpen
9696
}

amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,9 +270,9 @@ class AmqpConnectorsSpec extends AmqpSpec {
270270
.take(input.size)
271271
.runWith(Sink.seq)
272272

273-
result.futureValue.map(cm => {
273+
result.futureValue.map { cm =>
274274
noException should be thrownBy cm.ack().futureValue
275-
})
275+
}
276276
}
277277

278278
"not republish message without autoAck(false) if nack is sent" in assertAllStagesStopped {

0 commit comments

Comments
 (0)