Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,48 +56,14 @@
Read_from_Head On
Path_Key log_file_path

# Input from MI analytics log files (synapse-analytics.log)
# Reads structured analytics events written by the Synapse analytics publisher
[INPUT]
Name tail
Path /var/log/mi/${MI_ANALYTICS_LOG_FILE_NAME}
Tag mi_metrics
# Position tracking database (persistent)
DB /fluent-bit/db/tail-mi-metrics.db
DB.locking true
DB.journal_mode WAL
# File rotation handling
Refresh_Interval 5
Ignore_Older 2h
# Buffer settings
Buffer_Chunk_Size 512KB
Buffer_Max_Size 2MB
Mem_Buf_Limit 256MB
Skip_Long_Lines On
Read_from_Head On

# Add application name from file path - For Ballerina logs
[FILTER]
Name lua
Match ballerina.*
Script /fluent-bit/scripts/scripts.lua
Call extract_app_from_path

# ========== BALLERINA LOG PROCESSING ==========

# Enrich all Ballerina logs with common fields
[FILTER]
Name lua
Match ballerina.*
Script /fluent-bit/scripts/scripts.lua
Call enrich_bal_logs

# Construct Ballerina app names
# Extract app name from path, enrich with common fields, construct display name
[FILTER]
Name lua
Match ballerina.*
Script /fluent-bit/scripts/scripts.lua
Call construct_bal_app_name
Call process_bal_logs

# First: Separate Ballerina metrics logs (those with logger="metrics")
[FILTER]
Expand Down Expand Up @@ -138,51 +104,29 @@
Script /fluent-bit/scripts/scripts.lua
Call enrich_mi_logs

# This catches all mi.* logs
[FILTER]
Name rewrite_tag
Match mi.*
Rule $service_type ^MI$ mi_app_logs false
Emitter_Name re_emitted_mi_app_logs

# ========== MI ANALYTICS LOG PROCESSING ==========

# Only keep log lines that contain the analytics prefix written by Synapse
[FILTER]
Name grep
Match mi_metrics
Regex log SYNAPSE_ANALYTICS_DATA

# Step 1: Extract the raw JSON string from the log line
# Route SYNAPSE_ANALYTICS_DATA lines from carbon log to mi_metrics tag
# Must come before the mi_app_logs rewrite (which drops the original mi.* record)
[FILTER]
Name parser
Match mi_metrics
Key_Name log
Parser mi_metrics_json_extract
Reserve_Data Off

# Step 2: Parse the extracted JSON string into structured fields
# Reserve_Data must be On to preserve icp_runtimeId extracted in Step 1
[FILTER]
Name parser
Match mi_metrics
Key_Name json_str
Parser mi_metrics_parse_json
Reserve_Data On
Name rewrite_tag
Match mi.*
Rule $message ^SYNAPSE_ANALYTICS_DATA mi_metrics false
Emitter_Name re_emitted_mi_metrics

# Enrich MI metrics records with common metadata
# This catches all remaining mi.* logs
[FILTER]
Name lua
Match mi_metrics
Script /fluent-bit/scripts/scripts.lua
Call enrich_mi_metrics
Name rewrite_tag
Match mi.*
Rule $service_type ^MI$ mi_app_logs false
Emitter_Name re_emitted_mi_app_logs

# Route metrics records to a dedicated tag
# Parse analytics JSON and enrich with metadata
[FILTER]
Name rewrite_tag
Match mi_metrics
Rule $log_type ^metrics$ mi_metrics_logs false
Emitter_Name re_emitted_mi_metrics
Name lua
Match mi_metrics
Script /fluent-bit/scripts/scripts.lua
Call parse_mi_analytics

# ========== DOCUMENT ID GENERATION ==========
# Generate document IDs for deduplication
Expand Down Expand Up @@ -214,7 +158,7 @@
# Generate IDs for MI metrics logs
[FILTER]
Name lua
Match mi_metrics_logs
Match mi_metrics
Script /fluent-bit/scripts/scripts.lua
Call generate_mi_metrics_document_id
time_as_table true
Expand Down Expand Up @@ -284,7 +228,7 @@
Logstash_Format On
Logstash_DateFormat %Y-%m-%d
Logstash_Prefix mi-metrics-logs
Match mi_metrics_logs
Match mi_metrics
Buffer_Size 2M
Port 9200
Replace_Dots On
Expand All @@ -294,19 +238,4 @@
tls.verify Off
Trace_Error On

