From 78e16fbebe45acf790fb5fd7ddb8ab56ef6477b0 Mon Sep 17 00:00:00 2001
From: Manuranga Perera
Date: Wed, 8 Apr 2026 14:26:36 +0530
Subject: [PATCH] Simplify observability pipeline configs

- Remove separate MI analytics log input; read SYNAPSE_ANALYTICS_DATA from carbon log via rewrite_tag + lua parse_mi_analytics instead
- Remove general-logs output that double-wrote MI data (Match_regex ^(mi|wso2apim).* also matched rewritten tags mi_app_logs, mi_metrics)
- Remove dead parsers: bal_parser, jsonparser, docker (unreferenced)
- Remove mi_metrics_json_extract, mi_metrics_parse_json (replaced by lua)
- Merge 3 sequential BI lua filters (extract_app_from_path, enrich_bal_logs, construct_bal_app_name) into single process_bal_logs
- Add ballerina-metrics-logs-* and mi-metrics-logs-* to index template
- Remove unused kubernetes object from index template

312 -> 240 lines fluent-bit.conf
59 -> 28 lines parsers.conf
235 -> 233 lines scripts.lua (net +50 for parse_mi_analytics, -52 for merged/removed)
75 -> 60 lines index-template-request.json
---
 .../config/fluent-bit/fluent-bit.conf     | 113 ++++--------------
 .../config/fluent-bit/parsers.conf        |  31 -----
 .../config/fluent-bit/scripts/scripts.lua | 104 ++++++++--------
 .../setup/index-template-request.json     |  19 +--
 4 files changed, 74 insertions(+), 193 deletions(-)

diff --git a/icp_server/resources/observability/opensearch-observability-dashboard/config/fluent-bit/fluent-bit.conf b/icp_server/resources/observability/opensearch-observability-dashboard/config/fluent-bit/fluent-bit.conf
index 7a3ef8bee..eb7bfdaa2 100644
--- a/icp_server/resources/observability/opensearch-observability-dashboard/config/fluent-bit/fluent-bit.conf
+++ b/icp_server/resources/observability/opensearch-observability-dashboard/config/fluent-bit/fluent-bit.conf
@@ -56,48 +56,14 @@
     Read_from_Head    On
     Path_Key          log_file_path
 
-# Input from MI analytics log files (synapse-analytics.log)
-# Reads structured analytics events written by the Synapse analytics publisher
-[INPUT]
-    Name              tail
-    Path              /var/log/mi/${MI_ANALYTICS_LOG_FILE_NAME}
-    Tag               mi_metrics
-    # Position tracking database (persistent)
-    DB                /fluent-bit/db/tail-mi-metrics.db
-    DB.locking        true
-    DB.journal_mode   WAL
-    # File rotation handling
-    Refresh_Interval  5
-    Ignore_Older      2h
-    # Buffer settings
-    Buffer_Chunk_Size 512KB
-    Buffer_Max_Size   2MB
-    Mem_Buf_Limit     256MB
-    Skip_Long_Lines   On
-    Read_from_Head    On
-
-# Add application name from file path - For Ballerina logs
-[FILTER]
-    Name    lua
-    Match   ballerina.*
-    Script  /fluent-bit/scripts/scripts.lua
-    Call    extract_app_from_path
-
 # ========== BALLERINA LOG PROCESSING ==========
 
-# Enrich all Ballerina logs with common fields
-[FILTER]
-    Name    lua
-    Match   ballerina.*
-    Script  /fluent-bit/scripts/scripts.lua
-    Call    enrich_bal_logs
-
-# Construct Ballerina app names
+# Extract app name from path, enrich with common fields, construct display name
 [FILTER]
     Name    lua
     Match   ballerina.*
     Script  /fluent-bit/scripts/scripts.lua
-    Call    construct_bal_app_name
+    Call    process_bal_logs
 
 # First: Separate Ballerina metrics logs (those with logger="metrics")
 [FILTER]
@@ -138,51 +104,29 @@
     Script  /fluent-bit/scripts/scripts.lua
     Call    enrich_mi_logs
 
-# This catches all mi.* logs
-[FILTER]
-    Name           rewrite_tag
-    Match          mi.*
-    Rule           $service_type ^MI$ mi_app_logs false
-    Emitter_Name   re_emitted_mi_app_logs
-
 # ========== MI ANALYTICS LOG PROCESSING ==========
 
