Skip to content

Commit

Permalink
Merge pull request #1358 from sanger/y24-096-publish-volume-tracking-…
Browse files Browse the repository at this point in the history
…message

Y24-096 publish volume tracking message
  • Loading branch information
seenanair authored Jul 18, 2024
2 parents b9e116d + 268f4bd commit bee2cf7
Show file tree
Hide file tree
Showing 18 changed files with 972 additions and 5 deletions.
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ git_source(:github) { |repo| "https://github.com/#{repo}.git" }

ruby '3.3.4'

gem 'avro'
gem 'bootsnap', '>= 1.1.0', require: false # Reduces boot times through caching
gem 'bunny'
gem 'exception_notification'
Expand Down Expand Up @@ -42,6 +43,7 @@ group :development, :test do
gem 'simplecov', require: false
gem 'simplecov-lcov', require: false
gem 'sqlite3'
gem 'webmock'
end

gem 'flipper', '~> 0.25.0'
Expand Down
16 changes: 16 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,12 @@ GEM
minitest (>= 5.1)
mutex_m
tzinfo (~> 2.0)
addressable (2.8.7)
public_suffix (>= 2.0.2, < 7.0)
amq-protocol (2.3.2)
ast (2.4.2)
avro (1.11.3)
multi_json (~> 1.0)
base64 (0.2.0)
bigdecimal (3.1.8)
bootsnap (1.18.3)
Expand All @@ -90,6 +94,9 @@ GEM
coderay (1.1.3)
concurrent-ruby (1.3.3)
connection_pool (2.4.1)
crack (1.0.0)
bigdecimal
rexml
crass (1.0.6)
database_cleaner (2.0.2)
database_cleaner-active_record (>= 2, < 3)
Expand Down Expand Up @@ -126,6 +133,7 @@ GEM
sanitize (< 7)
globalid (1.2.1)
activesupport (>= 6.1)
hashdiff (1.1.0)
i18n (1.14.5)
concurrent-ruby (~> 1.0)
io-console (0.7.2)
Expand Down Expand Up @@ -155,6 +163,7 @@ GEM
mini_portile2 (2.8.7)
minitest (5.24.1)
msgpack (1.7.2)
multi_json (1.15.0)
mutex_m (0.2.0)
mysql2 (0.5.6)
net-imap (0.4.12)
Expand All @@ -181,6 +190,7 @@ GEM
pry (>= 0.13.0)
psych (5.1.2)
stringio
public_suffix (6.0.0)
puma (6.4.2)
nio4r (~> 2.0)
racc (1.8.0)
Expand Down Expand Up @@ -314,6 +324,10 @@ GEM
tzinfo (2.0.6)
concurrent-ruby (~> 1.0)
unicode-display_width (2.5.0)
webmock (3.23.1)
addressable (>= 2.8.0)
crack (>= 0.3.2)
hashdiff (>= 0.4.0, < 2.0.0)
webrick (1.8.1)
websocket-driver (0.7.6)
websocket-extensions (>= 0.1.0)
Expand All @@ -325,6 +339,7 @@ PLATFORMS
ruby

DEPENDENCIES
avro
bootsnap (>= 1.1.0)
bunny
byebug
Expand Down Expand Up @@ -354,6 +369,7 @@ DEPENDENCIES
spring
spring-watcher-listen
sqlite3
webmock
yard

