Skip to content

Commit e43e8a4

Browse files
authored
Upgrading Pulsar Version to 3.3.2 (#180)
* init
* adding JsonIgnore filter back
* updating exception
* adding BouncyCastle acknowledgment
* adding back lines
1 parent 0c5130d commit e43e8a4

File tree

4 files changed

+23
-13
lines changed

4 files changed

+23
-13
lines changed

pom.xml

+11-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666

6767
<!-- dependencies -->
6868
<!-- latest version from apache pulsar -->
69-
<pulsar.version>2.10.2</pulsar.version>
69+
<pulsar.version>3.3.2</pulsar.version>
7070
<scala.version>2.12.17</scala.version>
7171
<scala.binary.version>2.12</scala.binary.version>
7272
<scalatest.version>3.2.14</scalatest.version>
@@ -153,6 +153,11 @@
153153
<artifactId>commons-io</artifactId>
154154
<version>${commons-io.version}</version>
155155
</dependency>
156+
<dependency>
157+
<groupId>org.bouncycastle</groupId>
158+
<artifactId>bcprov-jdk18on</artifactId>
159+
<version>1.72</version>
160+
</dependency>
156161

157162
<!-- spark dependency -->
158163

@@ -390,6 +395,7 @@
390395
<include>org.bouncycastle*:*</include>
391396
<include>org.lz4*:*</include>
392397
<include>commons-io:commons-io:jar:*</include>
398+
<include>io.opentelemetry:*</include> <!-- Add this -->
393399
</includes>
394400
</artifactSet>
395401
<filters>
@@ -409,6 +415,10 @@
409415
</filter>
410416
</filters>
411417
<relocations>
418+
<relocation>
419+
<pattern>io.opentelemetry</pattern>
420+
<shadedPattern>org.apache.pulsar.shade.io.opentelemetry</shadedPattern>
421+
</relocation>
412422
<relocation>
413423
<pattern>com.google</pattern>
414424
<shadedPattern>org.apache.pulsar.shade.com.google</shadedPattern>

src/main/scala/org/apache/spark/sql/pulsar/PulsarConfigurationUtils.scala

+2-6
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,8 @@ import java.util.Locale
1818

1919
import scala.reflect._
2020

21-
import org.apache.pulsar.client.impl.conf.{
22-
ClientConfigurationData,
23-
ProducerConfigurationData,
24-
ReaderConfigurationData
25-
}
26-
import org.apache.pulsar.shade.com.fasterxml.jackson.annotation.JsonIgnore
21+
import com.fasterxml.jackson.annotation.JsonIgnore
22+
import org.apache.pulsar.client.impl.conf.{ClientConfigurationData, ProducerConfigurationData, ReaderConfigurationData}
2723

2824
object PulsarConfigurationUtils {
2925

src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala

+7-3
Original file line numberDiff line numberDiff line change
@@ -309,8 +309,10 @@ private[pulsar] case class PulsarHelper(
309309
private def getTopics(topicsPattern: String): Seq[String] = {
310310
val dest = TopicName.get(topicsPattern)
311311
val allTopics: ju.List[String] = client.getLookup
312-
.getTopicsUnderNamespace(dest.getNamespaceObject, CommandGetTopicsOfNamespace.Mode.ALL)
313-
.get()
312+
.getTopicsUnderNamespace(
313+
// passing an empty topicsHash because we don't cache the GetTopicsResponse
314+
dest.getNamespaceObject, CommandGetTopicsOfNamespace.Mode.ALL, topicsPattern, "")
315+
.get().getTopics
314316

315317
val allNonPartitionedTopics: ju.List[String] = allTopics.asScala
316318
.filter(t => !TopicName.get(t).isPartitioned)
@@ -345,7 +347,9 @@ private[pulsar] case class PulsarHelper(
345347
while (waitList.nonEmpty) {
346348
val topic = waitList.head
347349
try {
348-
client.getPartitionedTopicMetadata(topic).get()
350+
// setting metadataAutoCreationEnabled to false, and useFallbackForNonPIP344Brokers
351+
// to true to conform to non-breaking behavior.
352+
client.getPartitionedTopicMetadata(topic, false, true).get()
349353
waitList -= topic
350354
} catch {
351355
case NonFatal(_) =>

src/test/scala/org/apache/spark/sql/pulsar/PulsarAdmissionControlSuite.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ package org.apache.spark.sql.pulsar
33
import java.{util => ju}
44

55
import org.apache.pulsar.client.admin.PulsarAdmin
6+
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException
67
import org.apache.pulsar.client.api.MessageId
7-
import org.apache.pulsar.client.internal.DefaultImplementation
8+
89
import org.apache.spark.sql.pulsar.PulsarSourceUtils.{getEntryId, getLedgerId}
910
import org.apache.spark.sql.streaming.Trigger.{Once, ProcessingTime}
1011
import org.apache.spark.util.Utils
@@ -48,10 +49,9 @@ class PulsarAdmissionControlSuite extends PulsarSourceTest {
4849
// Need to call latestOffsetForTopicPartition so the helper instantiates
4950
// the admin
5051
val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl, conf)
51-
val e = intercept[RuntimeException] {
52+
intercept[NotFoundException] {
5253
admissionControlHelper.latestOffsetForTopicPartition(topic, MessageId.earliest, approxSizeOfInt)
5354
}
54-
assert(e.getMessage.contains("Failed to load config into existing configuration data"))
5555
}
5656

5757
test("Admit entry in the middle of the ledger") {

0 commit comments

Comments (0)