
Unable to read schema from Karapace Schema registry using Confluent client, the curl command works fine #1108

@karan-versa

Description


I'm unable to read a schema from the Karapace Schema Registry using the Confluent client; the same request via curl works fine.

Here is the Scala code:

// Imports assumed by this snippet
import java.util.{HashMap => JavaHashMap}
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.spark.sql.SparkSession

def getSchemaFromRegistry(schemaRegistryUrl: String, subject: String, username: String, password: String): String = {
    println(s"🔍 Attempting connection to: $schemaRegistryUrl")
    println(s"🔍 Using credentials: $username:${password.take(3)}***")
    println(s"🔍 Using Confluent Schema Registry Client v7.9.1")
    
    // Set these properties before creating the client
    System.setProperty("schema.registry.basic.auth.credentials.source", "USER_INFO")
    System.setProperty("schema.registry.basic.auth.user.info", s"$username:$password")
    
    // Create configs with connection settings - updated for 7.9.1
    val configs = new JavaHashMap[String, String]()
    
    // Include ALL possible variations of auth config keys
    configs.put("basic.auth.credentials.source", "USER_INFO")
    configs.put("basic.auth.user.info", s"$username:$password")
    configs.put("schema.registry.basic.auth.credentials.source", "USER_INFO")
    configs.put("schema.registry.basic.auth.user.info", s"$username:$password")
    
    // These newer keys might also be necessary in 7.x
    configs.put("bearer.auth.credentials.source", "USER_INFO")
    configs.put("bearer.auth.token", null)  // Set to null to use basic auth
    
    // Add debug logging
    System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG")
    
    // Create the client with a smaller cache size (just in case)
    val client = new CachedSchemaRegistryClient(
      schemaRegistryUrl,  // URL 
      10,                  // Cache size (smaller)
      configs             // Auth config
    )
    
    try {
      val latestSchema = client.getLatestSchemaMetadata(subject)
      println(s"Retrieved schema from registry - Subject: $subject, ID: ${latestSchema.getId}, Version: ${latestSchema.getVersion}")
      latestSchema.getSchema
    } catch {
      case e: Exception =>
        println(s"❌ Failed to retrieve schema: ${e.getMessage}")
        throw e
    }
  }
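For comparison, the only basic-auth keys the Confluent Java client documents for this config map are the two unprefixed ones. A stripped-down version of the auth setup (a sketch with placeholder credentials, not a verified fix) would be:

```scala
import java.util.{HashMap => JavaHashMap}

// Placeholder credentials for illustration only
val username = "user"
val password = "pwd"

// Minimal auth config: just the two documented basic-auth keys.
// The System properties, the schema.registry.*-prefixed keys, and the
// bearer.* keys in the snippet above should not be needed for basic auth.
val configs = new JavaHashMap[String, String]()
configs.put("basic.auth.credentials.source", "USER_INFO")
configs.put("basic.auth.user.info", s"$username:$password")
```

If this minimal map also yields Unauthorized, that narrows the problem to how the client transmits the header rather than to the config keys.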

def main(args: Array[String]): Unit = {
    // Enable SSL debugging globally
    // println("=== Enabling SSL Debugging ===")
    // System.setProperty("javax.net.debug", "ssl:handshake")
    
    // Add this before creating the client
    System.setProperty("sun.net.http.allowRestrictedHeaders", "true")
    System.setProperty("http.wire.debug", "true")
    System.setProperty("sun.net.www.protocol.http.HttpURLConnection.enableESCWithBasicAuth", "true")
    System.setProperty("https.protocols", "TLSv1.2,TLSv1.3")
    System.setProperty("jdk.tls.client.protocols", "TLSv1.2,TLSv1.3")
    System.setProperty("jsse.enableSNIExtension", "true")
    
 
    val spark = SparkSession.builder
      .appName("Spark Avro Kafka Consumer with Schema Registry")
      .master("local[*]")
      .config("spark.sql.adaptive.enabled", "false")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    // Schema Registry and Kafka config
    val schemaRegistryUrl = "https://karapace-sr.vkp.versa-vani.com"
    val schemaRegistryUsername = "<user>"
    val schemaRegistryPassword = "<pwd>"
    val kafkaBootstrapServers = "bootstrap.vkp.versa-vani.com:443"
    val kafkaTopic = "syslog.ueba-nov.v1.nov.nov"

    println("=== Schema Registry Configuration ===")
    println(s"Schema Registry URL: $schemaRegistryUrl")
    println(s"Topic: $kafkaTopic")
    println("=== SSL Debug Enabled for Let's Encrypt certificate troubleshooting ===")

    // Test with the exact subject from curl output
    val knownSubject = "com.versa.LoadFromFile"


    val avroSchema1 = try {
      println(s"🔍 Trying known subject from curl: $knownSubject")
      getSchemaFromRegistry(schemaRegistryUrl, knownSubject, schemaRegistryUsername, schemaRegistryPassword)
    } catch {
      case e1: Exception =>
        println(s"Subject '$knownSubject' failed: ${e1.getMessage}")
        // IMPORTANT: Return the canonical schema instead of nothing
        getCanonicalSchema(knownSubject)
    }

    println("=== Using Avro Schema ===")
    println(avroSchema1)

When I run the equivalent curl command, it works fine:

(base) MacBook-Air:kafka-consumer karanalang$ curl -u <user>:<pwd> https://karapace-sr.vkp.versa-vani.com/subjects/com.versa.LoadFromFile/versions/latest
{"id":2,"schema":"{\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"default\":\"\",\"name\":\"uuid\",\"type\":\"string\"},{\"default\":\"\",\"name\":\"logType\",\"type\":\"string\"},{\"default\":\"\",\"name\":\"description\",\"type\":\"string\"},{\"default\":null,\"name\":\"mstatsType\",\"type\":[\"null\",\"string\"]}],\"name\":\"LoadFromFile\",\"namespace\":\"com.versa\",\"type\":\"record\"}","subject":"com.versa.LoadFromFile","version":2}
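For reference, `curl -u user:pwd` only adds an `Authorization: Basic <base64(user:pwd)>` header to the request. Computing that value locally (sketch with placeholder credentials) makes it easy to compare against what the Confluent client actually sends, e.g. through a debugging proxy or wire-level logging:

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

// The exact header value `curl -u user:pwd` sends
def basicAuthHeader(user: String, pwd: String): String =
  "Basic " + Base64.getEncoder.encodeToString(
    s"$user:$pwd".getBytes(StandardCharsets.UTF_8))

println(basicAuthHeader("user", "pwd"))  // Basic dXNlcjpwd2Q=
```

If the client's header differs from this value (or is missing entirely on the first request), that would explain the Unauthorized response.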

What changes do I need to make for this to work with the Confluent client?

Here is my build.sbt with the Confluent client versions:

// 1. Use the Scala version that matches Spark’s build (Spark 3.x is built with Scala 2.12)
scalaVersion := "2.12.17"

name         := "scala-kafka-consumer"
organization := "com.versa.karapace"
version      := "1.0"

resolvers += "Confluent" at "https://packages.confluent.io/maven/"

// If you hit zstd conflicts, pin one version
dependencyOverrides += "com.github.luben" % "zstd-jni" % "1.5.6-4"

// 2. Mark all of Spark’s own modules as “provided”
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"              % "3.5.1" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10"   % "3.5.1" % "provided",
  "org.apache.spark" %% "spark-avro"             % "3.5.1" % "provided",

  // Confluent Schema Registry clients (keep these on compile)
  "io.confluent"     %  "kafka-schema-registry-client" % "7.9.1",
  "io.confluent"     %  "kafka-avro-serializer"        % "7.9.1",

  // Apache Avro library
  "org.apache.avro"  %  "avro"                        % "1.11.3",

  // JSON parsing
  "com.google.code.gson" % "gson"                     % "2.10.1",
  "org.json"          %  "json"                       % "20230618",

  // Simple SLF4J backend for local testing
  "org.slf4j"        %  "slf4j-simple"               % "2.0.9",

  // JUnit for tests
  "junit"            %  "junit"                      % "4.11" % Test
)

// sbt-assembly merge strategies to resolve deduplicate errors
import sbtassembly.AssemblyPlugin.autoImport._
import sbtassembly.PathList

assembly / assemblyMergeStrategy := {
  case PathList("META-INF", xs @ _*) => xs match {
    case Seq("io.netty.versions.properties") => MergeStrategy.first
    case Seq("org", "apache", "logging", "log4j", "core", "config", "plugins", "Log4j2Plugins.dat") => MergeStrategy.concat
    case _ => MergeStrategy.discard
  }
  // Discard all module-info.class files, wherever they appear
  case PathList("module-info.class") => MergeStrategy.discard
  case PathList("META-INF", "versions", _, "module-info.class") => MergeStrategy.discard
  case "arrow-git.properties" => MergeStrategy.first
  case "google/protobuf/any.proto" => MergeStrategy.first
  case "google/protobuf/api.proto" => MergeStrategy.first
  case "google/protobuf/descriptor.proto" => MergeStrategy.first
  case "google/protobuf/duration.proto" => MergeStrategy.first
  case "google/protobuf/empty.proto" => MergeStrategy.first
  case "google/protobuf/field_mask.proto" => MergeStrategy.first
  case "google/protobuf/source_context.proto" => MergeStrategy.first
  case "google/protobuf/struct.proto" => MergeStrategy.first
  case "google/protobuf/timestamp.proto" => MergeStrategy.first
  case "google/protobuf/type.proto" => MergeStrategy.first
  case "google/protobuf/wrappers.proto" => MergeStrategy.first
  case PathList("org", "apache", "commons", "logging", xs @ _*) => MergeStrategy.first
  case x => (assembly / assemblyMergeStrategy).value(x)
}

Here is the error I get:

=== SSL Debug Enabled for Let's Encrypt certificate troubleshooting ===
🔍 Trying known subject from curl: com.versa.LoadFromFile
🔍 Attempting connection to: https://karapace-sr.vkp.versa-vani.com
❌ Failed to retrieve schema: Unauthorized; error code: 0
⚠️ Subject 'com.versa.LoadFromFile' failed: Unauthorized; error code: 0

Note: the Unauthorized error occurs only with the Confluent client.
I can access the schema registry with the same credentials using curl or a plain HttpURLConnection.
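The HttpURLConnection check mentioned above can be sketched like this (hypothetical helper names; it hits the same endpoint as the curl command):

```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import java.util.Base64

// Endpoint path for the latest version of a subject
def latestVersionPath(subject: String): String =
  s"/subjects/$subject/versions/latest"

// Same request the curl command performs, via plain HttpURLConnection.
// Returns the HTTP status code (200 when auth succeeds).
def fetchLatest(base: String, subject: String, user: String, pwd: String): Int = {
  val url  = new URL(base + latestVersionPath(subject))
  val conn = url.openConnection().asInstanceOf[HttpURLConnection]
  val token = Base64.getEncoder.encodeToString(
    s"$user:$pwd".getBytes(StandardCharsets.UTF_8))
  conn.setRequestProperty("Authorization", s"Basic $token")
  conn.getResponseCode
}

// Example (not run here):
// fetchLatest("https://karapace-sr.vkp.versa-vani.com",
//             "com.versa.LoadFromFile", "<user>", "<pwd>")
```

This succeeding while CachedSchemaRegistryClient fails with the same credentials suggests the problem is in how the client supplies the Authorization header, not in the registry or the credentials.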
