Ingesting data from S3 into Database
In this example we will use the Versatile Data Kit to develop an ingestion Data Job. This job downloads its source data from a CSV file stored in an AWS S3 bucket, reads the data and ingests it into a target table in a local SQLite database.
Before you continue, make sure you are familiar with the Getting Started section of the wiki.
You are invited to proceed and run the following example Data Job on your machine to get first-hand experience working with Data Jobs.
The Data Job name is: ingest-from-csv-example-job.
The relevant Data Job code is available here. You can use it as a template and extend it to make a Data Job that fits your use case more closely.
For this example we use a public AWS S3 bucket which can be found here. It is managed by the National Oceanic and Atmospheric Administration (NOAA). The Global Historical Climatology Network daily (GHCNd) is an integrated database of daily climate summaries from land surface stations across the globe. The 1763.csv file we ingest contains the minimum and maximum temperatures for every day of the year 1763.
Downloading data from a public AWS S3 bucket does not require having an AWS account.
If you have not done so already, you can install Versatile Data Kit and the plugins required for this example by running the following command from a terminal:
pip install quickstart-vdk
Ingestion requires us to set environment variables for:
- the type of database in which we will be ingesting;
- the ingestion method;
- the ingestion target, that is, the location of the target database. If this file is not present, ingestion will create it. For this example, we will use a vdk-sqlite.db file, which will be created in the current directory;
- the file of the default SQLite database against which vdk runs (same value as ingestion target in this case);
export VDK_DB_DEFAULT_TYPE=SQLITE
export VDK_INGEST_METHOD_DEFAULT=sqlite
export VDK_INGEST_TARGET_DEFAULT=vdk-sqlite.db
export VDK_SQLITE_FILE=vdk-sqlite.db
The structure of our Data Job is as follows:
ingest-from-csv-example-job/
├── 10_drop_table.sql
├── 20_create_table.sql
├── 30_download_and_ingest.py
The purpose of this example is to demonstrate how to download a CSV file and then ingest its data into the target database. Our Data Job ingest-from-csv-example-job downloads the CSV file from the public AWS S3 bucket and ingests the data into a local SQLite database (vdk-sqlite.db).
ingest-from-csv-example-job consists of two SQL steps and one Python step. Note that VDK allows us to mix Python and SQL steps in whatever order we prefer.
Steps are executed in alphabetical order of their file names, so it is good practice to prefix the step names with numbers. This makes their order clear both to Versatile Data Kit and to other users who might read through the Data Job.
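To see why the numeric prefixes matter, the following minimal sketch sorts step file names the way a plain lexicographic sort would. It only illustrates alphabetical ordering with Python's built-in `sorted`; it is not VDK's actual step discovery code, and the unpadded names in the second list are hypothetical.

```python
# Step file names from this example, deliberately listed out of order.
steps = ["30_download_and_ingest.py", "10_drop_table.sql", "20_create_table.sql"]

# A plain alphabetical (lexicographic) sort restores the intended order.
ordered_steps = sorted(steps)
print(ordered_steps)

# Hypothetical names without uniform prefix width: strings are compared
# character by character, not as numbers, so "10_..." sorts before "1_...".
unpadded = ["2_create_table.sql", "10_ingest.py", "1_drop_table.sql"]
unpadded_sorted = sorted(unpadded)
print(unpadded_sorted)
```

Using prefixes of equal width, such as 10, 20 and 30, keeps the alphabetical and numeric orders identical and leaves room to insert steps in between later.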
10_drop_table.sql
DROP TABLE IF EXISTS noaa_ghcn_data_1763;
20_create_table.sql
CREATE TABLE noaa_ghcn_data_1763 (
    StationID INTEGER,
    Date NVARCHAR,
    Element NVARCHAR,
    ElementValue NVARCHAR,
    MFlag NVARCHAR,
    QFlag NVARCHAR,
    SFlag NVARCHAR,
    ObsTime NVARCHAR
);
30_download_and_ingest.py
import csv

import boto3
from botocore import UNSIGNED
from botocore.client import Config


def run(job_input):
    # Use unsigned requests, so no AWS credentials are needed for the public bucket.
    s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))
    s3.download_file(
        Bucket="noaa-ghcn-pds", Key="csv/1763.csv", Filename="1763_data.csv"
    )

    # Read the downloaded file and send its rows for ingestion while it is still open.
    with open("1763_data.csv", encoding="utf-8") as csv_file:
        csv_reader = csv.reader(csv_file)

        job_input.send_tabular_data_for_ingestion(
            rows=csv_reader,
            column_names=[
                "StationID",
                "Date",
                "Element",
                "ElementValue",
                "MFlag",
                "QFlag",
                "SFlag",
                "ObsTime",
            ],
            destination_table="noaa_ghcn_data_1763",
        )
- The first step deletes the target table if it exists. This query only serves to make the Data Job repeatable;
- The second step creates the target table we will be inserting data into;
- The third step downloads the CSV file from the public AWS S3 bucket, reads its data and then ingests it into the destination_table in the target database.
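Conceptually, the Python step reads CSV rows and writes them into the table created by the previous step. The sketch below reproduces that flow with only the Python standard library and an in-memory SQLite database, so it runs without VDK, boto3 or network access. It is an illustration under assumptions, not how VDK implements ingestion: the helper `ingest_csv_rows` is hypothetical, all columns are declared as text for simplicity, and the two sample rows are modeled on the query output shown later on this page.

```python
import csv
import io
import sqlite3

COLUMNS = ["StationID", "Date", "Element", "ElementValue",
           "MFlag", "QFlag", "SFlag", "ObsTime"]

# Sample rows (assumed layout: eight comma-separated fields, no header row).
SAMPLE_CSV = (
    "ITE00100554,17630101,TMAX,-36,,,E,\n"
    "ITE00100554,17630101,TMIN,-50,,,E,\n"
)


def ingest_csv_rows(connection, csv_file, table):
    """Hypothetical helper: insert every CSV row into `table`, return the row count."""
    placeholders = ", ".join("?" for _ in COLUMNS)
    # Identifiers cannot be bound as parameters, so the table and column
    # names are formatted in; only the values use "?" placeholders.
    statement = (
        f"INSERT INTO {table} ({', '.join(COLUMNS)}) VALUES ({placeholders})"
    )
    rows = list(csv.reader(csv_file))
    connection.executemany(statement, rows)
    connection.commit()
    return len(rows)


# In-memory database standing in for vdk-sqlite.db.
connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE noaa_ghcn_data_1763 ("
    + ", ".join(f"{name} NVARCHAR" for name in COLUMNS)
    + ")"
)
ingested = ingest_csv_rows(
    connection, io.StringIO(SAMPLE_CSV), "noaa_ghcn_data_1763"
)
print(ingested)
```

In the actual Data Job, `job_input.send_tabular_data_for_ingestion` takes the place of the hand-written INSERT statement, and the configured ingestion target determines where the rows end up.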
To run the Data Job, we navigate to the parent directory of the Job, and run the following command from a terminal:
vdk run ingest-from-csv-example-job/
Upon successful completion of the Data Job, we should see a log similar to this:
Result logs
2021-09-02 10:39:13,481=1630568353[VDK] ingest-from-csv-example-job [INFO ] vdk.internal.builtin_plugins.run cli_run.py:66 run_job [OpId:1630568347-4793de-8391d6]- Data Job execution summary: {
  "data_job_name": "ingest-from-csv-example-job",
  "execution_id": "1630568347-4793de",
  "start_time": "2021-09-02T07:39:07.872347",
  "end_time": "2021-09-02T07:39:09.001044",
  "status": "success",
  "steps_list": [
    {
      "name": "10_drop_table.sql",
      "type": "sql",
      "start_time": "2021-09-02T07:39:07.872399",
      "end_time": "2021-09-02T07:39:07.879354",
      "status": "success",
      "details": null,
      "exception": null
    },
    {
      "name": "20_create_table.sql",
      "type": "sql",
      "start_time": "2021-09-02T07:39:07.879440",
      "end_time": "2021-09-02T07:39:07.885531",
      "status": "success",
      "details": null,
      "exception": null
    },
    {
      "name": "30_download_and_ingest.py",
      "type": "python",
      "start_time": "2021-09-02T07:39:07.885856",
      "end_time": "2021-09-02T07:39:09.000959",
      "status": "success",
      "details": null,
      "exception": null
    }
  ],
  "exception": null
}
After running the Data Job, we can check whether the target table was populated correctly by using the sqlite-query command provided by the vdk-sqlite plugin. It lets us execute queries against the configured SQLite database (VDK_SQLITE_FILE environment variable) without having to set up a Data Job:
vdk sqlite-query -q 'SELECT * FROM noaa_ghcn_data_1763'
We should see an output of many lines similar to these:
----------- -------- ---- --- - -
ITE00100554 17630101 TMAX -36 E
ITE00100554 17630101 TMIN -50 E
ITE00100554 17630102 TMAX -26 E
ITE00100554 17630102 TMIN -40 E
ITE00100554 17630103 TMAX -9 E
ITE00100554 17630103 TMIN -29 E
ITE00100554 17630104 TMAX -4 E
ITE00100554 17630104 TMIN -24 E
.
.
.
----------- -------- ---- --- - -
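If you prefer to inspect the result from Python rather than through `vdk sqlite-query`, the standard library's `sqlite3` module can open the same database file. The following is a minimal sketch, assuming the job has already run and `vdk-sqlite.db` sits in the current directory; `count_rows_per_element` is a hypothetical helper name, not part of VDK.

```python
import sqlite3


def count_rows_per_element(db_path, table="noaa_ghcn_data_1763"):
    """Hypothetical helper: return a {Element: row count} dict for the table."""
    connection = sqlite3.connect(db_path)
    try:
        # The table name is formatted in because identifiers cannot be
        # passed as bound parameters.
        cursor = connection.execute(
            f"SELECT Element, COUNT(*) FROM {table} GROUP BY Element"
        )
        return dict(cursor.fetchall())
    finally:
        connection.close()


# Example call, assuming the Data Job above has populated the database:
# print(count_rows_per_element("vdk-sqlite.db"))
```

Grouping by Element gives a quick sanity check that both TMAX and TMIN rows were ingested, without scrolling through the full table.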
You can find a list of all Versatile Data Kit examples here.