# Fallback for any unmatched logs
[OUTPUT]
Name opensearch
Host opensearch
HTTP_User admin
HTTP_Passwd ${OPENSEARCH_INITIAL_ADMIN_PASSWORD}
Logstash_Format On
Logstash_DateFormat %Y-%m-%d
Logstash_Prefix general-logs
Match_regex ^(mi|wso2apim).*
Port 9200
Replace_Dots On
Suppress_Type_Name On
tls On
tls.verify Off
Trace_Error On

Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,6 @@
Time_Format %Y-%m-%dT%H:%M:%S.%L%z
Time_Keep On

[PARSER]
Name bal_parser
Format logfmt
Types response_time_seconds:float

[PARSER]
Name jsonparser
Format json
Time_Key time
Time_Keep On

[PARSER]
Name docker
Format json
Time_Key time
Time_Format %Y-%m-%dT%H:%M:%S.%L
Time_Keep On

# MI (Micro Integrator) log parser
[PARSER]
Name mi_log_parser
Expand All @@ -44,16 +26,3 @@
Rule "start_state" "^\[[\d]{4}-[\d]{2}-[\d]{2}T[\d]{2}:[\d]{2}:[\d]{2}" "cont"
Rule "cont" "^(?!\[[\d]{4}-[\d]{2}-[\d]{2}T[\d]{2}:[\d]{2}:[\d]{2})" "cont"

# MI Analytics log parsers (for synapse-analytics.log)
# Step 1: Extract the JSON string that follows SYNAPSE_ANALYTICS_DATA in the log line
# Also captures the optional [icp.runtimeId=...] suffix appended by the MI runtime
[PARSER]
Name mi_metrics_json_extract
Format regex
Regex SYNAPSE_ANALYTICS_DATA\s+(?<json_str>\{.*\})(?:\s+\[icp\.runtimeId=(?<icp_runtimeId>[^\]]+)\])?

# Step 2: Parse the extracted JSON string into structured fields
[PARSER]
Name mi_metrics_parse_json
Format json
Time_Keep Off
Original file line number Diff line number Diff line change
@@ -1,41 +1,31 @@
-- Enhanced Lua Scripts for Fluent Bit - Ballerina Focus
-- scripts/scripts.lua

-- Enrich a Ballerina log record in one pass: tag the product, derive the
-- application name from the tailed file path, derive the module fields, and
-- build the human-readable display name.
--
-- Fluent Bit Lua filter callback signature:
--   @param tag       string  the record's tag (e.g. "ballerina.<path>")
--   @param timestamp number  event timestamp (or table when time_as_table is set)
--   @param record    table   the log record; mutated and returned
--   @return 1, timestamp, record  (1 = record modified, keep it)
function process_bal_logs(tag, timestamp, record)
    record["product"] = "ballerina integrator"

    -- Extract app name from path: /var/log/bi/myapp/app.log -> "myapp".
    -- Path_Key in the tail input puts the file path under "log_file_path".
    if record["log_file_path"] then
        record["app_name"] = string.match(record["log_file_path"], "/var/log/[^/]+/([^/]+)/") or "unknown"
    else
        record["app_name"] = "unknown"
    end

    -- Extract app_module from module (first segment before "/").
    if record["module"] then
        record["app_module"] = string.match(record["module"], "^([^/]+)")
    end

    -- Construct display name: "deployment - module" when a module is known,
    -- otherwise just the deployment name. "src.module" (structured Ballerina
    -- logs) takes precedence over the plain "module" field.
    local deployment = record["app_name"]
    local moduleName = record["src.module"] or record["module"]
    if moduleName then
        record["app"] = deployment .. " - " .. moduleName
    else
        record["app"] = deployment
    end

    record["deployment"] = deployment

    return 1, timestamp, record
end

Expand Down Expand Up @@ -77,24 +67,6 @@ function extract_bal_metrics_data(tag, timestamp, record)
return 1, timestamp, record
end

function enrich_bal_logs(tag, timestamp, record)
-- Add common fields for all Ballerina logs
record["product"] = "ballerina integrator"

-- Extract module info if available
if record["module"] then
local module_parts = {}
for part in string.gmatch(record["module"], "[^/]+") do
table.insert(module_parts, part)
end
if #module_parts > 0 then
record["app_module"] = module_parts[1]
end
end

