add dynamic look up of schema to kafka producer #21170

Merged · 15 commits · Mar 14, 2025
1 change: 1 addition & 0 deletions .github/CODEOWNERS
@@ -2152,6 +2152,7 @@ spec/support/vcr_cassettes/form1010_ezr @department-of-veterans-affairs/vfs-10-1
spec/support/vcr_cassettes/health_quest @department-of-veterans-affairs/vsa-healthcare-health-quest-1-backend @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
spec/support/vcr_cassettes/iam_ssoe_oauth @department-of-veterans-affairs/octo-identity
spec/support/vcr_cassettes/identity @department-of-veterans-affairs/octo-identity
spec/support/vcr_cassettes/kafka @department-of-veterans-affairs/ves-submission-traceability @department-of-veterans-affairs/backend-review-group
spec/support/vcr_cassettes/ihub @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
spec/support/vcr_cassettes/lgy @department-of-veterans-affairs/benefits-non-disability @department-of-veterans-affairs/benefits-non-disability @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
spec/support/vcr_cassettes/lighthouse/auth @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
3 changes: 3 additions & 0 deletions config/features.yml
@@ -2043,6 +2043,9 @@ features:
kafka_producer:
actor_type: cookie_id
description: Enables the Kafka producer for the VA.gov platform
kafka_producer_fetch_schema_dynamically:
actor_type: cookie_id
description: Enables the Kafka producer to fetch schema dynamically
show_about_yellow_ribbon_program:
actor_type: user
description: If enabled, show additional info about the yellow ribbon program
27 changes: 20 additions & 7 deletions lib/kafka/avro_producer.rb
@@ -2,16 +2,20 @@

require 'avro'
require 'kafka/producer_manager'
require 'kafka/schema_registry/service'
require 'logger'

module Kafka
class AvroProducer
attr_reader :producer
attr_reader :producer, :registry

def initialize(producer: nil)
@producer = producer || Kafka::ProducerManager.instance.producer
@registry = SchemaRegistry::Service.new
Contributor Author: I tried to stick with the service/configuration setup that other libs use, to follow the vets-api precedent, but this now deviates from the singleton setup of the producer manager. There are a lot of Faraday HTTP configurations that I think are important to keep consistent throughout vets-api, so I'm hesitant to break away from that. We have a ticket to evaluate the use of the singleton on the ProducerManager; we can use this PR as a place to discuss it, or we can kick the can down to whoever picks up that ticket.

Contributor: Yes, the tricky part about Kafka::ProducerManager is that we really do want to use it like a singleton, in the sense that we want to avoid creating multiple instances of the object. However, we want to remove the Ruby Singleton pattern and replace it with something else, because singletons tend to be hard to work with. I think we can punt on it for now and dig in when we start work on the next ticket.

@schema_id = nil
end

def produce(topic, payload, schema_version: 1)
def produce(topic, payload, schema_version: 'latest')
schema = get_schema(topic, schema_version)
encoded_payload = encode_payload(schema, payload)
producer.produce_sync(topic:, payload: encoded_payload)
@@ -25,8 +29,16 @@ def produce(topic, payload, schema_version: 1)
private

def get_schema(topic, schema_version)
schema_path = Rails.root.join('lib', 'kafka', 'schemas', "#{topic}-value-#{schema_version}.avsc")
Avro::Schema.parse(File.read(schema_path))
if Flipper.enabled?(:kafka_producer_fetch_schema_dynamically)
response = @registry.subject_version(topic, schema_version)
schema = response['schema']
@schema_id = response['id']
else
schema_path = Rails.root.join('lib', 'kafka', 'schemas', "#{topic}-value-#{schema_version}.avsc")
schema = File.read(schema_path)
end

Avro::Schema.parse(schema)
end

def encode_payload(schema, payload)
@@ -40,9 +52,10 @@ def encode_payload(schema, payload)

# Add magic byte and schema ID to the payload
magic_byte = [0].pack('C')
# NOTE: This is a placeholder schema ID. In a real-world scenario, this should be fetched from a schema registry
# ID = 5 is the Event Bus schema ID for test schema, replace this with the actual schema ID when running locally
schema_id_bytes = [5].pack('N') # should be schema id
# NOTE: Use the schema ID fetched from the schema registry; if none was found,
# fall back to ID = 5, the Event Bus schema ID for the test schema. Replace this
# with the actual schema ID when running locally.
@schema_id ||= 5
schema_id_bytes = [@schema_id].pack('N') # should be schema id
magic_byte + schema_id_bytes + avro_payload
end

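The header layout built in encode_payload follows the Confluent wire format. A minimal standalone sketch of the framing and its inverse (hypothetical helper names, not part of the PR):

```ruby
# Confluent wire format: 1 magic byte (0), a 4-byte big-endian schema ID,
# then the Avro-encoded payload.
def frame_payload(schema_id, avro_payload)
  [0].pack('C') + [schema_id].pack('N') + avro_payload
end

# Inverse: split a framed message back into its schema ID and Avro bytes.
def unframe_payload(framed)
  raise 'unexpected magic byte' unless framed.getbyte(0).zero?

  schema_id = framed.byteslice(1, 4).unpack1('N')
  [schema_id, framed.byteslice(5, framed.bytesize - 5)]
end
```

A consumer uses the extracted schema ID to look up the writer's schema in the registry before decoding the Avro bytes, which is why the producer must embed a real registry ID rather than a placeholder.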
25 changes: 25 additions & 0 deletions lib/kafka/schema_registry/configuration.rb
@@ -0,0 +1,25 @@
# frozen_string_literal: true

require 'common/client/configuration/rest'

module Kafka
module SchemaRegistry
class Configuration < Common::Client::Configuration::REST
def base_path
Settings.kafka_producer.schema_registry_url
end

def self.base_request_headers
super.merge('Content-Type' => 'application/vnd.schemaregistry.v1+json')
end

def connection
Faraday.new(base_path, headers: base_request_headers, request: request_options) do |conn|
conn.use :breakers
conn.use Faraday::Response::RaiseError
conn.adapter Faraday.default_adapter
end
end
end
end
end
103 changes: 103 additions & 0 deletions lib/kafka/schema_registry/service.rb
Contributor Author: We're only using the subject_versions method, but I figured it was good to have the others available in case we ever need them. This file was mostly copied from the avro_turf gem and adjusted to work with Faraday.
@@ -0,0 +1,103 @@
# frozen_string_literal: true

require_relative 'configuration'

module Kafka
module SchemaRegistry
class Service < Common::Client::Base
configuration SchemaRegistry::Configuration

def fetch(id)
data = get("/schemas/ids/#{id}", idempotent: true)
data.fetch('schema')
end

# List all subjects
def subjects
get('/subjects', idempotent: true)
end

# List all versions for a subject
def subject_versions(subject)
get("/subjects/#{subject}/versions", idempotent: true)
end

# Get a specific version for a subject
def subject_version(subject, version = 'latest')
get("/subjects/#{subject}-value/versions/#{version}", idempotent: true)
end

# Get the subject and version for a schema id
def schema_subject_versions(schema_id)
get("/schemas/ids/#{schema_id}/versions", idempotent: true)
end

# Check if a schema exists. Returns nil if not found.
def check(subject, schema)
data = post("/subjects/#{subject}",
expects: [200, 404],
body: { schema: schema.to_s }.to_json,
idempotent: true)
data unless data.key?('error_code')
end

# Check if a schema is compatible with the stored version.
# Returns:
# - true if compatible
# - nil if the subject or version does not exist
# - false if incompatible
# http://docs.confluent.io/3.1.2/schema-registry/docs/api.html#compatibility
def compatible?(subject, schema, version = 'latest')
data = post("/compatibility/subjects/#{subject}/versions/#{version}",
expects: [200, 404], body: { schema: schema.to_s }.to_json, idempotent: true)
data.fetch('is_compatible', false) unless data.key?('error_code')
end

# Check for specific schema compatibility issues
# Returns:
# - nil if the subject or version does not exist
# - a list of compatibility issues
# https://docs.confluent.io/platform/current/schema-registry/develop/api.html#sr-api-compatibility
def compatibility_issues(subject, schema, version = 'latest')
data = post("/compatibility/subjects/#{subject}/versions/#{version}",
expects: [200, 404],
body: { schema: schema.to_s }.to_json, query: { verbose: true }, idempotent: true)

data.fetch('messages', []) unless data.key?('error_code')
end

# Get global config
def global_config
get('/config', idempotent: true)
end

# Get config for subject
def subject_config(subject)
get("/config/#{subject}", idempotent: true)
end

private

def get(path, **)
request(path, method: :get, **)
end

def put(path, **)
request(path, method: :put, **)
end

def post(path, **)
request(path, method: :post, **)
end
Comment on lines +81 to +91

Contributor: Consider just calling request directly in the methods (e.g., subject_versions).

Contributor Author: That's a fair point; this part was copied from the avro_turf gem, but it does seem too verbose. I've got another PR coming soon that will add a validation model to all of this, and I'll look into removing these there.


def request(path, method: :get, **options)
options = { expects: 200 }.merge!(options)
response = connection.send(method, path) do |req|
req.headers = options[:headers] if options[:headers]
end

JSON.parse(response.body)
end
end
end
end
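Note the subject naming in subject_version: the service appends `-value` to the topic, matching Confluent's TopicNameStrategy. A small sketch of the REST paths involved (hypothetical helper names for illustration; only the routes themselves come from the Schema Registry API):

```ruby
# Subject name for a topic's value schema under TopicNameStrategy.
def value_subject(topic)
  "#{topic}-value"
end

# Path hit by subject_version above.
def subject_version_path(topic, version = 'latest')
  "/subjects/#{value_subject(topic)}/versions/#{version}"
end

# Path hit by fetch above.
def schema_by_id_path(schema_id)
  "/schemas/ids/#{schema_id}"
end
```

This is why callers pass the bare topic name ('topic-1') while the registry response reports the full subject ('topic-1-value').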
54 changes: 44 additions & 10 deletions spec/lib/kafka/avro_producer_spec.rb
@@ -7,13 +7,15 @@
describe Kafka::AvroProducer do
let(:avro_producer) { described_class.new }
let(:schema_path) { Rails.root.join('lib', 'kafka', 'schemas', 'submission_trace_mock_dev-value-1.avsc') }
let(:schema) { Avro::Schema.parse(File.read(schema_path)) }
let(:schema_file) { File.read(schema_path) }
let(:schema) { Avro::Schema.parse(schema_file) }
let(:valid_payload) { { 'data' => { 'key' => 'value' } } }
let(:invalid_payload) { { 'invalid_key' => 'value' } }

before do
allow(Rails).to receive(:env).and_return(ActiveSupport::StringInquirer.new('test'))
allow(Flipper).to receive(:enabled?).with(:kafka_producer).and_return(true)
allow(Flipper).to receive(:enabled?).with(:kafka_producer_fetch_schema_dynamically).and_return(true)
allow(Kafka::OauthTokenRefresher).to receive(:new).and_return(double(on_oauthbearer_token_refresh: 'token'))
end

@@ -44,25 +46,57 @@
end

context 'producing a message successfully' do
let(:topic1_payload_value) { "\x00\x00\x00\x00\x05\x02\x06key\nvalue\x00" }
Contributor Author: Make sure the hardcoded and dynamically fetched schema payloads get parsed the same.


before do
Kafka::ProducerManager.instance.send(:setup_producer)
allow(avro_producer).to receive(:get_schema).and_return(schema)
end

after do
# reset the client after each test
avro_producer.producer.client.reset
end

it 'produces a message to the specified topic' do
avro_producer.produce('topic-1', valid_payload)
avro_producer.produce('topic-1', valid_payload)
avro_producer.produce('topic-2', valid_payload)
context 'with dynamic schema registry retrieval' do
context 'of an existing schema' do
it 'produces a message to the specified topic' do
VCR.use_cassette('kafka/topics') do
avro_producer.produce('topic-1', valid_payload)
avro_producer.produce('topic-2', valid_payload)
expect(avro_producer.producer.client.messages.length).to eq(2)
topic_1_messages = avro_producer.producer.client.messages_for('topic-1')
expect(topic_1_messages.length).to eq(1)
expect(topic_1_messages[0][:payload]).to be_a(String)
expect(topic_1_messages[0][:payload]).to eq(topic1_payload_value)
end
end
end

context 'of a non-existing schema' do
it 'raises the appropriate error' do
VCR.use_cassette('kafka/topics404') do
expect do
avro_producer.produce('topic-999', valid_payload)
end.to raise_error(Faraday::ResourceNotFound)
end
end
end
end

context 'with hardcoded schema registry retrieval' do
before do
allow(Flipper).to receive(:enabled?).with(:kafka_producer_fetch_schema_dynamically).and_return(false)
allow(File).to receive(:read).and_return(schema_file)
Contributor Author: This is a workaround for a problem I was having where, only on the CI pipeline, the producer could not find the hardcoded schema file, though the spec apparently can find it.

end

expect(avro_producer.producer.client.messages.length).to eq(3)
topic_1_messages = avro_producer.producer.client.messages_for('topic-1')
expect(topic_1_messages.length).to eq(2)
expect(topic_1_messages[0][:payload]).to be_a(String)
it 'produces a message to the specified topic' do
avro_producer.produce('submission_trace_mock_dev', valid_payload)
expect(avro_producer.producer.client.messages.length).to eq(1)
topic_1_messages = avro_producer.producer.client.messages_for('submission_trace_mock_dev')
expect(topic_1_messages.length).to eq(1)
expect(topic_1_messages[0][:payload]).to be_a(String)
expect(topic_1_messages[0][:payload]).to eq(topic1_payload_value)
end
end
end

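The hardcoded topic1_payload_value bytes in the spec can be reproduced by hand: Avro's binary encoding writes the record's map field as a block count, length-prefixed key and value strings (zigzag varints), and a zero end-of-blocks marker, and the producer prepends the magic byte and big-endian schema ID 5. A pure-Ruby sketch (no avro gem; the helper names are illustrative):

```ruby
# Zigzag-varint encoding as used by Avro's binary encoding (64-bit ints).
def zigzag_varint(n)
  z = (n << 1) ^ (n >> 63)
  out = ''.b
  loop do
    byte = z & 0x7f
    z >>= 7
    out << (z.zero? ? byte : byte | 0x80)
    break if z.zero?
  end
  out
end

# Avro string: varint byte length, then the raw bytes.
def avro_string(str)
  zigzag_varint(str.bytesize) + str
end

# Record { data: map<string,string> } with payload { 'key' => 'value' }:
# one block of one entry, then the end-of-blocks marker.
avro_body = zigzag_varint(1) + avro_string('key') + avro_string('value') + zigzag_varint(0)
framed = [0].pack('C') + [5].pack('N') + avro_body
# framed == "\x00\x00\x00\x00\x05\x02\x06key\nvalue\x00"
```

The `\x02` and `\x06` bytes are the zigzag encodings of 1 (block count) and 3 (length of "key"), and `\n` is `\x0a`, the zigzag encoding of 5 (length of "value").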
48 changes: 48 additions & 0 deletions spec/lib/kafka/schema_registry/service_spec.rb
@@ -0,0 +1,48 @@
# frozen_string_literal: true

require 'rails_helper'
require 'kafka/schema_registry/service'

describe Kafka::SchemaRegistry::Service do
let(:service) { described_class.new }
let(:subject_name) { 'topic-1' }
let(:version) { '1' }
let(:topic_1_response) do
{ 'subject' => 'topic-1-value',
'version' => 1,
'id' => 5,
'schema' => "{\"type\":\"record\",\"name\":\"TestRecord\",\"namespace\":\"gov.va.eventbus.test.data\"\
,\"fields\":[{\"name\":\"data\",\"type\":{\"type\":\"map\",\"values\":\"string\"},\"default\":{}}]}" }
end

describe '#subject_version' do
context 'when requesting latest version' do
it 'returns schema information' do
VCR.use_cassette('kafka/topics') do
response = service.subject_version(subject_name)
expect(response).to eq(topic_1_response)
end
end
end

context 'when requesting specific version' do
it 'returns schema information for that version' do
VCR.use_cassette('kafka/topics') do
response = service.subject_version(subject_name, version)
expect(response).to eq(topic_1_response)
end
end
end

context 'when subject does not exist' do
let(:nonexistent_subject) { 'topic-999' }

it 'raises a ResourceNotFound error' do
VCR.use_cassette('kafka/topics404') do
expect { service.subject_version(nonexistent_subject) }
.to raise_error(Faraday::ResourceNotFound)
end
end
end
end
end