RUBY VERSION
Expand Down
33 changes: 30 additions & 3 deletions app/exchanges/data_structure_builder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,51 @@ def build_children(object, field)
# * [constant] - Takes the constant and applies the method chain
# to it e.g DateTime.now
# * [array] - usually an array of fields
def instance_value(object, field, parent) # rubocop:disable Metrics/MethodLength
# * [self] - applies the method chain to the current (builder) object
def instance_value(object, field, parent) # # rubocop:disable Metrics/MethodLength
# NOTE(review): this listing appears to interleave the removed and added lines of a
# diff (two `def` lines, and both an evaluate_method_chain and an evaluate_field call
# under :model and :parent_model) - confirm against the real file before editing.
# Picks where the value comes from based on the declared field type.
case field[:type]
when :string
# The configured value is returned as it is.
field[:value]
when :model
# The value is a method chain evaluated against the object being serialised.
evaluate_method_chain(object, field[:value].split('.'))
evaluate_field(object, field[:value])
when :parent_model
# Same as :model, but the chain is evaluated against the parent.
evaluate_method_chain(parent, field[:value].split('.'))
evaluate_field(parent, field[:value])
when :constant
# The first segment names a constant; the remaining segments are sent to it in turn.
evaluate_method_chain(field[:value].split('.').first.constantize,
field[:value].split('.')[1..])
when :array
# Delegates to build_children for the nested fields.
build_children(object, field)
when :self
# The chain is evaluated against this builder instance rather than the object.
evaluate_field(self, field[:value])
end
end

# Resolves a configured field value against +object+.
# A value containing the safe navigation operator (`&.`) is split on that operator and
# handed to evaluate_safe_navigation; any other value is split on `.` and handed to
# evaluate_method_chain.
# @param [Object] object the receiver the chain starts from
# @param [String] field_value the configured chain, e.g. 'source.tube.barcode'
# @return [Object, nil] whatever the chosen evaluator returns
def evaluate_field(object, field_value)
  return evaluate_method_chain(object, field_value.split('.')) unless field_value.include?('&.')

  evaluate_safe_navigation(object, field_value.split('&.'))
end

# Sends each method name in +chain+ to the result of the previous call, starting
# from +object+. Each call goes through `try` because a link in the chain may be nil.
# @param [Object] object the receiver the chain starts from
# @param [Array<String>] chain the method names to call in order
# @return [Object, nil] the result of the last call
def evaluate_method_chain(object, chain)
  chain.reduce(object) do |receiver, method_name|
    receiver.try(:send, method_name)
  end
end

# Walks +chain+ (the pieces of a field value split on `&.`) starting from +object+,
# giving up with nil as soon as the value reached so far is nil or false.
#
# Each piece is resolved in one of three ways:
# * it still contains `.` - it is split and run through evaluate_method_chain
# * the current value is a Hash - it is looked up by symbol key, then by string key
# * otherwise - it is sent to the current value via `try`
# @param [Object] object the receiver the chain starts from
# @param [Array<String>] chain the pieces to resolve in order
# @return [Object, nil] the value reached at the end of the chain
def evaluate_safe_navigation(object, chain)
  current = object
  chain.each do |segment|
    return nil unless current

    current =
      if segment.include?('.')
        evaluate_method_chain(current, segment.split('.'))
      elsif current.is_a?(Hash)
        # Prefer the symbol key; fall back to the string key when it is absent
        current.fetch(segment.to_sym) { current[segment.to_s] }
      else
        current.try(:send, segment)
      end
  end
  current
end
end
44 changes: 44 additions & 0 deletions app/exchanges/volume_tracking/message_builder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# frozen_string_literal: true

