diff --git a/pipelines/batch/Dockerfile-kafka b/pipelines/batch/Dockerfile-kafka
new file mode 100644
index 000000000..640b8c74d
--- /dev/null
+++ b/pipelines/batch/Dockerfile-kafka
@@ -0,0 +1,14 @@
+FROM eclipse-temurin:17-jdk-focal
+
+ARG WORK_DIR="/usr/src/Main"
+COPY target/batch-bundled.jar ${WORK_DIR}/app.jar
+WORKDIR ${WORK_DIR}
+
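+# Run the pipeline in Kafka listener mode: consume FHIR bundles from the topic and
+# write them out as Parquet. The broker, topic, and consumer group below are
+# deployment-specific values for this setup.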
+ENTRYPOINT java -jar /usr/src/Main/app.jar \
+ --fhirFetchMode=TOPIC_LISTENER \
+ --kafkaBootstrapServers=kafka-01:9092 \
+ --kafkaTopic=2xx \
+ --kafkaConsumerGroup=fhir-processor \
+ --outputParquetPath=/tmp/fhir-data/
diff --git a/pipelines/batch/output/.gitignore b/pipelines/batch/output/.gitignore
new file mode 100644
index 000000000..e69de29bb
diff --git a/pipelines/batch/pom.xml b/pipelines/batch/pom.xml
index c8315fe21..73b4a099c 100644
--- a/pipelines/batch/pom.xml
+++ b/pipelines/batch/pom.xml
@@ -155,6 +155,20 @@
      <artifactId>${flink.artifact.name}</artifactId>
      <version>${beam.version}</version>
+
+    <!-- Kafka support for the TOPIC_LISTENER fetch mode -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-kafka</artifactId>
+      <version>${beam.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>3.6.1</version>
+    </dependency>
+
diff --git a/pipelines/batch/rm_batch_kafka.sh b/pipelines/batch/rm_batch_kafka.sh
new file mode 100755
index 000000000..5747d0094
--- /dev/null
+++ b/pipelines/batch/rm_batch_kafka.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+
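+# Tear down the services started by run_batch_kafka.sh and remove their local state.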
+docker service rm fhir-batch-kafka-service
+docker service rm spark-thriftserver
+docker service rm fhir-views-service
+
+docker volume rm spark_vol_single
+
+sudo rm -r output/*
diff --git a/pipelines/batch/run_batch_kafka.sh b/pipelines/batch/run_batch_kafka.sh
new file mode 100755
index 000000000..7d740735d
--- /dev/null
+++ b/pipelines/batch/run_batch_kafka.sh
@@ -0,0 +1,56 @@
+#!/bin/bash
+
+# Build the Java application
+echo "Building Java application..."
+mvn clean package -DskipTests
+
+# Build the Docker image
+echo "Building Docker image..."
+docker build -f Dockerfile-kafka -t fhir-batch-kafka .
+
+# Build the Docker image for FHIR views
+echo "Building FHIR views Docker image..."
+docker build -t my-fhir-views ../query
+
+# Create a volume for Spark
+docker volume create spark_vol_single
+
+# Run the Spark thrift server
+echo "Starting Spark thrift server..."
+docker service create \
+ --name spark-thriftserver \
+ --network kafka_public \
+ --mount type=bind,source="$(pwd)/output",target=/dwh \
+ --mount type=volume,source=spark_vol_single,target=/opt/bitnami/spark \
+ --publish 10001:10000 \
+ --publish 4041:4040 \
+ docker.io/bitnami/spark:3.3 \
+ sbin/start-thriftserver.sh --driver-memory 5g
+
+# Wait for Spark to be ready
+echo "Waiting for Spark thrift server to be ready..."
+sleep 30
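+# NOTE: a fixed sleep is a crude readiness check; polling the published JDBC port
+# would be more robust, e.g.: until nc -z localhost 10001; do sleep 2; done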
+
+# Run the FHIR batch container
+echo "Running FHIR batch container..."
+docker service create \
+ --name fhir-batch-kafka-service \
+ --network kafka_public \
+ --mount type=bind,source="$(pwd)/output",target=/tmp/fhir-data \
+  fhir-batch-kafka
+
+# Run the FHIR views container as a service
+echo "Starting FHIR views service..."
+docker service create \
+ --name fhir-views-service \
+ --network kafka_public \
+ --publish 10002:8888 \
+ my-fhir-views
+
+# Show the logs to get the Jupyter token
+echo "Waiting for service to start and showing logs..."
+sleep 5
+docker service logs -f fhir-views-service
diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtl.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtl.java
index bfc23d1ff..47d4de367 100644
--- a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtl.java
+++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtl.java
@@ -46,16 +46,19 @@
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -285,6 +288,17 @@ static void validateOptions(FhirEtlOptions options) {
&& !Strings.isNullOrEmpty(options.getFhirServerUrl()),
"--fhirDatabaseConfigPath and -fhirServerUrl cannot be empty for OPENMRS_JDBC fetch"
+ " mode");
+ case TOPIC_LISTENER -> {
+ Preconditions.checkState(
+ !Strings.isNullOrEmpty(options.getKafkaBootstrapServers()),
+ "--kafkaBootstrapServers cannot be empty for TOPIC_LISTENER fetch mode");
+ Preconditions.checkState(
+ !Strings.isNullOrEmpty(options.getKafkaTopic()),
+ "--kafkaTopic cannot be empty for TOPIC_LISTENER fetch mode");
+ Preconditions.checkState(
+ !Strings.isNullOrEmpty(options.getKafkaConsumerGroup()),
+ "--kafkaConsumerGroup cannot be empty for TOPIC_LISTENER fetch mode");
+ }
}
if (!options.getActivePeriod().isEmpty()) {
    Set<String> resourceSet = Sets.newHashSet(options.getResourceList().split(","));
@@ -476,6 +490,38 @@ private static Map<String, List<String>> validateAndFetchNdjsonFileMappings(
return Maps.newHashMap();
}
+  private static List<Pipeline> buildTopicListenerPipeline(
+      FhirEtlOptions options, AvroConversionUtil avroConversionUtil) {
+
+    Pipeline pipeline = Pipeline.create(options);
+
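+    // Note: KafkaIO.read() is an unbounded source unless bounded (e.g. withMaxNumRecords);
+    // with auto.offset.reset=earliest a new consumer group starts from the earliest record.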
+    // Read from Kafka topic
+    PCollection<String> messages =
+        pipeline
+            .apply(
+                "Read From Kafka",
+                KafkaIO.<String, String>read()
+                    .withBootstrapServers(options.getKafkaBootstrapServers())
+                    .withTopic(options.getKafkaTopic())
+                    .withConsumerConfigUpdates(
+                        Map.of(
+                            "group.id",
+                            options.getKafkaConsumerGroup(),
+                            "auto.offset.reset",
+                            "earliest"))
+                    .withKeyDeserializer(StringDeserializer.class)
+                    .withValueDeserializer(StringDeserializer.class)
+                    .withoutMetadata())
+            .apply("Extract Values", Values.create());
+
+    // Process the messages
+    messages.apply("Process FHIR Bundles", ParDo.of(new ProcessFhirBundleFn(options)));
+
+    return Arrays.asList(pipeline);
+  }
+
/**
* Pipeline builder for fetching resources from a FHIR server. The mode of the pipeline is decided
* based on the given `options`. There are currently four modes in this priority order:
@@ -510,6 +556,7 @@ static List<Pipeline> setupAndBuildPipelines(
case NDJSON -> buildMultiJsonReadPipeline(options, true);
case FHIR_SEARCH -> buildFhirSearchPipeline(options, avroConversionUtil);
case BULK_EXPORT -> buildBulkExportReadPipeline(options, avroConversionUtil);
+ case TOPIC_LISTENER -> buildTopicListenerPipeline(options, avroConversionUtil);
};
}
diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java
index 070c53822..73fb7a3dc 100644
--- a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java
+++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java
@@ -265,4 +265,22 @@ public interface FhirEtlOptions extends BasePipelineOptions {
String getSourceNdjsonFilePatternList();
void setSourceNdjsonFilePatternList(String value);
+
+ @Description("Kafka bootstrap servers (comma-separated list)")
+ @Default.String("")
+ String getKafkaBootstrapServers();
+
+ void setKafkaBootstrapServers(String value);
+
+ @Description("Kafka topic name to listen for FHIR resources")
+ @Default.String("")
+ String getKafkaTopic();
+
+ void setKafkaTopic(String value);
+
+ @Description("Kafka consumer group ID")
+ @Default.String("")
+ String getKafkaConsumerGroup();
+
+ void setKafkaConsumerGroup(String value);
}
diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirFetchMode.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirFetchMode.java
index f7011a6ef..d27fa766a 100644
--- a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirFetchMode.java
+++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirFetchMode.java
@@ -20,11 +20,12 @@
* from the source.
*/
public enum FhirFetchMode {
- BULK_EXPORT,
FHIR_SEARCH,
- HAPI_JDBC,
- OPENMRS_JDBC,
PARQUET,
JSON,
- NDJSON
+ NDJSON,
+ BULK_EXPORT,
+ HAPI_JDBC,
+ OPENMRS_JDBC,
+ TOPIC_LISTENER
}
diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/ProcessFhirBundleFn.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/ProcessFhirBundleFn.java
new file mode 100644
index 000000000..91430260b
--- /dev/null
+++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/ProcessFhirBundleFn.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2020-2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.fhir.analytics;
+
+import com.cerner.bunsen.exception.ProfileException;
+import com.google.common.base.Strings;
+import java.io.IOException;
+import java.sql.SQLException;
+import org.apache.beam.sdk.values.KV;
+import org.hl7.fhir.r4.model.Bundle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
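+/**
+ * DoFn that parses each Kafka message as a JSON-encoded FHIR Bundle and hands it to the
+ * bundle-processing logic inherited from FetchSearchPageFn, which converts the contained
+ * resources and writes them out. Emits one ("processed", 1) pair per handled bundle.
+ */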
+public class ProcessFhirBundleFn extends FetchSearchPageFn<String> {
+ private static final Logger log = LoggerFactory.getLogger(ProcessFhirBundleFn.class);
+
+ public ProcessFhirBundleFn(FhirEtlOptions options) {
+ super(options, "ProcessFhirBundleFn");
+ }
+
+ @ProcessElement
+  public void processElement(@Element String bundleJson, OutputReceiver<KV<String, Integer>> out)
+ throws SQLException, ProfileException {
+ if (Strings.isNullOrEmpty(bundleJson)) {
+ log.warn("Received empty message from Kafka - skipping");
+ return;
+ }
+
+ try {
+ Bundle bundle = (Bundle) parser.parseResource(bundleJson);
+ if (bundle == null) {
+ log.warn("Failed to parse message as FHIR Bundle - skipping");
+ return;
+ }
+ processBundle(bundle);
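+      // Emit a simple counter element so the transform has observable output per bundle.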
+ out.output(KV.of("processed", 1));
+ } catch (Exception e) {
+ log.error("Error processing FHIR bundle from Kafka: {}", e.getMessage(), e);
+ }
+ }
+
+ @Teardown
+ public void teardown() throws IOException {
+ super.teardown();
+ }
+
+  @FinishBundle
+  @Override
+ public void finishBundle(FinishBundleContext context) {
+ super.finishBundle(context);
+ }
+}