diff --git a/contrib/niwa/logger/writers/netcdf_writer.py b/contrib/niwa/logger/writers/netcdf_writer.py
new file mode 100644
index 00000000..70f0253c
--- /dev/null
+++ b/contrib/niwa/logger/writers/netcdf_writer.py
@@ -0,0 +1,264 @@
+#!/usr/bin/env python3
+
+import logging
+import numbers
+import re
+import sys
+
+try:
+    # requires pip install netCDF4
+    from netCDF4 import Dataset
+except ImportError:
+    logging.error('NetCDFWriter requires installation of the netCDF4 package. '
+                  'Please run "pip install netCDF4" and retry.')
+
+from os.path import dirname, realpath
+sys.path.append(dirname(dirname(dirname(realpath(__file__)))))
+from logger.utils.das_record import DASRecord  # noqa: E402
+from logger.utils import timestamp  # noqa: E402
+from logger.writers.writer import Writer  # noqa: E402
+
+
+class NetCDFWriter(Writer):
+    """Write to netCDF files for the specified filebase, with datestamp
+    appended. If filebase is a <data_id>:<filebase> dict, write each record
+    to the filebase whose key matches the record's data_id.
+    """
+    # Based on the LogfileWriter
+    def __init__(self, filebase=None, flush=True,
+                 time_format=timestamp.TIME_FORMAT,
+                 rollover_format=timestamp.DATE_FORMAT,
+                 split_char=' ', suffix='', header=None,
+                 header_file=None,
+                 quiet=False):
+        """Write timestamped records to a filebase. The filebase will
+        have the current date appended, in keeping with R2R format
+        recommendations (http://www.rvdata.us/operators/directory). When the
+        timestamped date on a record differs from the current rollover_format
+        date, a new file with the new date suffix is created.
+
+        If filebase is a dict of <data_id>:<filebase> pairs, the writer will
+        attempt to match each record's data_id against the keys in the dict,
+        and will write the record to the filebase of the matching key. If no
+        key matches, the record is discarded, with a warning unless quiet is
+        True.
+
+        Four formats of records can be written by a NetCDFWriter:
+        1. A string prefixed by a timestamp
+        2. A DASRecord
+        3. A dict that has a 'timestamp' key
+        4. A list of any of the above
+
+        ```
+        filebase        A filebase string to write to, or a dict mapping
+                        <data_id>:<filebase>.
+
+        flush           If True (default), flush after every write() call
+
+        time_format     A strftime-compatible string, such as
+                        '%Y-%m-%dT%H:%M:%S.%fZ', used to parse string
+                        records; defaults to whatever's defined in
+                        utils.timestamp.TIME_FORMAT.
+
+        rollover_format A strftime-compatible string, such as '%Y-%m-%d';
+                        defaults to whatever's defined in
+                        utils.timestamp.DATE_FORMAT.
+
+        split_char      Delimiter between timestamp and rest of message
+
+        suffix          String to append to the end of the log filename
+
+        header          Add the specified header string to each file.
+
+        header_file     Add the content of the specified file to each file.
+
+        quiet           If True, don't complain if a record doesn't match
+                        any mapped data_id
+        ```
+        """
+        self.filebase = filebase
+        self.flush = flush
+        self.time_format = time_format
+        self.rollover_format = rollover_format
+        self.split_char = split_char
+        self.suffix = suffix
+        self.header = header
+        self.header_file = header_file
+        self.quiet = quiet
+
+        # If our filebase is a dict, we're going to be doing our
+        # fancy pattern->filebase mapping.
+        self.do_filebase_mapping = isinstance(self.filebase, dict)
+
+        if self.do_filebase_mapping:
+            # Precompile the patterns for faster matching. (Note that
+            # write_if_match() currently matches records on data_id equality
+            # rather than with these compiled regexes.)
+            self.compiled_filebase_map = {
+                pattern: re.compile(pattern) for pattern in self.filebase
+            }
+        self.current_filename = {}
+        self.writer = {}
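+
+    # Illustrative usage of the dict form of filebase (the paths and
+    # data_ids below are made up):
+    #
+    #   writer = NetCDFWriter(filebase={'gyro': '/log/gyro',
+    #                                   'mwx': '/log/mwx'})
+    #
+    # A DASRecord with data_id 'gyro' is then appended to
+    # /log/gyro-<date>.nc, one with data_id 'mwx' to /log/mwx-<date>.nc,
+    # and anything else is discarded with a warning.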
+    ############################
+    def write(self, record):
+        if record is None or record == '':
+            return
+
+        # If we've got a list, hope it's a list of records. Recurse,
+        # calling write() on each of the list elements in order.
+        if isinstance(record, list):
+            for single_record in record:
+                self.write(single_record)
+            return
+
+        # Look for the timestamp
+        if isinstance(record, DASRecord):
+            ts = record.timestamp
+
+        elif isinstance(record, dict):
+            ts = record.get('timestamp', None)
+            if ts is None:
+                if not self.quiet:
+                    logging.error('NetCDFWriter.write() - bad timestamp: "%s"', record)
+                return
+
+        elif isinstance(record, str):  # If str, it had better begin with a time string
+            # TODO: LW - make this work end to end; write_data() currently
+            # only handles DASRecords and dicts.
+            try:  # Try to extract timestamp from record
+                time_str = record.split(self.split_char)[0]
+                ts = timestamp.timestamp(time_str, time_format=self.time_format)
+            except ValueError:
+                if not self.quiet:
+                    logging.error('NetCDFWriter.write() - bad timestamp: "%s"', record)
+                return
+        else:
+            if not self.quiet:
+                logging.error(f'NetCDFWriter received badly formatted record. Must be DASRecord, '
+                              f'dict, or timestamp-prefixed string. Received: "{record}"')
+            return
+
+        # Now format ts according to the rollover format; this becomes part
+        # of the output filename.
+        date_str = timestamp.date_str(ts, date_format=self.rollover_format)
+        time_str = date_str + self.suffix
+        logging.debug('NetCDFWriter time_str: %s', time_str)
+
+        # Figure out where we're going to write
+        if self.do_filebase_mapping:
+            matched_patterns = [self.write_if_match(record, pattern, time_str)
+                                for pattern in self.filebase]
+            if True not in matched_patterns:
+                if not self.quiet:
+                    logging.warning(f'No patterns matched in NetCDFWriter '
+                                    f'options for record "{record}"')
+        else:
+            pattern = 'fixed'  # just an arbitrary fixed pattern
+            filename = self.filebase + '-' + time_str
+            self.write_filename(record, pattern, filename)
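+
+    # Filename derivation, for reference: with the default rollover_format
+    # '%Y-%m-%d' and an empty suffix, a record timestamped 1691410658.0
+    # (2023-08-07 UTC) written against filebase '/log/gyro' (an illustrative
+    # path) lands in '/log/gyro-2023-08-07.nc'.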
+    ############################
+    def write_if_match(self, record, pattern, time_str):
+        """If the record's data_id matches the pattern, write to the
+        corresponding filebase."""
+        if isinstance(record, DASRecord):
+            if record.data_id != pattern:
+                return None
+
+        elif isinstance(record, dict):
+            if record.get("data_id") != pattern:
+                return None
+
+        else:
+            # Only DASRecords and dicts carry a data_id we can match on
+            return None
+
+        filebase = self.filebase.get(pattern)
+        if filebase is None:
+            logging.error(f'System error: found no filebase matching pattern "{pattern}"!')
+            return None
+
+        filename = filebase + '-' + time_str
+        self.write_filename(record, pattern, filename)
+        return True
+
+    ############################
+    def write_filename(self, record, pattern, filename):
+        """Write record to filename. If it's the first time we're writing to
+        this filename, create the netCDF file and insert the resulting
+        Dataset into the map for the relevant pattern."""
+
+        # Are we currently writing to this file? If not, create it.
+        if filename != self.current_filename.get(pattern):
+            logging.info('NetCDFWriter opening new file: %s', filename)
+            self.current_filename[pattern] = filename
+            # Create (or truncate) the file, then close it again so it can
+            # be reopened for appending below.
+            Dataset(f"{filename}.nc", "w", format="NETCDF4").close()
+
+        # Open in appending mode; write_data() closes the dataset after each
+        # record, which flushes it to disk.
+        self.writer[pattern] = Dataset(f"{filename}.nc", "a", format="NETCDF4")
+
+        # Now, if our logic is correct, we should *always* have a matching writer
+        matching_writer = self.writer.get(pattern)
+        self.write_data(matching_writer, record)
+
+    ###
+    # The resulting netCDF object looks like:
+    #
+    # {
+    #   "dimensions": {
+    #     "time": [xxxxxx, yyyyyy, zzzzzz]
+    #   },
+    #   "variables": {
+    #     "value_1": [1, 2, 3, 4],
+    #     "value_2": [2.2, 2.2, 3.2, 4.2]
+    #   }
+    # }
+    ###
+    def write_data(self, writer, record):
+        dimensions = writer.dimensions
+        variables = writer.variables
+
+        if dimensions.get("time") is None:
+            # There needs to be a time dimension before we can write any values
+            writer.createDimension("time")
+            # f8 = 64-bit floating point -
+            # https://unidata.github.io/netcdf4-python/#variables-in-a-netcdf-file
+            writer.createVariable("time", "f8", "time")
+
+        time_var = variables.get("time")
+
+        if isinstance(record, DASRecord):
+            time_var[len(time_var)] = record.timestamp
+            fields = record.fields
+
+        elif isinstance(record, dict):
+            time_var[len(time_var)] = record.get("timestamp")
+            fields = record.get("fields", {})
+
+        else:
+            logging.error(f"Got unsupported record type: {type(record)}")
+            writer.close()
+            return
+
+        for field, value in fields.items():
+            existing_variable = variables.get(field)
+
+            if existing_variable is None:
+                # Each field gets its own unlimited dimension and variable
+                writer.createDimension(field)
+                if isinstance(value, numbers.Number):
+                    # 64-bit floating point
+                    existing_variable = writer.createVariable(field, "f8", field)
+                else:
+                    # a variable-length string field to hold any data type
+                    existing_variable = writer.createVariable(field, str, field)
+
+            existing_values = len(existing_variable)
+
+            # Numeric variables store the value as-is; string variables
+            # store its string representation.
+            if existing_variable.dtype == str:
+                existing_variable[existing_values] = str(value)
+            else:
+                existing_variable[existing_values] = value
+
+        writer.close()
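+
+
+if __name__ == '__main__':
+    # A minimal smoke-test sketch, not part of the writer proper: feed one
+    # dict record and one DASRecord through the writer, then read the file
+    # back to show the dimensions/variables layout described above. The
+    # 'demo' filebase and data_id are made up.
+    import tempfile
+
+    with tempfile.TemporaryDirectory() as tmpdir:
+        demo_writer = NetCDFWriter(filebase=tmpdir + '/demo')
+        demo_writer.write({'timestamp': 1691410658.0,
+                           'fields': {'F1': 4.26, 'F2': 121736.82}})
+        demo_writer.write(DASRecord(timestamp=1691410659.0, data_id='demo',
+                                    fields={'F1': 5.26, 'F2': 121735.82}))
+
+        # Both records share the same UTC date, so they land in one file.
+        with Dataset(tmpdir + '/demo-2023-08-07.nc', 'r') as nc:
+            print('dimensions:', list(nc.dimensions))  # ['time', 'F1', 'F2']
+            print('variables:', list(nc.variables))    # ['time', 'F1', 'F2']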
\ No newline at end of file
diff --git a/contrib/niwa/requirements.txt b/contrib/niwa/requirements.txt
index 3c7d35c2..9878366e 100644
--- a/contrib/niwa/requirements.txt
+++ b/contrib/niwa/requirements.txt
@@ -4,4 +4,6 @@ psycopg2-binary
 django-ipware
 requests
 #MQTT onshore connection
-paho-mqtt
\ No newline at end of file
+paho-mqtt
+#netcdf output
+netCDF4
\ No newline at end of file
diff --git a/contrib/niwa/test/test_netcdf_writer.py b/contrib/niwa/test/test_netcdf_writer.py
new file mode 100644
index 00000000..1d4b8d34
--- /dev/null
+++ b/contrib/niwa/test/test_netcdf_writer.py
@@ -0,0 +1,229 @@
+#!/usr/bin/env python3
+
+import logging
+import sys
+import tempfile
+import unittest
+
+from netCDF4 import Dataset
+
+sys.path.append('.')
+from logger.utils.das_record import DASRecord  # noqa: E402
+from contrib.niwa.logger.writers.netcdf_writer import NetCDFWriter  # noqa: E402
+
+SAMPLE_DATA_DICT = [
+    {'timestamp': 1691410658.0, 'fields': {'F1': 4.26, 'F2': 121736.82}},
+    {'timestamp': 1691410659.0, 'fields': {'F1': 5.26, 'F2': 121735.82}},
+    {'timestamp': 1691410660.0, 'fields': {'F1': 6.26, 'F2': 121734.82}},
+    {'timestamp': 1691410661.0, 'fields': {'F1': 7.26, 'F2': 121733.82}},
+]
+SAMPLE_DATA_DICT_STR = """{"timestamp": 1691410658.0, "fields": {"F1": 4.26, "F2": 121736.82}}
+{"timestamp": 1691410659.0, "fields": {"F1": 5.26, "F2": 121735.82}}
+{"timestamp": 1691410660.0, "fields": {"F1": 6.26, "F2": 121734.82}}
+{"timestamp": 1691410661.0, "fields": {"F1": 7.26, "F2": 121733.82}}
+"""
+
+# flake8: noqa: E501
+SAMPLE_DATA_DASRECORD_STR = """{"data_id": "test", "message_type": null, "timestamp": 1691410658.0, "fields": {"F1": 4.26, "F2": 121736.82}, "metadata": {}}
+{"data_id": "test", "message_type": null, "timestamp": 1691410659.0, "fields": {"F1": 5.26, "F2": 121735.82}, "metadata": {}}
+{"data_id": "test", "message_type": null, "timestamp": 1691410660.0, "fields": {"F1": 6.26, "F2": 121734.82}, "metadata": {}}
+{"data_id": "test", "message_type": null, "timestamp": 1691410661.0, "fields": {"F1": 7.26, "F2": 121733.82}, "metadata": {}}
+"""
+
+SAMPLE_DATA_DICT_MONTHLY = [
+    {'timestamp': 1691410658.0, 'fields': {'F1': 4.26, 'F2': 131736.82}},
+    {'timestamp': 1694413659.0, 'fields': {'F1': 5.26, 'F2': 131735.82}},
+    {'timestamp': 1696417660.0, 'fields': {'F1': 6.26, 'F2': 131734.82}},
+    {'timestamp': 1699420661.0, 'fields': {'F1': 7.26, 'F2': 131733.82}},
+]
+
+SAMPLE_DATA_DICT_HOURLY = [
+    {'timestamp': 1691410658.0, 'fields': {'F1': 4.26, 'F2': 141736.82}},
+    {'timestamp': 1691413659.0, 'fields': {'F1': 5.26, 'F2': 141735.82}},
+    {'timestamp': 1691417660.0, 'fields': {'F1': 6.26, 'F2': 141734.82}},
+    {'timestamp': 1691420661.0, 'fields': {'F1': 7.26, 'F2': 141733.82}},
+]
+
+
+class TestNetCDFWriter(unittest.TestCase):
+
+    def open_netcdf_file(self, filepath):
+        return Dataset(filepath, "r", format="NETCDF4")
+
+    ############################
+    def test_write_dict(self):
+        with tempfile.TemporaryDirectory() as tmpdirname:
+            filebase = tmpdirname + '/logfile'
+
+            writer = NetCDFWriter(filebase)
+            writer.write(SAMPLE_DATA_DICT)
+
+            with self.open_netcdf_file(filebase + '-2023-08-07.nc') as outfile:
+                expected_dimensions = ["time", "F1", "F2"]
+                expected_variables = ["time", "F1", "F2"]
+
+                self.assertEqual(list(outfile.dimensions.keys()), expected_dimensions)
+                self.assertEqual(list(outfile.variables.keys()), expected_variables)
+
+                record_count = 0
+                for record in SAMPLE_DATA_DICT:
+                    self.assertEqual(record["timestamp"], outfile.variables.get("time")[record_count])
+                    self.assertEqual(record["fields"]["F1"], outfile.variables.get("F1")[record_count])
+                    self.assertEqual(record["fields"]["F2"], outfile.variables.get("F2")[record_count])
+
+                    record_count += 1
+
+    ############################
+    def test_write_das_record(self):
+        with tempfile.TemporaryDirectory() as tmpdirname:
+            filebase = tmpdirname + '/logfile'
+
+            writer = NetCDFWriter(filebase)
+            for record in SAMPLE_DATA_DICT:
+                writer.write(DASRecord(timestamp=record['timestamp'],
+                                       data_id='test',
+                                       fields=record['fields']))
+
+            with self.open_netcdf_file(filebase + '-2023-08-07.nc') as outfile:
+                expected_dimensions = ["time", "F1", "F2"]
+                expected_variables = ["time", "F1", "F2"]
+
+                self.assertEqual(list(outfile.dimensions.keys()), expected_dimensions)
+                self.assertEqual(list(outfile.variables.keys()), expected_variables)
+
+                record_count = 0
+                for record in SAMPLE_DATA_DICT:
+                    self.assertEqual(record["timestamp"], outfile.variables.get("time")[record_count])
+                    self.assertEqual(record["fields"]["F1"], outfile.variables.get("F1")[record_count])
+                    self.assertEqual(record["fields"]["F2"], outfile.variables.get("F2")[record_count])
+
+                    record_count += 1
+    ############################
+    def test_map_write(self):
+        with tempfile.TemporaryDirectory() as tmpdirname:
+            filebase = {
+                'AAA': tmpdirname + '/logfile_A',
+                'BBB': tmpdirname + '/logfile_B',
+                'CCC': tmpdirname + '/logfile_C',
+            }
+
+            expected_values = {
+                'AAA': [SAMPLE_DATA_DICT[0], SAMPLE_DATA_DICT[3]],
+                'BBB': [SAMPLE_DATA_DICT[1]],
+                'CCC': [SAMPLE_DATA_DICT[2]],
+            }
+
+            writer = NetCDFWriter(filebase=filebase)
+
+            bad_line = 'there is no timestamp here'
+            with self.assertLogs(logging.getLogger(), logging.ERROR) as cm:
+                writer.write(bad_line)
+            error = f'ERROR:root:NetCDFWriter.write() - bad timestamp: "{bad_line}"'
+            self.assertEqual(cm.output, [error])
+
+            writer.write(DASRecord(timestamp=SAMPLE_DATA_DICT[0]['timestamp'],
+                                   data_id='AAA',
+                                   fields=SAMPLE_DATA_DICT[0]['fields']))
+            writer.write(DASRecord(timestamp=SAMPLE_DATA_DICT[1]['timestamp'],
+                                   data_id='BBB',
+                                   fields=SAMPLE_DATA_DICT[1]['fields']))
+            writer.write(DASRecord(timestamp=SAMPLE_DATA_DICT[2]['timestamp'],
+                                   data_id='CCC',
+                                   fields=SAMPLE_DATA_DICT[2]['fields']))
+            writer.write(DASRecord(timestamp=SAMPLE_DATA_DICT[3]['timestamp'],
+                                   data_id='AAA',
+                                   fields=SAMPLE_DATA_DICT[3]['fields']))
+
+            for (data_id, filepath) in filebase.items():
+                with self.open_netcdf_file(filepath + '-2023-08-07.nc') as outfile:
+                    expected_dimensions = ["time", "F1", "F2"]
+                    expected_variables = ["time", "F1", "F2"]
+
+                    self.assertEqual(list(outfile.dimensions.keys()), expected_dimensions)
+                    self.assertEqual(list(outfile.variables.keys()), expected_variables)
+
+                    expected_records = expected_values.get(data_id)
+
+                    record_count = 0
+                    for record in expected_records:
+                        self.assertEqual(record["timestamp"], outfile.variables.get("time")[record_count])
+                        self.assertEqual(record["fields"]["F1"], outfile.variables.get("F1")[record_count])
+                        self.assertEqual(record["fields"]["F2"], outfile.variables.get("F2")[record_count])
+
+                        record_count += 1
+
+    ############################
+    def test_rollover_format_monthly(self):
+        with tempfile.TemporaryDirectory() as tmpdirname:
+            filebase = tmpdirname + '/logfile'
+
+            writer = NetCDFWriter(filebase, rollover_format='%Y-%m')
+            writer.write(SAMPLE_DATA_DICT_MONTHLY)
+
+            # The sample timestamps fall in consecutive months (2023-08
+            # through 2023-11), so each record lands in its own monthly file.
+            record_count = 0
+            for record in SAMPLE_DATA_DICT_MONTHLY:
+                with self.open_netcdf_file(filebase + f'-2023-{8 + record_count:02d}.nc') as outfile:
+                    expected_dimensions = ["time", "F1", "F2"]
+                    expected_variables = ["time", "F1", "F2"]
+
+                    self.assertEqual(list(outfile.dimensions.keys()), expected_dimensions)
+                    self.assertEqual(list(outfile.variables.keys()), expected_variables)
+
+                    self.assertEqual(record["timestamp"], outfile.variables.get("time")[0])
+                    self.assertEqual(record["fields"]["F1"], outfile.variables.get("F1")[0])
+                    self.assertEqual(record["fields"]["F2"], outfile.variables.get("F2")[0])
+
+                record_count += 1
+
+    ############################
+    def test_rollover_format_hourly(self):
+        with tempfile.TemporaryDirectory() as tmpdirname:
+            filebase = tmpdirname + '/logfile'
+
+            writer = NetCDFWriter(filebase, rollover_format='%Y-%m-%d_%H00')
+            writer.write(SAMPLE_DATA_DICT_HOURLY)
+
+            # The sample timestamps fall in consecutive hours (1200 through
+            # 1500 UTC on 2023-08-07), so each record lands in its own file.
+            record_count = 0
+            for record in SAMPLE_DATA_DICT_HOURLY:
+                with self.open_netcdf_file(filebase + f'-2023-08-07_{12 + record_count}00.nc') as outfile:
+                    expected_dimensions = ["time", "F1", "F2"]
+                    expected_variables = ["time", "F1", "F2"]
+
+                    self.assertEqual(list(outfile.dimensions.keys()), expected_dimensions)
+                    self.assertEqual(list(outfile.variables.keys()), expected_variables)
+
+                    self.assertEqual(record["timestamp"], outfile.variables.get("time")[0])
+                    self.assertEqual(record["fields"]["F1"], outfile.variables.get("F1")[0])
+                    self.assertEqual(record["fields"]["F2"], outfile.variables.get("F2")[0])
+
+                record_count += 1
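+
+    ############################
+    def test_write_string_field(self):
+        # An added sketch, not in the original PR: write_data() stores
+        # non-numeric values in variable-length string variables, so a string
+        # field should read back as text. The 'STATUS' field name is made up
+        # for illustration.
+        with tempfile.TemporaryDirectory() as tmpdirname:
+            filebase = tmpdirname + '/logfile'
+
+            writer = NetCDFWriter(filebase)
+            writer.write({'timestamp': 1691410658.0, 'fields': {'STATUS': 'ok'}})
+
+            with self.open_netcdf_file(filebase + '-2023-08-07.nc') as outfile:
+                self.assertEqual(str(outfile.variables.get('STATUS')[0]), 'ok')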
+
+
+if __name__ == '__main__':
+    import argparse
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-v', '--verbosity', dest='verbosity',
+                        default=0, action='count',
+                        help='Increase output verbosity')
+    args = parser.parse_args()
+
+    LOGGING_FORMAT = '%(asctime)-15s %(message)s'
+    logging.basicConfig(format=LOGGING_FORMAT)
+
+    LOG_LEVELS = {0: logging.WARNING, 1: logging.INFO, 2: logging.DEBUG}
+    args.verbosity = min(args.verbosity, max(LOG_LEVELS))
+    logging.getLogger().setLevel(LOG_LEVELS[args.verbosity])
+
+    unittest.main(warnings='ignore')