-# Only keep log lines that contain the analytics prefix written by Synapse
-[FILTER]
-    Name    grep
-    Match   mi_metrics
-    Regex   log SYNAPSE_ANALYTICS_DATA
-
-# Step 1: Extract the raw JSON string from the log line
+# Route SYNAPSE_ANALYTICS_DATA lines from carbon log to mi_metrics tag
+# Must come before the mi_app_logs rewrite (which drops the original mi.* record)
 [FILTER]
-    Name          parser
-    Match         mi_metrics
-    Key_Name      log
-    Parser        mi_metrics_json_extract
-    Reserve_Data  Off
-
-# Step 2: Parse the extracted JSON string into structured fields
-# Reserve_Data must be On to preserve icp_runtimeId extracted in Step 1
-[FILTER]
-    Name          parser
-    Match         mi_metrics
-    Key_Name      json_str
-    Parser        mi_metrics_parse_json
-    Reserve_Data  On
+    Name           rewrite_tag
+    Match          mi.*
+    Rule           $message ^SYNAPSE_ANALYTICS_DATA mi_metrics false
+    Emitter_Name   re_emitted_mi_metrics
 
-# Enrich MI metrics records with common metadata
+# This catches all remaining mi.* logs
 [FILTER]
-    Name    lua
-    Match   mi_metrics
-    Script  /fluent-bit/scripts/scripts.lua
-    Call    enrich_mi_metrics
+    Name           rewrite_tag
+    Match          mi.*
+    Rule           $service_type ^MI$ mi_app_logs false
+    Emitter_Name   re_emitted_mi_app_logs
 
-# Route metrics records to a dedicated tag
+# Parse analytics JSON and enrich with metadata
 [FILTER]
-    Name          rewrite_tag
-    Match         mi_metrics
-    Rule          $log_type ^metrics$ mi_metrics_logs false
-    Emitter_Name  re_emitted_mi_metrics
+    Name    lua
+    Match   mi_metrics
+    Script  /fluent-bit/scripts/scripts.lua
+    Call    parse_mi_analytics
 
 # ========== DOCUMENT ID GENERATION ==========
 # Generate document IDs for deduplication
@@ -214,7 +158,7 @@
 # Generate IDs for MI metrics logs
 [FILTER]
     Name          lua
-    Match         mi_metrics_logs
+    Match         mi_metrics
     Script        /fluent-bit/scripts/scripts.lua
     Call          generate_mi_metrics_document_id
     time_as_table true
@@ -284,7 +228,7 @@
     Logstash_Format     On
     Logstash_DateFormat %Y-%m-%d
     Logstash_Prefix     mi-metrics-logs
-    Match               mi_metrics_logs
+    Match               mi_metrics
     Buffer_Size         2M
     Port                9200
     Replace_Dots        On
@@ -294,19 +238,4 @@
     tls.verify          Off
     Trace_Error         On
 
-# Fallback for any unmatched logs
-[OUTPUT]
-    Name                opensearch
-    Host                opensearch
-    HTTP_User           admin
-    HTTP_Passwd         ${OPENSEARCH_INITIAL_ADMIN_PASSWORD}
-    Logstash_Format     On
-    Logstash_DateFormat %Y-%m-%d
-    Logstash_Prefix     general-logs
-    Match_regex         ^(mi|wso2apim).*
-    Port                9200
-    Replace_Dots        On
-    Suppress_Type_Name  On
-    tls                 On
-    tls.verify          Off
-    Trace_Error         On
+
diff --git a/icp_server/resources/observability/opensearch-observability-dashboard/config/fluent-bit/parsers.conf b/icp_server/resources/observability/opensearch-observability-dashboard/config/fluent-bit/parsers.conf
index 3aad7b4b4..cc24b7cfb 100644
--- a/icp_server/resources/observability/opensearch-observability-dashboard/config/fluent-bit/parsers.conf
+++ b/icp_server/resources/observability/opensearch-observability-dashboard/config/fluent-bit/parsers.conf
@@ -9,24 +9,6 @@
     Time_Format %Y-%m-%dT%H:%M:%S.%L%z
     Time_Keep   On
 
