Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified icp_server/database/icpdb.mv.db
Binary file not shown.
273 changes: 253 additions & 20 deletions icp_server/modules/storage/artifact_repository.bal

Large diffs are not rendered by default.

115 changes: 69 additions & 46 deletions icp_server/modules/storage/heartbeat_repository.bal
Original file line number Diff line number Diff line change
Expand Up @@ -143,24 +143,26 @@ public isolated function processDeltaHeartbeat(types:DeltaHeartbeat deltaHeartbe
WHERE runtime_id = ${deltaHeartbeat.runtime}
`);

boolean runtimeExists = true;
if timestampResult is error {
log:printError(string `Failed to update timestamp for runtime ${deltaHeartbeat.runtime}`, timestampResult);
// Continue processing - timestamp update failure shouldn't block command retrieval
// Determine if runtime is missing; in that case, avoid FK violations when auditing
runtimeExists = false;
} else {
// If no rows were updated, runtime may have been deleted concurrently
runtimeExists = (timestampResult.affectedRowCount ?: 0) > 0;
}

// Handle control commands and audit logging in a transaction
// This ensures commands are marked as 'sent' atomically with audit log creation
// Commands are marked as 'sent' atomically with audit log creation
transaction {
// Retrieve pending control commands for this runtime
// Lock pending commands for this runtime to avoid concurrent modifications
// Use FOR UPDATE so rows cannot be changed by other transactions while we process them
// Retrieve pending control commands for this runtime (no DB-specific locks)
stream<types:ControlCommand, sql:Error?> commandStream = dbClient->query(`
SELECT command_id, runtime_id, target_artifact, action, issued_at, status
FROM control_commands
WHERE runtime_id = ${deltaHeartbeat.runtime}
AND status = 'pending'
ORDER BY issued_at ASC
FOR UPDATE
`);

check from types:ControlCommand command in commandStream
Expand All @@ -180,15 +182,28 @@ public isolated function processDeltaHeartbeat(types:DeltaHeartbeat deltaHeartbe
}

// Create audit log entry
_ = check dbClient->execute(`
INSERT INTO audit_logs (
runtime_id, action, details, timestamp
) VALUES (
${deltaHeartbeat.runtime}, 'DELTA_HEARTBEAT',
${string `Delta heartbeat processed with hash ${deltaHeartbeat.runtimeHash}`},
${currentTimeStr}
)
`);
if runtimeExists {
_ = check dbClient->execute(`
INSERT INTO audit_logs (
runtime_id, action, details, timestamp
) VALUES (
${deltaHeartbeat.runtime}, 'DELTA_HEARTBEAT',
${string `Delta heartbeat processed with hash ${deltaHeartbeat.runtimeHash}`},
${currentTimeStr}
)
`);
} else {
// Runtime was deleted or not yet created; log without FK reference to avoid integrity violation
_ = check dbClient->execute(`
INSERT INTO audit_logs (
action, details, timestamp
) VALUES (
'DELTA_HEARTBEAT',
${string `Delta heartbeat received for missing runtime ${deltaHeartbeat.runtime} with hash ${deltaHeartbeat.runtimeHash}`},
${currentTimeStr}
)
`);
}

check commit;
log:printInfo(string `Successfully processed delta heartbeat for runtime ${deltaHeartbeat.runtime}`);
Expand Down Expand Up @@ -394,7 +409,7 @@ isolated function upsertRuntime(types:Heartbeat heartbeat) returns boolean|error
string runtimeHostname = heartbeat.runtimeHostname ?: "";
string runtimePort = heartbeat.runtimePort ?: "";

sql:ExecutionResult result = check dbClient->execute(`
sql:ExecutionResult|error insertRes = dbClient->execute(`
INSERT INTO runtimes (
Comment on lines 411 to 413

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 3

Suggested change
sql:ExecutionResult result = check dbClient->execute(`
sql:ExecutionResult|error insertRes = dbClient->execute(`
INSERT INTO runtimes (
sql:ExecutionResult|error insertRes = dbClient->execute(`
log:printDebug(string `Attempting to insert runtime ${heartbeat.runtime} with type ${heartbeat.runtimeType}`);
INSERT INTO runtimes (

runtime_id, name, runtime_type, status, version,
runtime_hostname, runtime_port,
Expand All @@ -403,7 +418,7 @@ isolated function upsertRuntime(types:Heartbeat heartbeat) returns boolean|error
os_name, os_version,
carbon_home, java_vendor, java_version,
total_memory, free_memory, max_memory, used_memory,
os_arch, server_name
os_arch, server_name, last_heartbeat
) VALUES (
${heartbeat.runtime}, ${heartbeat.runtime}, ${heartbeat.runtimeType}, ${heartbeat.status}, ${heartbeat.version},
${runtimeHostname}, ${runtimePort},
Expand All @@ -412,37 +427,45 @@ isolated function upsertRuntime(types:Heartbeat heartbeat) returns boolean|error
${heartbeat.nodeInfo.osName}, ${heartbeat.nodeInfo.osVersion},
${heartbeat.nodeInfo.carbonHome}, ${heartbeat.nodeInfo.javaVendor}, ${heartbeat.nodeInfo.javaVersion},
${heartbeat.nodeInfo.totalMemory}, ${heartbeat.nodeInfo.freeMemory}, ${heartbeat.nodeInfo.maxMemory}, ${heartbeat.nodeInfo.usedMemory},
${heartbeat.nodeInfo.osArch}, ${heartbeat.nodeInfo.platformName}
${heartbeat.nodeInfo.osArch}, ${heartbeat.nodeInfo.platformName}, CURRENT_TIMESTAMP
)
ON DUPLICATE KEY UPDATE
name = VALUES(name),
runtime_type = VALUES(runtime_type),
status = VALUES(status),
version = VALUES(version),
runtime_hostname = VALUES(runtime_hostname),
runtime_port = VALUES(runtime_port),
environment_id = VALUES(environment_id),
project_id = VALUES(project_id),
component_id = VALUES(component_id),
platform_name = VALUES(platform_name),
platform_version = VALUES(platform_version),
platform_home = VALUES(platform_home),
os_name = VALUES(os_name),
os_version = VALUES(os_version),
carbon_home = VALUES(carbon_home),
java_vendor = VALUES(java_vendor),
java_version = VALUES(java_version),
total_memory = VALUES(total_memory),
free_memory = VALUES(free_memory),
max_memory = VALUES(max_memory),
used_memory = VALUES(used_memory),
os_arch = VALUES(os_arch),
server_name = VALUES(server_name),
`);

if insertRes is sql:ExecutionResult {
int? rows = insertRes.affectedRowCount;
return rows == 1;
}
Comment on lines +434 to +437

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 4

Suggested change
if insertRes is sql:ExecutionResult {
int? rows = insertRes.affectedRowCount;
return rows == 1;
}
if insertRes is sql:ExecutionResult {
int? rows = insertRes.affectedRowCount;
log:printInfo(string `Successfully inserted runtime ${heartbeat.runtime}`);
return rows == 1;


_ = check dbClient->execute(`
UPDATE runtimes SET
name = ${heartbeat.runtime},
runtime_type = ${heartbeat.runtimeType},
status = ${heartbeat.status},
version = ${heartbeat.version},
runtime_hostname = ${runtimeHostname},
runtime_port = ${runtimePort},
environment_id = ${heartbeat.environment},
project_id = ${heartbeat.project},
component_id = ${heartbeat.component},
platform_name = ${heartbeat.nodeInfo.platformName},
platform_version = ${heartbeat.nodeInfo.platformVersion},
platform_home = ${heartbeat.nodeInfo.platformHome},
os_name = ${heartbeat.nodeInfo.osName},
os_version = ${heartbeat.nodeInfo.osVersion},
carbon_home = ${heartbeat.nodeInfo.carbonHome},
java_vendor = ${heartbeat.nodeInfo.javaVendor},
java_version = ${heartbeat.nodeInfo.javaVersion},
total_memory = ${heartbeat.nodeInfo.totalMemory},
free_memory = ${heartbeat.nodeInfo.freeMemory},
max_memory = ${heartbeat.nodeInfo.maxMemory},
used_memory = ${heartbeat.nodeInfo.usedMemory},
os_arch = ${heartbeat.nodeInfo.osArch},
server_name = ${heartbeat.nodeInfo.platformName},
last_heartbeat = CURRENT_TIMESTAMP
WHERE runtime_id = ${heartbeat.runtime}
`);

int? affectedRows = result.affectedRowCount;
return affectedRows == 1;
return false;
}

// Insert all runtime artifacts
Expand Down Expand Up @@ -547,9 +570,9 @@ isolated function insertMIArtifacts(types:Heartbeat heartbeat) returns error? {
string? transportsJson = proxy.transports is string[] ? (<string[]>proxy.transports).toJsonString() : ();
_ = check dbClient->execute(`
INSERT INTO runtime_proxy_services (
runtime_id, proxy_name, wsdl, transports, state
runtime_id, proxy_name, transports, state
) VALUES (
${heartbeat.runtime}, ${proxy.name}, ${proxy.wsdl},
${heartbeat.runtime}, ${proxy.name},
${transportsJson}, ${proxy.state}
)
`);
Expand Down
1 change: 0 additions & 1 deletion icp_server/modules/storage/repository_common.bal
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ isolated function retrieveAndMarkCommandsAsSent(string runtimeId) returns types:
WHERE runtime_id = ${runtimeId}
AND status = 'pending'
ORDER BY issued_at ASC
FOR UPDATE
`);

check from types:ControlCommand command in commandStream
Expand Down
3 changes: 1 addition & 2 deletions icp_server/modules/storage/runtime_repository.bal
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ isolated function getApiResourcesForRuntime(string runtimeId, string apiName) re
public isolated function getProxyServicesForRuntime(string runtimeId) returns types:ProxyService[]|error {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 5

Suggested change
public isolated function getProxyServicesForRuntime(string runtimeId) returns types:ProxyService[]|error {
public isolated function getProxyServicesForRuntime(string runtimeId) returns types:ProxyService[]|error {
log.debug("Fetching proxy services for runtime: " + runtimeId);

types:ProxyService[] proxyList = [];
stream<types:ProxyServiceRecordInDB, sql:Error?> proxyStream = dbClient->query(`
SELECT proxy_name, wsdl, transports, state
SELECT proxy_name, transports, state
FROM runtime_proxy_services
WHERE runtime_id = ${runtimeId}
`);
Expand All @@ -269,7 +269,6 @@ public isolated function getProxyServicesForRuntime(string runtimeId) returns ty

types:ProxyService proxy = {
name: proxyRecord.proxy_name,
wsdl: proxyRecord.wsdl,
transports: transports,
state: proxyRecord.state
};
Expand Down
16 changes: 14 additions & 2 deletions icp_server/modules/types/types.bal
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,6 @@ public type ProxyServiceRecordInDB record {
string proxy_package;
string base_path;
ArtifactState state;
string wsdl = "";
string transports = "";
};

Expand Down Expand Up @@ -438,6 +437,7 @@ public type Service record {
string 'type = ""; // "API", "ProxyService", "DataService", "InboundEndpoint", "ScheduledTask"
Resource[] resources;
Listener[] listeners;
string[] runtimeIds?;
};

public type Listener record {
Expand Down Expand Up @@ -483,6 +483,7 @@ public type Listener record {
name: "listener_state"
}
string state = "ENABLED"; // "ENABLED", "DISABLED"
string[] runtimeIds?;
};

// MI Runtime specific artifact types
Expand All @@ -499,6 +500,7 @@ public type RestApi record {
}
string state = "ENABLED"; // "ENABLED", "DISABLED"
ApiResource[] resources = []; // API resources (path + methods)
string[] runtimeIds?;
};

// API Resource type for MI API resources
Expand All @@ -515,12 +517,12 @@ public type ProxyService record {
name: "proxy_name"
}
string name;
string wsdl?;
string[] transports?;
@sql:Column {
name: "proxy_state"
}
string state = "ENABLED"; // "ENABLED", "DISABLED"
string[] runtimeIds?;
};

public type Endpoint record {
Expand All @@ -534,6 +536,7 @@ public type Endpoint record {
name: "endpoint_state"
}
string state = "ENABLED"; // "ENABLED", "DISABLED"
string[] runtimeIds?;
};

public type InboundEndpoint record {
Expand All @@ -547,6 +550,7 @@ public type InboundEndpoint record {
name: "inbound_state"
}
string state = "ENABLED"; // "ENABLED", "DISABLED"
string[] runtimeIds?;
};

public type Sequence record {
Expand All @@ -560,6 +564,7 @@ public type Sequence record {
name: "sequence_state"
}
string state = "ENABLED"; // "ENABLED", "DISABLED"
string[] runtimeIds?;
};

public type Task record {
Expand All @@ -573,6 +578,7 @@ public type Task record {
name: "task_state"
}
string state = "ENABLED"; // "ENABLED", "DISABLED"
string[] runtimeIds?;
};

public type Template record {
Expand All @@ -585,6 +591,7 @@ public type Template record {
name: "template_state"
}
string state = "ENABLED"; // "ENABLED", "DISABLED"
string[] runtimeIds?;
};

public type MessageStore record {
Expand All @@ -598,6 +605,7 @@ public type MessageStore record {
name: "store_state"
}
string state = "ENABLED"; // "ENABLED", "DISABLED"
string[] runtimeIds?;
};

public type MessageProcessor record {
Expand All @@ -611,6 +619,7 @@ public type MessageProcessor record {
name: "processor_state"
}
string state = "ENABLED"; // "ENABLED", "DISABLED"
string[] runtimeIds?;
};

public type LocalEntry record {
Expand All @@ -624,6 +633,7 @@ public type LocalEntry record {
name: "entry_state"
}
string state = "ENABLED"; // "ENABLED", "DISABLED"
string[] runtimeIds?;
};

public type DataService record {
Expand All @@ -637,6 +647,7 @@ public type DataService record {
name: "dataservice_state"
}
string state = "ENABLED"; // "ENABLED", "DISABLED"
string[] runtimeIds?;
};

public type CarbonApp record {
Expand All @@ -652,6 +663,7 @@ public type CarbonApp record {
string state = "Active"; // "Active", "Faulty"
// Artifacts packaged within the Carbon App (from heartbeat payload)
CarbonAppArtifact[] artifacts?;
string[] runtimeIds?;
};

// Artifact shape used inside CarbonApp
Expand Down
10 changes: 2 additions & 8 deletions icp_server/resources/db/init-scripts/h2_init.sql
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
-- ============================================================================
-- ICP Server H2 Database Schema
-- ============================================================================

--
-- Drop all database objects (tables, views, sequences, etc.)
DROP ALL OBJECTS;

-- ============================================================================
-- ORGANIZATIONS
--
-- ============================================================================

CREATE TABLE organizations (
Expand Down Expand Up @@ -837,7 +832,6 @@ CREATE TABLE runtime_proxy_services (
id BIGINT NOT NULL PRIMARY KEY AUTO_INCREMENT,
runtime_id CHAR(36) NOT NULL,
proxy_name VARCHAR(200) NOT NULL,
wsdl TEXT,
transports CLOB,
state VARCHAR(20) NOT NULL DEFAULT 'ENABLED',
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
Expand Down
1 change: 0 additions & 1 deletion icp_server/resources/db/init-scripts/mysql_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,6 @@ CREATE TABLE runtime_proxy_services (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
runtime_id CHAR(36) NOT NULL,
proxy_name VARCHAR(200) NOT NULL,
wsdl TEXT NULL,
transports JSON NULL,
state ENUM(
'ENABLED',
Expand Down
Loading