module VolumeTracking
  # Message builder for volume tracking messages, built on Message::Message.
  # Creates a message in the structure expected by the warehouse.
  #
  # Example of a produced message:
  #   {"limsId"=>"Traction",
  #    "messageCreateDateUtc"=>Mon, 15 Jul 2024 15:16:54.877858000 UTC +00:00,
  #    "messageUuid"=>"0a62ee15-bbf6-46f0-ba95-01d42622d076",
  #    "recordedAt"=>Mon, 15 Jul 2024 15:16:54.867713000 UTC +00:00,
  #    "volume"=>1.5, "concentration"=>10.0, "insertSize"=>100, "aliquotType"=>"primary",
  #    "limsUuid"=>"", "sourceType"=>"library", "sourceBarcode"=>"TRAC-2-35805",
  #    "sampleName"=>"Sample1", "usedByBarcode"=>"TRAC-2-35806", "usedByType"=>"pool"}
  class MessageBuilder < Message::Message
    # Builds, once per builder instance, the values that depend on what the aliquot
    # came from and what it was used by.
    #
    # Source details are only filled in for a 'Pacbio::Library' source; used-by details
    # only for 'Pacbio::Well' and 'Pacbio::Pool'. Every other case keeps the defaults.
    # @return [Hash] with keys :source_type, :source_barcode, :sample_name,
    #   :used_by_type, :used_by_barcode and :lims_uuid
    def publish_data # rubocop:disable Metrics/MethodLength
      # Reuse the hash built by an earlier call
      return @publish_data if defined?(@publish_data)

      aliquot = object
      details = { source_type: '', source_barcode: '', sample_name: '',
                  used_by_type: 'nil', used_by_barcode: '', lims_uuid: aliquot.id.to_s || '' }

      if aliquot.source_type == 'Pacbio::Library'
        library = aliquot.source
        details[:source_type] = 'library'
        details[:source_barcode] = library.tube.barcode
        details[:sample_name] = library.sample_name
      end

      used_by_type = aliquot.used_by_type
      if used_by_type == 'Pacbio::Well'
        well = aliquot.used_by
        plate = well.plate
        details[:used_by_type] = 'well'
        # A well is identified as <sequencing kit box barcode>:<plate number>:<position>
        details[:used_by_barcode] =
          "#{plate.sequencing_kit_box_barcode}:#{plate.plate_number}:#{well.position}"
      elsif used_by_type == 'Pacbio::Pool'
        details[:used_by_type] = 'pool'
        details[:used_by_barcode] = aliquot.used_by.tube.barcode
      end

      @publish_data = details
    end
  end
end
87 changes: 87 additions & 0 deletions app/messages/emq/encoder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# frozen_string_literal: true

require 'net/http' # Add this line to require Net::HTTP
require 'avro'
require 'fileutils'

