This document provides a comprehensive explanation of the Python script used to process and upload data to the IQAir API. The script was originally designed to read raw data from text files, compute hourly averages, track processing status in a SQLite database, write the aggregated results to CSV, and finally send the CSV data to an external API. This guide explains each component at a granular level and then outlines how to adapt the code to retrieve already-aggregated hourly averages directly from a SQL database.
- Overview
- Code Structure and Functionality
- Adapting the Code for SQL Database Data
- Testing and Debugging Considerations
- Conclusion
- References
## Overview

The original code automates the process of:
- Reading raw sensor data from `.txt` files.
- Maintaining a processing state in a SQLite database to avoid duplicate processing.
- Aggregating the data on an hourly basis using Pandas.
- Saving the aggregated data into CSV files.
- Sending the CSV data to an external API using a POST request.
This modular design allows easy tracking of which data rows have been processed and ensures only new data is handled during each iteration of the main loop.
## Code Structure and Functionality

- `setup_database(db_path)`
  - Purpose: Creates the `processing_status` table if it doesn't already exist. This table stores the file name, the last processed raw timestamp, the last processed aggregated (hourly) timestamp, and the number of the last processed row.
  - Key Concepts: Uses SQLite to maintain state, ensuring idempotency in data processing.
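A minimal sketch of what `setup_database` plausibly looks like; the column names follow the description above, but the original schema may differ:

```python
import sqlite3

def setup_database(db_path):
    """Create the processing_status table if it does not already exist."""
    with sqlite3.connect(db_path) as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS processing_status (
                filename           TEXT PRIMARY KEY,
                last_raw_timestamp TEXT,
                last_avg_timestamp TEXT,
                last_row           INTEGER DEFAULT 0
            )
            """
        )
```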
- `get_processing_status_for_file(db_path, filename)`
  - Purpose: Retrieves the processing status for a specific file, returning default values if no record is found.
  - Detail: The function returns a tuple `(last_raw_timestamp, last_avg_timestamp, last_row)`, where `last_row` indicates how many lines have already been processed.
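A sketch consistent with that behavior, assuming defaults of `(None, None, 0)`:

```python
import sqlite3

def get_processing_status_for_file(db_path, filename):
    """Return (last_raw_timestamp, last_avg_timestamp, last_row) for a file,
    or default values if the file has never been processed."""
    with sqlite3.connect(db_path) as conn:
        row = conn.execute(
            "SELECT last_raw_timestamp, last_avg_timestamp, last_row "
            "FROM processing_status WHERE filename = ?",
            (filename,),
        ).fetchone()
    return row if row is not None else (None, None, 0)
```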
- `update_processing_status(db_path, filename, last_raw_timestamp, last_avg_timestamp, last_row)`
  - Purpose: Updates an existing record, or inserts a new one, in the `processing_status` table with the latest processing details.
  - Mechanism: Conditionally constructs the SQL statement from whichever parameters are not `None`.
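An "INSERT OR IGNORE then UPDATE" upsert is one way to implement that conditional construction; the original may build the statement differently:

```python
import sqlite3

def update_processing_status(db_path, filename, last_raw_timestamp=None,
                             last_avg_timestamp=None, last_row=None):
    """Create a record for the file if needed, then update only the
    columns whose arguments are not None."""
    columns = {
        "last_raw_timestamp": last_raw_timestamp,
        "last_avg_timestamp": last_avg_timestamp,
        "last_row": last_row,
    }
    updates = {col: val for col, val in columns.items() if val is not None}
    if not updates:
        return
    set_clause = ", ".join(f"{col} = ?" for col in updates)
    with sqlite3.connect(db_path) as conn:
        conn.execute(
            "INSERT OR IGNORE INTO processing_status (filename) VALUES (?)",
            (filename,),
        )
        conn.execute(
            f"UPDATE processing_status SET {set_clause} WHERE filename = ?",
            (*updates.values(), filename),
        )
```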
- `get_relevant_txt_files(directory, db_path)`
  - Purpose: Scans a specified directory for `.txt` files and keeps only those with new lines (i.e., whose current line count exceeds the stored `last_row`).
  - Key Detail: The function uses the stored processing state to avoid reprocessing data that has already been handled.
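A sketch of the filtering logic, reusing the status helper sketched above:

```python
import glob
import os

def get_relevant_txt_files(directory, db_path):
    """Return the .txt files whose current line count exceeds the
    last_row recorded for them in the database."""
    relevant = []
    for path in sorted(glob.glob(os.path.join(directory, "*.txt"))):
        filename = os.path.basename(path)
        _, _, last_row = get_processing_status_for_file(db_path, filename)
        with open(path, encoding="utf-8") as fh:
            line_count = sum(1 for _ in fh)
        if line_count > (last_row or 0):
            relevant.append(filename)
    return relevant
```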
- `process_file(local_path, output_path, filename, output_filename, db_path)`
  - Purpose: Processes a single text file (a sketch follows this list) by:
    - Reading the file into a Pandas DataFrame.
    - Identifying new rows (based on `last_row`).
    - Parsing the date and time columns to create a `date_time` column.
    - Grouping the data by hour, ensuring only complete hours are processed by comparing against the current hour.
    - Calculating hourly averages for several sensor values (e.g., wind speed, temperature, particulate matter).
    - Creating a new CSV DataFrame with the specific structure expected by the API.
    - Appending or writing this CSV data to an output file.
    - Updating the processing status in the SQLite database.
  - Important Details:
    - Date Parsing: Uses `pd.to_datetime` with a specified format.
    - Grouping: Uses the DataFrame's `groupby` method to compute means for each hour.
    - CSV Generation: The resulting CSV includes hard-coded fields (e.g., station name, latitude, longitude) along with the aggregated sensor values.
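The core of the aggregation might look like the sketch below. The separator, date format, and sensor column names are assumptions for illustration (the original file layout is not shown here); the station metadata values are taken from the SQL example later in this guide:

```python
import os
import pandas as pd

SENSOR_COLUMNS = ["wind speed", "wind direction", "T", "rH",
                  "p", "PM1", "PM2.5", "PM10"]  # assumed column names

def process_file(local_path, output_path, filename, output_filename, db_path):
    _, _, last_row = get_processing_status_for_file(db_path, filename)
    df = pd.read_csv(local_path, sep="\t")   # separator is an assumption
    start = last_row or 0                    # skip already-processed lines
    new_rows = df.iloc[start:].copy()
    if new_rows.empty:
        return None
    # Build a single date_time column from the separate date and time columns.
    new_rows["date_time"] = pd.to_datetime(
        new_rows["date"] + " " + new_rows["time"], format="%d.%m.%Y %H:%M:%S"
    )
    # Keep only complete hours by excluding rows from the current hour.
    complete = new_rows[new_rows["date_time"] < pd.Timestamp.now().floor("h")]
    hourly = (
        complete.groupby(complete["date_time"].dt.floor("h"))[SENSOR_COLUMNS]
        .mean()
        .reset_index()
    )
    if hourly.empty:
        return None
    # Hard-coded station metadata expected by the API.
    hourly["name"] = "Fidas Station (ACCESS)"
    hourly["lat"] = 24.5254
    hourly["lon"] = 54.4319
    # Append to the output CSV, writing the header only for a new file.
    out_file = os.path.join(output_path, output_filename)
    hourly.to_csv(out_file, mode="a", header=not os.path.exists(out_file), index=False)
    update_processing_status(db_path, filename,
                             last_raw_timestamp=str(new_rows["date_time"].max()),
                             last_avg_timestamp=str(hourly["date_time"].max()),
                             last_row=len(df))
    return hourly
```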
- `send_csv_data(dataframe, output_filename)`
  - Purpose: Converts the CSV DataFrame into an in-memory CSV file and sends it to an API endpoint via a POST request.
  - Mechanism: Utilizes the `requests` library to post the data, logs the response status and text, and returns the JSON response.
  - Error Handling: Catches exceptions raised by the POST request and logs appropriate error messages.
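A sketch of the send step. The endpoint URL below is a placeholder, and whether the original posts a multipart file or a raw request body is not shown in this guide; this version uses a multipart upload:

```python
import io
import logging
import requests

API_URL = "https://example.invalid/v1/measurements"  # hypothetical placeholder

def send_csv_data(dataframe, output_filename):
    """POST the DataFrame as an in-memory CSV file and return the JSON reply."""
    buffer = io.StringIO()
    dataframe.to_csv(buffer, index=False)
    try:
        response = requests.post(
            API_URL,
            files={"file": (output_filename, buffer.getvalue(), "text/csv")},
            timeout=30,
        )
        logging.info("API responded %s: %s", response.status_code, response.text)
        return response.json()
    except requests.RequestException as exc:
        logging.error("Failed to send %s: %s", output_filename, exc)
        return None
```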
- `main()`
  - Purpose: Serves as the orchestrator of the script, running in an infinite loop that:
    - Initializes the database.
    - Determines the directories for the raw text files and the output CSV files.
    - Retrieves the list of relevant text files.
    - Processes each file to compute hourly averages and update the processing status.
    - Sends the generated CSV data to the API.
    - Waits 60 seconds before repeating the process.
  - Key Detail: The loop ensures that the system continuously checks for and processes new data, making it suitable for near-real-time data ingestion.
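Putting the pieces together, the orchestration loop plausibly resembles the following; the directory and database paths are assumptions:

```python
import os
import time

def main():
    db_path = "processing_status.db"        # assumed paths; adjust to your layout
    raw_dir, out_dir = "raw_data", "output"
    setup_database(db_path)
    while True:
        for filename in get_relevant_txt_files(raw_dir, db_path):
            output_filename = filename.rsplit(".", 1)[0] + ".csv"
            hourly = process_file(os.path.join(raw_dir, filename), out_dir,
                                  filename, output_filename, db_path)
            if hourly is not None:
                send_csv_data(hourly, output_filename)
        time.sleep(60)   # poll for new data once per minute

if __name__ == "__main__":
    main()
```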
## Adapting the Code for SQL Database Data

To adapt the code so that the same aggregated data is derived directly from a SQL database rather than from text files, consider the following steps:
- Eliminate Functions:
  Remove or bypass `get_relevant_txt_files` and `process_file`, which depend on reading and parsing `.txt` files.
  - Rationale: When the source data is already stored in a SQL database, there is no need to read external text files.
- New Function:
  Define a function, e.g., `fetch_hourly_aggregates_from_db(db_connection_params)`, that:
  - Connects to your SQL database (PostgreSQL, MySQL, or even SQLite, depending on your setup).
  - Executes a SQL query that computes the hourly averages of the sensor values.
- Example SQL Query (SQLite syntax):

  ```sql
  SELECT strftime('%Y%m%dT%H', date_time) || '00+0400' AS datetime,
         'Fidas Station (ACCESS)' AS name,
         24.5254 AS lat,
         54.4319 AS lon,
         AVG("wind speed") AS WV,
         AVG("wind direction") AS WD,
         AVG(T) AS TEMP,
         AVG(rH) AS HUMI,
         AVG(p) AS PRES,
         AVG(PM1) AS PM01,
         AVG("PM2.5") AS PM25,
         AVG(PM10) AS PM10
  FROM sensor_data
  WHERE date_time < strftime('%Y-%m-%d %H:00:00', 'now', 'localtime')  -- only complete hours
  GROUP BY strftime('%Y%m%dT%H', date_time);
  ```
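Building on that query, the replacement fetch function can be quite small. The sketch below assumes SQLite and carries over the table and column names from the query above; for PostgreSQL or MySQL, swap in the appropriate driver or an SQLAlchemy engine:

```python
import sqlite3
import pandas as pd

HOURLY_AGGREGATE_SQL = """
SELECT strftime('%Y%m%dT%H', date_time) || '00+0400' AS datetime,
       'Fidas Station (ACCESS)' AS name,
       24.5254 AS lat, 54.4319 AS lon,
       AVG("wind speed") AS WV, AVG("wind direction") AS WD,
       AVG(T) AS TEMP, AVG(rH) AS HUMI, AVG(p) AS PRES,
       AVG(PM1) AS PM01, AVG("PM2.5") AS PM25, AVG(PM10) AS PM10
FROM sensor_data
WHERE date_time < strftime('%Y-%m-%d %H:00:00', 'now', 'localtime')
GROUP BY strftime('%Y%m%dT%H', date_time);
"""

def fetch_hourly_aggregates_from_db(db_path):
    """Run the hourly-aggregation query and return a DataFrame whose
    columns already match what send_csv_data expects."""
    with sqlite3.connect(db_path) as conn:
        return pd.read_sql_query(HOURLY_AGGREGATE_SQL, conn)
```

The returned DataFrame can then be passed straight to `send_csv_data`, leaving the rest of the `main()` loop unchanged.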