-[PARSER]
-    Name   bal_parser
-    Format logfmt
-    Types  response_time_seconds:float
-
-[PARSER]
-    Name      jsonparser
-    Format    json
-    Time_Key  time
-    Time_Keep On
-
-[PARSER]
-    Name        docker
-    Format      json
-    Time_Key    time
-    Time_Format %Y-%m-%dT%H:%M:%S.%L
-    Time_Keep   On
-
 # MI (Micro Integrator) log parser
 [PARSER]
     Name mi_log_parser
@@ -44,16 +26,3 @@
     Rule "start_state" "^\[[\d]{4}-[\d]{2}-[\d]{2}T[\d]{2}:[\d]{2}:[\d]{2}" "cont"
     Rule "cont" "^(?!\[[\d]{4}-[\d]{2}-[\d]{2}T[\d]{2}:[\d]{2}:[\d]{2})" "cont"
 
-# MI Analytics log parsers (for synapse-analytics.log)
-# Step 1: Extract the JSON string that follows SYNAPSE_ANALYTICS_DATA in the log line
-# Also captures the optional [icp.runtimeId=...] suffix appended by the MI runtime
-[PARSER]
-    Name   mi_metrics_json_extract
-    Format regex
-    Regex  SYNAPSE_ANALYTICS_DATA\s+(?<json_str>\{.*\})(?:\s+\[icp\.runtimeId=(?<icp_runtimeId>[^\]]+)\])?
-
-# Step 2: Parse the extracted JSON string into structured fields
-[PARSER]
-    Name      mi_metrics_parse_json
-    Format    json
-    Time_Keep Off
diff --git a/icp_server/resources/observability/opensearch-observability-dashboard/config/fluent-bit/scripts/scripts.lua b/icp_server/resources/observability/opensearch-observability-dashboard/config/fluent-bit/scripts/scripts.lua
index 9c7d9e26a..0c38c5e40 100644
--- a/icp_server/resources/observability/opensearch-observability-dashboard/config/fluent-bit/scripts/scripts.lua
+++ b/icp_server/resources/observability/opensearch-observability-dashboard/config/fluent-bit/scripts/scripts.lua
@@ -1,41 +1,31 @@
 -- Enhanced Lua Scripts for Fluent Bit - Ballerina Focus
 -- scripts/scripts.lua
 
-function extract_app_from_path(tag, timestamp, record)
+function process_bal_logs(tag, timestamp, record)
+    record["product"] = "ballerina integrator"
+
+    -- Extract app name from path: /var/log/bi/myapp/app.log → "myapp"
     if record["log_file_path"] then
-        local path = record["log_file_path"]
-        -- Extract application name from path like /var/log/ballerina/ballerina-app/app.log
-        local app_name = string.match(path, "/var/log/[^/]+/([^/]+)/")
-        if app_name then
-            record["app_name"] = app_name
-        else
-            record["app_name"] = "unknown"
-        end
-
-        -- Extract service type from path
-        local service_type = string.match(path, "/var/log/([^/]+)/")
-        if service_type then
-            record["service_type"] = service_type
-        end
+        record["app_name"] = string.match(record["log_file_path"], "/var/log/[^/]+/([^/]+)/") or "unknown"
+    else
+        record["app_name"] = "unknown"
     end
-    return 1, timestamp, record
-end
 
-function construct_bal_app_name(tag, timestamp, record)
-    local deployment = record["app_name"] or "unknown"
-
-    local moduleName = record["module"]
-    if record["src.module"] then
-        moduleName = record["src.module"]
+    -- Extract app_module from module (first segment before /)
+    if record["module"] then
+        record["app_module"] = string.match(record["module"], "^([^/]+)")
     end
 
+    -- Construct display name: "deployment - module"
+    local deployment = record["app_name"]
+    local moduleName = record["src.module"] or record["module"]
     if moduleName then
         record["app"] = deployment .. " - " .. moduleName
     else
-        record["app"] = deployment
+        record["app"] = deployment
     end
 
-    record["deployment"] = deployment
+
 
     return 1, timestamp, record
 end
@@ -77,24 +67,6 @@ function extract_bal_metrics_data(tag, timestamp, record)
     return 1, timestamp, record
 end
 