module Emq
  # Encodes messages with an Avro schema held in the RedPanda schema registry.
  #
  # The schema JSON is read from a local cache file when one exists; otherwise it is
  # fetched from the registry over HTTP and written to the cache for later calls.
  class Encoder
    # NOTE(review): nothing in this class assigns @schema_config or @validate_obj, so
    # both readers return nil. They are kept so that existing callers do not break.
    attr_reader :schema_config, :validate_obj

    # @param [String] subject the subject of the schema
    # @param [String] version the version of the schema
    # @param [String] registry_url the URL of the schema registry; the subject is appended
    #   to it with no separator, so it needs to end where the subject should start
    def initialize(subject, version, registry_url)
      @subject = subject
      @version = version
      @registry_url = registry_url
    end

    # Encode a message using the schema.
    # The output is the two-byte marker C3 01, the 8-byte CRC-64-AVRO fingerprint of the
    # schema, then the Avro binary encoding of the message.
    # @param [Hash] message the message to encode
    # @return [String] the encoded message
    # @raise [StandardError] any error raised while loading the schema or encoding is
    #   logged and re-raised
    def encode_message(message) # rubocop:disable Metrics/MethodLength
      schema_json = create_message_schema
      begin
        schema = Avro::Schema.parse(schema_json)
      rescue Avro::SchemaParseError => e
        Rails.logger.error("Schema parsing error: <#{e.message}>. Schema: #{schema_json}")
        raise
      end
      stream = StringIO.new
      writer = Avro::IO::DatumWriter.new(schema)
      encoder = Avro::IO::BinaryEncoder.new(stream)
      encoder.write("\xC3\x01") # Avro single-object encoding marker
      encoder.write(fingerprint_bytes(schema.crc_64_avro_fingerprint))
      writer.write(message, encoder)
      stream.string
    rescue StandardError => e
      Rails.logger.error("Error validating volume tracking message: <#{e.message}>")
      raise
    end

    private

    # Packs a schema fingerprint as 8 bytes, least significant byte first.
    # The byte order is fixed rather than taken from the host so that the header is the
    # same on every platform.
    # @param [Integer] fingerprint the unsigned 64-bit schema fingerprint
    # @return [String] the 8-byte binary string
    def fingerprint_bytes(fingerprint)
      [fingerprint].pack('Q<')
    end

    # Location of the cached schema for this subject and version.
    # NOTE(review): the path is relative, so it resolves against the process working
    # directory - confirm that is always the application root.
    # @return [String] the cache file path
    def cache_file_path
      "data/avro_schema_cache/#{@subject}_v#{@version}.avsc"
    end

    # Create the message schema
    # @return [String] the schema for the message
    # @raise [IOError] if the registry response does not contain a schema
    def create_message_schema
      # Prefer to use the cached schema if it exists.
      if File.exist?(cache_file_path)
        Rails.logger.debug { "Using cached schema for #{@subject} v#{@version}" }
        return File.read(cache_file_path)
      end

      # Default to fetching the schema from the registry and caching it.
      Rails.logger.debug { "Fetching and caching schema for #{@subject} v#{@version}" }
      response = fetch_response("#{@registry_url}#{@subject}/versions/#{@version}")
      schema_str = JSON.parse(response.body)['schema']
      # Never cache a missing schema: the empty file would be picked up by every later
      # call and encoding would keep failing until the file was removed by hand.
      if schema_str.to_s.strip.empty?
        raise IOError, "No schema returned by the registry for #{@subject} v#{@version}"
      end

      # Ensure the directory exists
      FileUtils.mkdir_p(File.dirname(cache_file_path))
      File.write(cache_file_path, schema_str)
      schema_str
    end

    # Fetch the response from the URL, following redirects.
    # @param [String] uri_str the URL to fetch
    # @param [Integer] limit the number of requests still allowed, redirects included
    # @return [Net::HTTPResponse] the successful response
    # @raise [IOError] if the redirect limit is used up
    def fetch_response(uri_str, limit = 10)
      raise IOError, 'Too many HTTP redirects' if limit <= 0

      response = Net::HTTP.get_response(URI.parse(uri_str))

      case response
      when Net::HTTPSuccess then response
      when Net::HTTPRedirection
        # The Location header may be relative; resolve it against the URL just requested.
        fetch_response(URI.join(uri_str, response['location']).to_s, limit - 1)
      else
        # Raises the error that matches the response class
        response.error!
      end
    end
  end
end
19 changes: 19 additions & 0 deletions app/messages/emq/publisher.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# frozen_string_literal: true

module Emq
  # This module is responsible for sending messages to the EMQ. The work is handed to an
  # Emq::PublishingJob, which is only created when bunny is enabled in the Rails
  # configuration; when it is not, publishing does nothing.
  module Publisher
    # The publishing job used to send messages.
    # The job is created on first use and then reused. While bunny is disabled nothing
    # is stored, so the configuration is checked again on the next call.
    # @return [Emq::PublishingJob, nil] the job, or nil when bunny is not enabled
    def self.publish_job
      return @publish_job if defined?(@publish_job)

      @publish_job = Emq::PublishingJob.new if Rails.configuration.bunny['enabled']
    end

    # Publish a message to the EMQ
    # @param [Object] aliquots the object or objects to publish
    # @param [Object] configuration the pipeline configuration used to build the messages
    # @param [String] schema_key the key of the schema to publish against
    # @return [Object, nil] the result of the publishing job, or nil when there is no job
    def self.publish(aliquots, configuration, schema_key)
      publish_job&.publish(aliquots, configuration, schema_key)
    end
  end
end
101 changes: 101 additions & 0 deletions app/messages/emq/publishing_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# frozen_string_literal: true

require 'ostruct'

