|
| 1 | +import unittest |
| 2 | +import os |
| 3 | +import sys |
| 4 | + |
| 5 | +sys.path.insert( |
| 6 | + 0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../")) |
| 7 | +) |
| 8 | + |
| 9 | +from liveanalytics_migration_plugin.liveanalytics_migration_plugin import ( |
| 10 | + HttpStatus, |
| 11 | + create_http_response, |
| 12 | +) |
| 13 | + |
| 14 | + |
| 15 | +class PluginTestCase(unittest.TestCase): |
| 16 | + def test_create_http_response_with_token(self): |
| 17 | + """ |
| 18 | + Tests creating an HTTP response where an InfluxDB v3 API token is included in the message. |
| 19 | + """ |
| 20 | + token = "apiv3_SomeTokenTextThatIsSensitive" |
| 21 | + message = f"The request was successful. This is your token: {token}" |
| 22 | + http_response = create_http_response(HttpStatus.OK, message) |
| 23 | + self.assertEqual( |
| 24 | + http_response["message"], |
| 25 | + "The request was successful. This is your token: *****", |
| 26 | + ) |
| 27 | + |
| 28 | + def test_create_http_response_with_presigned_https_url(self): |
| 29 | + """ |
| 30 | + Tests creating an HTTP response where a presigned HTTPS URL is included in the message. |
| 31 | + """ |
| 32 | + presigned_url = ( |
| 33 | + "https://example.com/someDB/someTable/" |
| 34 | + "results/partition_date%3D23-04-18/data.parquet/done.ack?" |
| 35 | + "AWSAccessKeyId=test_access_key_id" |
| 36 | + "&Signature=test_signature" |
| 37 | + "&x-amz-security-token=test_security_token" |
| 38 | + "&Expires=1770931001" |
| 39 | + ) |
| 40 | + message = ( |
| 41 | + "2026-02-05T21:17:04.215743Z ERROR " |
| 42 | + "influxdb3_py_api::system_py: processing engine: " |
| 43 | + "Error putting done file for someDB/someTable/results" |
| 44 | + "/partition_date=23-04-18/data.parquet: " |
| 45 | + "403 Client Error: Forbidden for url:" |
| 46 | + ) |
| 47 | + http_response = create_http_response( |
| 48 | + HttpStatus.INTERNAL_ERROR, f"{message} {presigned_url}" |
| 49 | + ) |
| 50 | + self.assertEqual(http_response["message"], f"{message} *****") |
| 51 | + |
| 52 | + def test_create_http_response_with_presigned_http_url(self): |
| 53 | + """ |
| 54 | + Tests creating an HTTP response where a presigned HTTP URL is included in the message. |
| 55 | + """ |
| 56 | + presigned_url = ( |
| 57 | + "http://example.com/someDB/someTable/" |
| 58 | + "results/partition_date%3D23-04-18/data.parquet/done.ack?" |
| 59 | + "AWSAccessKeyId=test_access_key_id" |
| 60 | + "&Signature=test_signature" |
| 61 | + "&x-amz-security-token=test_security_token" |
| 62 | + "&Expires=1770931001" |
| 63 | + ) |
| 64 | + message = ( |
| 65 | + "2026-02-05T21:17:04.215743Z ERROR " |
| 66 | + "influxdb3_py_api::system_py: processing engine: " |
| 67 | + "Error putting done file for someDB/someTable/results" |
| 68 | + "/partition_date=23-04-18/data.parquet: " |
| 69 | + "403 Client Error: Forbidden for url:" |
| 70 | + ) |
| 71 | + http_response = create_http_response( |
| 72 | + HttpStatus.INTERNAL_ERROR, f"{message} {presigned_url}" |
| 73 | + ) |
| 74 | + self.assertEqual(http_response["message"], f"{message} *****") |
0 commit comments