return 1, timestamp, record
end

function enrich_mi_logs(tag, timestamp, record)
-- Add common fields for all MI logs
record["product"] = "Micro Integrator"
Expand Down Expand Up @@ -139,19 +111,45 @@ function simple_hash(str)
return string.format("%08x%08x", hash1, hash2)
end

-- ========== MI Metrics (synapse-analytics.log) ==========

-- Enrich MI metrics records with common metadata fields
-- Parse SYNAPSE_ANALYTICS_DATA lines from the MI carbon log into structured
-- fields, using Lua patterns to pull the interesting keys out of the JSON
-- payload (no JSON library is available inside Fluent Bit's Lua filter).
--
-- Records whose "message" does not start with SYNAPSE_ANALYTICS_DATA are
-- passed through unchanged. For analytics records: populates "@timestamp",
-- "serverInfo", "payload" (entityType, latency, failure, faultResponse and,
-- for API events, apiDetails), tags the product/service_type, and drops the
-- raw "message" field.
--
--   @param tag       string  the record's tag (e.g. "mi_metrics")
--   @param timestamp number  event timestamp
--   @param record    table   the log record; mutated and returned
--   @return 1, timestamp, record  (1 = record modified, keep it)
function parse_mi_analytics(tag, timestamp, record)
    local message = record["message"] or ""
    -- Everything after the SYNAPSE_ANALYTICS_DATA marker is the JSON body.
    local json_str = string.match(message, "^SYNAPSE_ANALYTICS_DATA%s+(.+)$")
    if not json_str then
        -- Not an analytics line; keep the record as-is.
        return 1, timestamp, record
    end

    -- Event time as written by the Synapse analytics publisher (may be nil
    -- if absent; OpenSearch then falls back to Fluent Bit's own timestamp).
    record["@timestamp"] = string.match(json_str, '"@timestamp"%s*:%s*"([^"]+)"')

    record["serverInfo"] = {
        hostname   = string.match(json_str, '"hostname"%s*:%s*"([^"]+)"') or "",
        serverName = string.match(json_str, '"serverName"%s*:%s*"([^"]+)"') or "",
        id         = string.match(json_str, '"id"%s*:%s*"([^"]+)"') or ""
    }

    local payload = {}
    payload["entityType"] = string.match(json_str, '"entityType"%s*:%s*"([^"]+)"') or ""
    -- latency is a bare JSON number; keep it numeric in the record.
    local latency = string.match(json_str, '"latency"%s*:%s*(%d+)')
    if latency then payload["latency"] = tonumber(latency) end
    -- JSON booleans: compare the matched literal against "true" so anything
    -- else (including a missing key) yields false.
    payload["failure"] = string.match(json_str, '"failure"%s*:%s*(%a+)') == "true"
    payload["faultResponse"] = string.match(json_str, '"faultResponse"%s*:%s*(%a+)') == "true"

    -- API-level details are only present for API invocation events.
    local api = string.match(json_str, '"api"%s*:%s*"([^"]+)"')
    if api then
        payload["apiDetails"] = {
            api            = api,
            apiContext     = string.match(json_str, '"apiContext"%s*:%s*"([^"]+)"') or "",
            method         = string.match(json_str, '"method"%s*:%s*"([^"]+)"') or "",
            transport      = string.match(json_str, '"transport"%s*:%s*"([^"]+)"') or "",
            subRequestPath = string.match(json_str, '"subRequestPath"%s*:%s*"([^"]+)"') or ""
        }
    end

    record["payload"] = payload
    record["service_type"] = "MI"
    record["product"] = "Micro Integrator"
    -- The raw line has been fully decomposed; drop it to keep documents lean.
    record["message"] = nil

    return 1, timestamp, record
end

Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,11 @@
{
"index_patterns": [
"ballerina-application-logs-*", "mi-application-logs-*"
"ballerina-application-logs-*", "ballerina-metrics-logs-*",
"mi-application-logs-*", "mi-metrics-logs-*"
],
"template": {
"mappings": {
"properties": {
"kubernetes": {
"type": "object",
"properties": {
"labels": {
"type": "object",
"properties": {
"app": {
"type": "keyword"
},
"component": {
"type": "keyword"
}
}
}
}
},
"level": {
"type": "keyword"
},
Expand Down
Loading