module Emq
  # Publishes messages to the EMQ. Each message is built from an object with a message
  # builder class named in the pipeline configuration, encoded with Emq::Encoder and
  # handed to Emq::Sender.
  class PublishingJob
    attr_reader :bunny_config

    # The prefix for the key which contains the version of the Avro schema to use
    # by the message builder
    AVRO_SCHEMA_VERSION_KEY = 'avro_schema_version_'

    # Loads the bunny configuration from the Rails configuration and wraps it in nested
    # OpenStructs so that it can be read with dot notation.
    def initialize
      @bunny_config = PublishingJob.deep_open_struct(Rails.configuration.bunny)
    end

    # Publish a message to the EMQ
    # @param [Object] objects the object or objects to publish
    # @param [Object] message_config the pipeline configuration to construct
    #  the message to publish from the given object(s)
    # @param [String] schema_key the key of the schema to encode the message with
    # Note:-
    # The schema_key must exist within the subjects hash of the bunny configuration and
    # must also have a matching configuration within the pipeline settings.
    # (See the 'volume_tracking' section in config/pipelines/pacbio.yml for reference.)
    # Any messages published using publishing_job require a corresponding entry in the
    # pipeline configuration, identified by the schema key.
    #
    # Nothing is published when the schema key, its subject or its version is missing
    # from the bunny configuration, or (after logging an error) when the pipeline
    # configuration has no builder entry for that version.
    def publish(objects, message_config, schema_key) # rubocop:disable Metrics/AbcSize,Metrics/MethodLength
      schemas = bunny_config.amqp.schemas

      # Nothing to do unless the schema key is configured with both a subject and a version
      schema = schemas.subjects[schema_key]
      return if schema.nil?

      subject = schema.subject
      version = schema.version
      return if subject.nil? || version.nil?

      # The builder configuration names the class that turns an object into a message
      builder_config = message_builder_config(message_config, schema_key, version)
      if builder_config.nil?
        Rails.logger.error("Message builder configuration not found for schema key: #{schema_key} " \
                           "and version: #{version}")
        return
      end
      message_builder_class = builder_config.message_class.to_s.constantize

      # Create an encoder and a sender for the subject and version
      encoder = Emq::Encoder.new(subject, version, schemas.registry_url)
      sender = Emq::Sender.new(bunny_config.amqp.isg, subject, version)

      Array(objects).each do |object|
        # Build the message content and take the part filed under the schema key
        payload = message_builder_class.new(object:, configuration: builder_config)
                                       .content[schema_key]

        # Objects whose content has nothing under the schema key are skipped
        next if payload.nil?

        sender.send_message(encoder.encode_message(payload))
      end
    end

    # Recursively converts a nested hash into an OpenStruct, allowing for dot notation
    # access to hash keys and their values. Anything that is not a Hash is returned
    # unchanged.
    def self.deep_open_struct(obj)
      case obj
      when Hash
        OpenStruct.new(obj.transform_values { |value| deep_open_struct(value) }) # rubocop:disable Style/OpenStructUse
      else
        obj
      end
    end

    private

    # Get the message builder configuration for the schema key and version
    # @param [Object] message_config the pipeline configuration to get the message builder
    #  configuration from
    # @param [String] schema_key the key of the schema to get the message builder configuration for
    # @param [Integer] version the version of the schema to get the message builder configuration
    # @return [OpenStruct | nil] the message builder configuration for the schema key and version;
    #  it is the entry stored under "avro_schema_version_<version>" in the children of the
    #  schema key's section, and nil when the section, its children or the entry is missing
    def message_builder_config(message_config, schema_key, version)
      version_key = "#{AVRO_SCHEMA_VERSION_KEY}#{version}"

      # The children are read straight from the section's @children instance variable
      children = message_config.public_send(schema_key)&.instance_variable_get(:@children)
      builder_config = children[version_key] if children

      OpenStruct.new(builder_config) if builder_config # rubocop:disable Style/OpenStructUse
    end
  end
end
Loading

0 comments on commit bee2cf7

Please sign in to comment.