The purpose of this demo is to combine different Enterprise Integration Patterns (EIPs) to process a CSV file. This demo uses the Synthea patient dataset[^1].
- CSV file: `patients.csv`, downloaded from this link, which looks like the following:
| Id | BIRTHDATE | DEATHDATE | PREFIX | FIRST | LAST | SUFFIX | MAIDEN | MARITAL | RACE | ETHNICITY | GENDER | BIRTHPLACE | ADDRESS | CITY | STATE | COUNTY | ZIP | LAT | LON |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 5605b66b-e92d-c16c-1b83-b8bf7040d51f | 1977-03-19 |  | Mrs. | Nikita578 | Erdman779 |  | Leannon79 | M | white | nonhispanic | F | Wakefield Massachusetts US | 510 Little Station Unit 69 | Quincy | Massachusetts | Norfolk County | 02186 | 42.290937381211286 | -70.97550306 |
- Kafka cluster - Choose one option:
  - Use the provided compose file:

    ```shell
    podman-compose -f kafka-postgres.compose.yaml up -d
    ```
  - Use Camel testcontainers:

    ```shell
    camel infra run kafka
    ```
- Postgres database - Choose one option:
  - Use the provided compose file (same as above)
  - Use Camel testcontainers:

    ```shell
    camel infra run postgres
    ```
- Create a table with the following query (for simplification, only `id`, `birthdate`, and `ZIP` will be used):

  ```sql
  CREATE TABLE patients (
      id character(64) NOT NULL,
      birthdate character(10),
      ZIP character(5)
  );
  ```
- Kaoto UI: install Kaoto according to the instructions in the installation guide
Security Note: This demo uses the debezium/kafka image in the compose file for ease of local setup. A scan might show vulnerabilities in lz4-java or log4j. These are non-critical for local development as the broker is only exposed on localhost and not intended for production traffic.
- CSV Ingestion (`route-csv.camel.yaml`)
  - Reads the CSV file from the local directory
  - Splits the file into individual patient records
  - Routes each record to the filter stage
  - Moves the file to the `done` directory once processing is finished
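The ingestion steps above could be sketched in Camel YAML DSL roughly as follows; the endpoint names, the `direct:filter-stage` hand-off, and the CSV options are illustrative assumptions, not the demo's actual `route-csv.camel.yaml`:

```yaml
# Hypothetical sketch of the CSV ingestion route (names are assumptions)
- route:
    id: csv-ingestion
    from:
      uri: "file:test-file?move=done"        # pick up the CSV, move it to done/ when finished
      steps:
        - unmarshal:
            csv:
              useMaps: true                  # each record becomes a map keyed by the header row
        - split:
            simple: "${body}"                # split the parsed list into individual records
            steps:
              - to: "direct:filter-stage"    # hand each record to the filter route
```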
- Data Filtering (`filter-publish.camel.yaml`)
  - Filters patients based on ZIP code presence
  - Valid records (with ZIP) → routed to database storage and published to Kafka for monitoring
  - Invalid records (missing ZIP) → routed to error handling
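A minimal sketch of the filter stage, assuming each record body is a map (as produced by CSV unmarshalling with `useMaps`) and hypothetical `direct:` endpoint and topic names:

```yaml
# Hypothetical sketch of the filter/publish route (names are assumptions)
- route:
    id: filter-publish
    from:
      uri: "direct:filter-stage"
      steps:
        - choice:
            when:
              - simple: "${body[ZIP]} != null && ${body[ZIP]} != ''"
                steps:
                  - to: "direct:db-store"        # valid record → database storage
                  - to: "kafka:patients"         # also publish to Kafka for monitoring
            otherwise:
              steps:
                - to: "direct:error-storage"     # missing ZIP → error handling
```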
- Database Storage (`db-store.camel.yaml`)
  - Stores valid records in Postgres
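One way to implement the storage step is with the Camel SQL component, assuming the map body from the earlier stages and the three-column table created above; the `direct:` endpoint name is an assumption:

```yaml
# Hypothetical sketch of the database storage route (names are assumptions)
- route:
    id: db-store
    from:
      uri: "direct:db-store"
      steps:
        # :#${...} parameters are resolved from the record map via simple expressions
        - to: "sql:INSERT INTO patients (id, birthdate, ZIP) VALUES (:#${body[Id]}, :#${body[BIRTHDATE]}, :#${body[ZIP]})"
```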
- Error Handling (`error-storage.camel.yaml`)
  - Writes invalid records to error files
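The error branch could look roughly like this, marshalling the rejected record back to CSV and appending it to a file; the file name and endpoint options are assumptions:

```yaml
# Hypothetical sketch of the error-storage route (names are assumptions)
- route:
    id: error-storage
    from:
      uri: "direct:error-storage"
      steps:
        - marshal:
            csv: {}                          # serialize the record back to a CSV line
        - to: "file:errors?fileExist=Append&fileName=invalid-records.csv"
```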
- Kafka Monitoring (`kafka-logger.camel.yaml`)
  - Logs all valid records via a Kafka consumer
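A sketch of the monitoring consumer, assuming a `patients` topic and that the broker address is configured in `application.properties`:

```yaml
# Hypothetical sketch of the Kafka logging route (topic name is an assumption)
- route:
    id: kafka-logger
    from:
      uri: "kafka:patients"
      steps:
        - log: "Received patient record: ${body}"
```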
- Place your CSV file in the `test-file/` directory
- Start the infrastructure:

  ```shell
  podman-compose -f kafka-postgres.compose.yaml up -d
  ```
- Update the `application.properties` file with the correct credentials for Postgres and Kafka, if changed
- Create the `patients` table in Postgres
- Run the Kaoto UI and run the routes in the same Camel context by clicking the play icon next to the folder name.
- Monitor the output:
  - The processed CSV file will be moved to the `test-file/done/` directory
  - Valid records appear in the Postgres `patients` table
  - Invalid records are written to files in the `errors/` directory
  - All valid records are logged via the Kafka consumer
- To stop the infrastructure once the process is finished:

  ```shell
  podman-compose -f kafka-postgres.compose.yaml down
  ```
Footnotes

[^1]: Jason Walonoski, Mark Kramer, Joseph Nichols, Andre Quina, Chris Moesel, Dylan Hall, Carlton Duffett, Kudakwashe Dube, Thomas Gallagher, Scott McLachlan, "Synthea: An approach, method, and software mechanism for generating synthetic patients and the synthetic electronic health care record," *Journal of the American Medical Informatics Association*, 25(3), 230–238, 2018.