This project sets up a real-time data pipeline using:
- Pub/Sub for message ingestion
- Dataflow (Apache Beam) for stream processing
- BigQuery for storage and analytics
.
├── publisher.py # Publishes test JSON events to Pub/Sub
├── streaming_pipeline.py # Apache Beam pipeline: Pub/Sub → BigQuery
├── requirements.txt # Python dependencies
├── README.md # Setup instructions
└── setup.sh # Sets up the GCP environment
- Python 3.7+
- Google Cloud SDK installed (gcloud init)
- Google Cloud account with billing enabled
- Enable the following APIs:
  - Pub/Sub
  - BigQuery
  - Dataflow
- Create or use an existing GCP project
git clone https://github.com/your-username/gcp-dataengineering-mini-projects.git
cd gcp-dataengineering-mini-projects/2.realtime-data-ingestion

pip install -r requirements.txt

PROJECT_ID="your-project-id"
TOPIC_ID="your-topic-id"
SUBSCRIPTION_ID="your-sub-id"
BQ_DATASET="your_dataset"
BQ_TABLE="your_table"
GCS_BUCKET="your-bucket-name"
REGION="us-central1"
. ./setup.sh
Your GCP environment is now ready.
Edit publisher.py to fill in:
project_id
topic_id
Run it:
python publisher.py
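
publisher.py itself is not reproduced in this README, so the following is only a minimal sketch of what publishing test JSON events to Pub/Sub can look like. The event fields (`event_id`, `event_type`, `timestamp`) and the function names `build_event` and `publish_events` are assumptions for illustration, not the script's actual contents. It uses the `google-cloud-pubsub` client library.

```python
import json
import time


def build_event(event_id: int, timestamp: float) -> bytes:
    """Encode one test event as UTF-8 JSON bytes, the format Pub/Sub expects."""
    # Hypothetical schema: adjust the fields to match your BigQuery table.
    event = {"event_id": event_id, "event_type": "test", "timestamp": timestamp}
    return json.dumps(event).encode("utf-8")


def publish_events(project_id: str, topic_id: str, count: int = 5) -> None:
    """Publish `count` test events to the given topic, one per second."""
    # Imported here so build_event can be used without the client library.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    for i in range(count):
        future = publisher.publish(topic_path, build_event(i, time.time()))
        # result() blocks until the server acknowledges and returns the message ID.
        print("Published message", future.result())
        time.sleep(1)
```

Calling `publish_events("your-project-id", "your-topic-id")` with your own values would send five events to the topic.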
NOTE: Open another terminal and run the Dataflow pipeline locally.
Edit streaming_pipeline.py to fill in:
project_id = ""
dataset_id = ""
table = ""
subscription_id = ""
Use DirectRunner to run locally:
python streaming_pipeline.py
This is for testing purposes only.
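
For orientation, a Pub/Sub → BigQuery pipeline of this kind can be sketched roughly as below. This is not the contents of streaming_pipeline.py: the function names `parse_event` and `run` are hypothetical, and the sketch assumes the messages are UTF-8 JSON objects and that the BigQuery table already exists with a matching schema.

```python
import json


def parse_event(message: bytes) -> dict:
    """Decode a Pub/Sub payload (UTF-8 JSON bytes) into a BigQuery row dict."""
    return json.loads(message.decode("utf-8"))


def run(project_id: str, dataset_id: str, table: str, subscription_id: str) -> None:
    """Read from a Pub/Sub subscription and append rows to a BigQuery table."""
    # Imported here so parse_event can be used without Apache Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

    options = PipelineOptions()
    # Pub/Sub is an unbounded source, so the pipeline must run in streaming mode.
    options.view_as(StandardOptions).streaming = True
    options.view_as(StandardOptions).runner = "DirectRunner"

    subscription = f"projects/{project_id}/subscriptions/{subscription_id}"
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "ReadFromPubSub" >> beam.io.ReadFromPubSub(subscription=subscription)
            | "ParseJson" >> beam.Map(parse_event)
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                table=f"{project_id}:{dataset_id}.{table}",
                # Assumption: the table was created beforehand (e.g. by setup.sh).
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )
```

A streaming pipeline runs until it is interrupted, so stop the local run with Ctrl+C once you have confirmed that rows arrive in BigQuery.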
Update these options in streaming_pipeline.py:
options.view_as(StandardOptions).runner = "DataflowRunner"
options.view_as(GoogleCloudOptions).project = "your-project-id"
options.view_as(GoogleCloudOptions).region = "your-region"
options.view_as(GoogleCloudOptions).staging_location = "gs://your-bucket-name/staging"
options.view_as(GoogleCloudOptions).temp_location = "gs://your-bucket-name/temp"
Then run:
python streaming_pipeline.py
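
As an alternative to setting each attribute with view_as, the same settings can be passed to Beam as command-line style flags. The sketch below assumes this equivalent form; the helper names `dataflow_flags` and `make_options` are hypothetical, and the `--streaming` flag is an addition not listed above (a Pub/Sub source needs streaming mode).

```python
def dataflow_flags(project: str, region: str, bucket: str) -> list:
    """Build Beam flags equivalent to the view_as(...) assignments above."""
    return [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
        f"--staging_location=gs://{bucket}/staging",
        f"--temp_location=gs://{bucket}/temp",
        "--streaming",  # assumption: required because Pub/Sub is unbounded
    ]


def make_options(project: str, region: str, bucket: str):
    """Turn the flags into a PipelineOptions object for beam.Pipeline."""
    # Imported here so dataflow_flags can be used without Apache Beam installed.
    from apache_beam.options.pipeline_options import PipelineOptions

    return PipelineOptions(dataflow_flags(project, region, bucket))
```

Keeping the flags in one function makes it easier to switch between DirectRunner and DataflowRunner without editing several lines of the pipeline.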
