DRAFT: Add Kafka support in batch processing #1243

Draft · wants to merge 1 commit into base: master

12 changes: 12 additions & 0 deletions pipelines/batch/Dockerfile-kafka
@@ -0,0 +1,12 @@
FROM eclipse-temurin:17-jdk-focal

ARG WORK_DIR="/usr/src/Main"
COPY target/batch-bundled.jar ${WORK_DIR}/app.jar
WORKDIR ${WORK_DIR}

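# NOTE: build ARGs are not available at container runtime, so the ENTRYPOINT below
# hardcodes the jar path instead of expanding ${WORK_DIR}.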
ENTRYPOINT java -jar /usr/src/Main/app.jar \
--fhirFetchMode=TOPIC_LISTENER \
--kafkaBootstrapServers=kafka-01:9092 \
--kafkaTopic=2xx \
--kafkaConsumerGroup=fhir-processor \
--outputParquetPath=/tmp/fhir-data/
14 changes: 14 additions & 0 deletions pipelines/batch/pom.xml
@@ -155,6 +155,20 @@
<artifactId>${flink.artifact.name}</artifactId>
<version>${beam.version}</version>
</dependency>

<!-- Apache Beam Kafka IO -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>${beam.version}</version>
</dependency>

<!-- Kafka Client -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version> <!-- Keep aligned with the target Kafka brokers and the Beam release in use -->
</dependency>
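<!-- NOTE: kafka-clients supplies the StringDeserializer used by the KafkaIO read below;
     its version must stay compatible with the brokers this pipeline consumes from. -->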
</dependencies>

<build>
9 changes: 9 additions & 0 deletions pipelines/batch/rm_batch_kafka.sh
@@ -0,0 +1,9 @@
#!/bin/bash

docker service rm fhir-batch-kafka-service
docker service rm spark-thriftserver
docker service rm fhir-views-service

docker volume rm spark_vol_single

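# Clear the pipeline output directory (bind-mounted into the batch container at /tmp/fhir-data).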
sudo rm -r output/*
53 changes: 53 additions & 0 deletions pipelines/batch/run_batch_kafka.sh
@@ -0,0 +1,53 @@
#!/bin/bash

# Build the Java application (Maven project)
echo "Building Java application..."
mvn clean package -DskipTests

# Build the Docker image
echo "Building Docker image..."
docker build -f Dockerfile-kafka -t fhir-batch-kafka .

# Build the Docker image for FHIR views
echo "Building FHIR views Docker image..."
docker build -t my-fhir-views ../query

# Create a volume for Spark
docker volume create spark_vol_single

# Run the Spark thrift server
echo "Starting Spark thrift server..."
docker service create \
--name spark-thriftserver \
--network kafka_public \
--mount type=bind,source="$(pwd)/output",target=/dwh \
--mount type=volume,source=spark_vol_single,target=/opt/bitnami/spark \
--publish 10001:10000 \
--publish 4041:4040 \
docker.io/bitnami/spark:3.3 \
sbin/start-thriftserver.sh --driver-memory 5g

# Wait for Spark to be ready
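# (A fixed sleep is only a rough readiness check; polling the Thrift server port would be more robust.)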
echo "Waiting for Spark thrift server to be ready..."
sleep 30

# Run the FHIR batch container
echo "Running FHIR batch container..."
docker service create \
--name fhir-batch-kafka-service \
--network kafka_public \
--mount type=bind,source="$(pwd)/output",target=/tmp/fhir-data \
  fhir-batch-kafka

# Run the FHIR views container as a service
echo "Starting FHIR views service..."
docker service create \
--name fhir-views-service \
--network kafka_public \
--publish 10002:8888 \
my-fhir-views

# Show the logs to get the Jupyter token
echo "Waiting for service to start and showing logs..."
sleep 5
docker service logs -f fhir-views-service
FhirEtl.java
@@ -46,16 +46,19 @@
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -285,6 +288,17 @@ static void validateOptions(FhirEtlOptions options) {
&& !Strings.isNullOrEmpty(options.getFhirServerUrl()),
"--fhirDatabaseConfigPath and -fhirServerUrl cannot be empty for OPENMRS_JDBC fetch"
+ " mode");
case TOPIC_LISTENER -> {
Preconditions.checkState(
!Strings.isNullOrEmpty(options.getKafkaBootstrapServers()),
"--kafkaBootstrapServers cannot be empty for TOPIC_LISTENER fetch mode");
Preconditions.checkState(
!Strings.isNullOrEmpty(options.getKafkaTopic()),
"--kafkaTopic cannot be empty for TOPIC_LISTENER fetch mode");
Preconditions.checkState(
!Strings.isNullOrEmpty(options.getKafkaConsumerGroup()),
"--kafkaConsumerGroup cannot be empty for TOPIC_LISTENER fetch mode");
}
}
if (!options.getActivePeriod().isEmpty()) {
Set<String> resourceSet = Sets.newHashSet(options.getResourceList().split(","));
@@ -476,6 +490,36 @@ private static Map<String, List<String>> validateAndFetchNdjsonFileMappings(
return Maps.newHashMap();
}

private static List<Pipeline> buildTopicListenerPipeline(
FhirEtlOptions options, AvroConversionUtil avroConversionUtil) {

Pipeline pipeline = Pipeline.create(options);

// Read from Kafka topic
PCollection<String> messages =
pipeline
.apply(
"Read From Kafka",
KafkaIO.<String, String>read()
.withBootstrapServers(options.getKafkaBootstrapServers())
.withTopic(options.getKafkaTopic())
.withConsumerConfigUpdates(
Map.of(
"group.id",
options.getKafkaConsumerGroup(),
"auto.offset.reset",
"earliest"))
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata())
.apply("Extract Values", Values.create());

// Process the messages
messages.apply("Process FHIR Bundles", ParDo.of(new ProcessFhirBundleFn(options)));

return Arrays.asList(pipeline);
}
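
Note (not part of this diff): KafkaIO.read() produces an unbounded PCollection, so a batch runner would never see this source complete. If strictly batch semantics are intended, one way to bound the read is sketched below; the record cap is a hypothetical placeholder, and withMaxReadTime is an alternative.

    // Sketch only: bound the Kafka source so a batch runner can terminate.
    PCollection<String> boundedMessages =
        pipeline
            .apply(
                "Read From Kafka (bounded)",
                KafkaIO.<String, String>read()
                    .withBootstrapServers(options.getKafkaBootstrapServers())
                    .withTopic(options.getKafkaTopic())
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .withMaxNumRecords(10_000) // hypothetical cap
                    .withoutMetadata())
            .apply("Extract Values", Values.create());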

/**
* Pipeline builder for fetching resources from a FHIR server. The mode of the pipeline is decided
* based on the given `options`. There are currently four modes in this priority order:
@@ -510,6 +554,7 @@ static List<Pipeline> setupAndBuildPipelines(
case NDJSON -> buildMultiJsonReadPipeline(options, true);
case FHIR_SEARCH -> buildFhirSearchPipeline(options, avroConversionUtil);
case BULK_EXPORT -> buildBulkExportReadPipeline(options, avroConversionUtil);
case TOPIC_LISTENER -> buildTopicListenerPipeline(options, avroConversionUtil);
};
}

FhirEtlOptions.java
@@ -265,4 +265,22 @@ public interface FhirEtlOptions extends BasePipelineOptions {
String getSourceNdjsonFilePatternList();

void setSourceNdjsonFilePatternList(String value);

@Description("Kafka bootstrap servers (comma-separated list)")
@Default.String("")
String getKafkaBootstrapServers();

void setKafkaBootstrapServers(String value);

@Description("Kafka topic name to listen for FHIR resources")
@Default.String("")
String getKafkaTopic();

void setKafkaTopic(String value);

@Description("Kafka consumer group ID")
@Default.String("")
String getKafkaConsumerGroup();

void setKafkaConsumerGroup(String value);
}
FhirFetchMode.java
@@ -20,11 +20,12 @@
* from the source.
*/
public enum FhirFetchMode {
BULK_EXPORT,
FHIR_SEARCH,
HAPI_JDBC,
OPENMRS_JDBC,
PARQUET,
JSON,
NDJSON
NDJSON,
BULK_EXPORT,
HAPI_JDBC,
OPENMRS_JDBC,
TOPIC_LISTENER
}
ProcessFhirBundleFn.java
@@ -0,0 +1,64 @@
/*
* Copyright 2020-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.fhir.analytics;

import com.cerner.bunsen.exception.ProfileException;
import com.google.common.base.Strings;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.beam.sdk.values.KV;
import org.hl7.fhir.r4.model.Bundle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

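/**
 * DoFn that parses each Kafka message as a FHIR {@link Bundle} and delegates to the
 * {@code processBundle} logic inherited from {@link FetchSearchPageFn}. Empty or unparsable
 * messages are logged and skipped so a single bad record does not fail the pipeline.
 */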
public class ProcessFhirBundleFn extends FetchSearchPageFn<String> {
private static final Logger log = LoggerFactory.getLogger(ProcessFhirBundleFn.class);

public ProcessFhirBundleFn(FhirEtlOptions options) {
super(options, "ProcessFhirBundleFn");
}

@ProcessElement
public void processElement(@Element String bundleJson, OutputReceiver<KV<String, Integer>> out)
throws SQLException, ProfileException {
if (Strings.isNullOrEmpty(bundleJson)) {
log.warn("Received empty message from Kafka - skipping");
return;
}

try {
Bundle bundle = (Bundle) parser.parseResource(bundleJson);
if (bundle == null) {
log.warn("Failed to parse message as FHIR Bundle - skipping");
return;
}
processBundle(bundle);
out.output(KV.of("processed", 1));
} catch (Exception e) {
log.error("Error processing FHIR bundle from Kafka: {}", e.getMessage(), e);
}
}

@Teardown
public void teardown() throws IOException {
super.teardown();
}

@Override
public void finishBundle(FinishBundleContext context) {
super.finishBundle(context);
}
}