-function enrich_bal_logs(tag, timestamp, record)
-    -- Add common fields for all Ballerina logs
-    record["product"] = "ballerina integrator"
-
-    -- Extract module info if available
-    if record["module"] then
-        local module_parts = {}
-        for part in string.gmatch(record["module"], "[^/]+") do
-            table.insert(module_parts, part)
-        end
-        if #module_parts > 0 then
-            record["app_module"] = module_parts[1]
-        end
-    end
-
-    return 1, timestamp, record
-end
-
 function enrich_mi_logs(tag, timestamp, record)
     -- Add common fields for all MI logs
     record["product"] = "Micro Integrator"
@@ -139,19 +111,45 @@ function simple_hash(str)
     return string.format("%08x%08x", hash1, hash2)
 end
 
--- ========== MI Metrics (synapse-analytics.log) ==========
-
--- Enrich MI metrics records with common metadata fields
-function enrich_mi_metrics(tag, timestamp, record)
-    record["product"] = "Micro Integrator"
-    record["service_type"] = "MI"
-    record["log_type"] = "metrics"
+-- Parse SYNAPSE_ANALYTICS_DATA from MI carbon log message into structured fields
+function parse_mi_analytics(tag, timestamp, record)
+    local message = record["message"] or ""
+    local json_str = string.match(message, "^SYNAPSE_ANALYTICS_DATA%s+(.+)$")
+    if not json_str then
+        return 1, timestamp, record
+    end
 
-    -- Ensure icp_runtimeId is always present (extracted by mi_metrics_json_extract parser)
-    if not record["icp_runtimeId"] or record["icp_runtimeId"] == "" then
-        record["icp_runtimeId"] = ""
+    record["@timestamp"] = string.match(json_str, '"@timestamp"%s*:%s*"([^"]+)"')
+
+    record["serverInfo"] = {
+        hostname = string.match(json_str, '"hostname"%s*:%s*"([^"]+)"') or "",
+        serverName = string.match(json_str, '"serverName"%s*:%s*"([^"]+)"') or "",
+        id = string.match(json_str, '"id"%s*:%s*"([^"]+)"') or ""
+    }
+
+    local payload = {}
+    payload["entityType"] = string.match(json_str, '"entityType"%s*:%s*"([^"]+)"') or ""
+    local latency = string.match(json_str, '"latency"%s*:%s*(%d+)')
+    if latency then payload["latency"] = tonumber(latency) end
+    payload["failure"] = string.match(json_str, '"failure"%s*:%s*(%a+)') == "true"
+    payload["faultResponse"] = string.match(json_str, '"faultResponse"%s*:%s*(%a+)') == "true"
+
+    local api = string.match(json_str, '"api"%s*:%s*"([^"]+)"')
+    if api then
+        payload["apiDetails"] = {
+            api = api,
+            apiContext = string.match(json_str, '"apiContext"%s*:%s*"([^"]+)"') or "",
+            method = string.match(json_str, '"method"%s*:%s*"([^"]+)"') or "",
+            transport = string.match(json_str, '"transport"%s*:%s*"([^"]+)"') or "",
+            subRequestPath = string.match(json_str, '"subRequestPath"%s*:%s*"([^"]+)"') or ""
+        }
     end
+
+    record["payload"] = payload
+    record["service_type"] = "MI"
+    record["product"] = "Micro Integrator"
+    record["message"] = nil
 
     return 1, timestamp, record
 end
diff --git a/icp_server/resources/observability/opensearch-observability-dashboard/setup/index-template-request.json b/icp_server/resources/observability/opensearch-observability-dashboard/setup/index-template-request.json
index f4c173e35..e21d9c490 100644
--- a/icp_server/resources/observability/opensearch-observability-dashboard/setup/index-template-request.json
+++ b/icp_server/resources/observability/opensearch-observability-dashboard/setup/index-template-request.json
@@ -1,26 +1,11 @@
 {
"index_patterns": [ - "ballerina-application-logs-*", "mi-application-logs-*" + "ballerina-application-logs-*", "ballerina-metrics-logs-*", + "mi-application-logs-*", "mi-metrics-logs-*" ], "template": { "mappings": { "properties": { - "kubernetes": { - "type": "object", - "properties": { - "labels": { - "type": "object", - "properties": { - "app": { - "type": "keyword" - }, - "component": { - "type": "keyword" - } - } - } - } - }, "level": { "type": "keyword" },