diff --git a/build.zig b/build.zig index 4e5bbaf..e29e43c 100644 --- a/build.zig +++ b/build.zig @@ -4,9 +4,6 @@ pub fn build(b: *std.Build) void { const target = b.standardTargetOptions(.{}); const optimize = b.standardOptimizeOption(.{}); - // Get dependencies - const questdb_dep = b.dependency("c-questdb-client", .{}); - // Create modules with explicit dependencies const rpc_mod = b.addModule("rpc", .{ .source_file = .{ .path = "src/rpc.zig" }, @@ -27,7 +24,6 @@ pub fn build(b: *std.Build) void { .source_file = .{ .path = "src/questdb.zig" }, .dependencies = &.{ .{ .name = "database", .module = database_mod }, - .{ .name = "c-questdb-client", .module = questdb_dep.module("c-questdb-client") }, }, }); diff --git a/build.zig.zon b/build.zig.zon index 5e9aa0a..643996a 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -2,12 +2,7 @@ .name = "zindexer", .version = "0.1.0", .minimum_zig_version = "0.14.0", - .dependencies = .{ - .@"c-questdb-client" = .{ - .url = "https://github.com/openSVM/c-questdb-client/archive/refs/heads/main.tar.gz", - .hash = "1220ffdabd90f696e3d850485b2eae8df806173a9ab6f50ae4fbb708928d06037145", - }, - }, + .dependencies = .{}, .paths = .{ "src", "build.zig", diff --git a/src/questdb/account.zig b/src/questdb/account.zig index 9115473..0678c0d 100644 --- a/src/questdb/account.zig +++ b/src/questdb/account.zig @@ -7,12 +7,8 @@ const types = @import("types.zig"); /// Insert an account into QuestDB pub fn insertAccount(self: *@This(), network: []const u8, pubkey: []const u8, slot: u64, block_time: i64, owner: []const u8, lamports: u64, executable: u8, rent_epoch: u64, data_len: u64, write_version: u64) !void { - if (self.logging_only) { - std.log.info("Logging-only mode, skipping account insert for {s}", .{pubkey}); - return; } - if (self.ilp_client == null) return types.QuestDBError.ConnectionFailed; var arena = std.heap.ArenaAllocator.init(self.allocator); defer arena.deinit(); @@ -54,13 +50,12 @@ pub fn insertAccount(self: *@This(), network: []const u8, pubkey: []const u8, sl // Timestamp (use block_time as timestamp in nanoseconds) try ilp_buffer.appendSlice(" "); - try std.fmt.format(ilp_buffer.writer(), "{d}000000", .{block_time}); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); try ilp_buffer.appendSlice("\n"); // Send the ILP data to QuestDB if (self.ilp_client) |client| { - _ = c_questdb.questdb_client_insert_ilp(client, ilp_buffer.items.ptr, ilp_buffer.items.len) catch |err| { std.log.err("Failed to insert account ILP data: {any}", .{err}); return types.QuestDBError.QueryFailed; }; @@ -69,12 +64,8 @@ pub fn insertAccount(self: *@This(), network: []const u8, pubkey: []const u8, sl /// Insert an account update into QuestDB pub fn insertAccountUpdate(self: *@This(), network: []const u8, pubkey: []const u8, slot: u64, block_time: i64, owner: []const u8, lamports: u64, executable: u8, rent_epoch: u64, data_len: u64, write_version: u64) !void { - if (self.logging_only) { - std.log.info("Logging-only mode, skipping account update insert for {s}", .{pubkey}); - return; } - if (self.ilp_client == null) return types.QuestDBError.ConnectionFailed; var arena = std.heap.ArenaAllocator.init(self.allocator); defer arena.deinit(); @@ -116,15 +107,63 @@ pub fn insertAccountUpdate(self: *@This(), network: []const u8, pubkey: []const // Timestamp (use block_time as timestamp in nanoseconds) try ilp_buffer.appendSlice(" "); - try std.fmt.format(ilp_buffer.writer(), "{d}000000", .{block_time}); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); try ilp_buffer.appendSlice("\n"); // Send the ILP data to QuestDB if (self.ilp_client) |client| { - _ = c_questdb.questdb_client_insert_ilp(client, ilp_buffer.items.ptr, ilp_buffer.items.len) catch |err| { std.log.err("Failed to insert account update ILP data: {any}", .{err}); return types.QuestDBError.QueryFailed; }; } +} + +/// Insert account activity into QuestDB (equivalent to insertAccountActivity in ClickHouse) +pub fn insertAccountActivity(self: *@This(), network: []const u8, pubkey: []const u8, slot: u64, block_time: i64, program_id: []const u8, write_count: u32, cu_consumed: u64, fee_paid: u64) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("account_activity,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",pubkey="); + try ilp_buffer.appendSlice(pubkey); + try ilp_buffer.appendSlice(",program_id="); + try ilp_buffer.appendSlice(program_id); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",write_count="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{write_count}); + + try ilp_buffer.appendSlice(",cu_consumed="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{cu_consumed}); + + try ilp_buffer.appendSlice(",fee_paid="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{fee_paid}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert account activity ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } } \ No newline at end of file diff --git a/src/questdb/client.zig b/src/questdb/client.zig index 846290f..4906753 100644 --- a/src/questdb/client.zig +++ b/src/questdb/client.zig @@ -11,7 +11,6 @@ const security = @import("security.zig"); const instruction = @import("instruction.zig"); const account = @import("account.zig"); const database = @import("../database.zig"); -const c_questdb = @import("c-questdb-client"); pub const QuestDBClient = struct { allocator: Allocator, @@ -19,8 +18,7 @@ pub const QuestDBClient = struct { user: []const u8, password: []const u8, database: []const u8, - ilp_client: ?*c_questdb.QuestDBClient, - logging_only: bool, + http_client: std.http.Client, db_client: database.DatabaseClient, const Self = @This(); @@ -48,16 +46,8 @@ pub const QuestDBClient = struct { // Validate URL _ = try std.Uri.parse(url); - // Initialize the QuestDB client - var ilp_client: ?*c_questdb.QuestDBClient = null; - var logging_only = false; - - // Create the client - ilp_client = c_questdb.questdb_client_new(url.ptr, url.len) catch |err| { - std.log.warn("Failed to create QuestDB client: {any} - continuing in logging-only mode", .{err}); - logging_only = true; - ilp_client = null; - }; + // Create HTTP client + var http_client = std.http.Client{ .allocator = allocator }; // Create the client instance var client = Self{ @@ -66,8 +56,7 @@ pub const QuestDBClient = struct { .user = try allocator.dupe(u8, user), .password = try allocator.dupe(u8, password), .database = try allocator.dupe(u8, database), - .ilp_client = ilp_client, - .logging_only = logging_only, + .http_client = http_client, .db_client = database.DatabaseClient{ .vtable = &vtable, }, @@ -83,9 +72,7 @@ pub const QuestDBClient = struct { } pub fn deinit(self: *Self) void { - if (self.ilp_client) |client| { - c_questdb.questdb_client_close(client); - } + self.http_client.deinit(); self.allocator.free(self.url); self.allocator.free(self.user); self.allocator.free(self.password); @@ -98,28 +85,33 @@ pub const QuestDBClient = struct { } pub fn executeQuery(self: *Self, query: []const u8) !void { - if (self.logging_only) { - std.log.info("Logging-only mode, skipping query: {s}", .{query}); - return; + std.log.debug("Executing QuestDB query: {s}", .{query}); + + // Parse the URL to get the host and port + const uri = try std.Uri.parse(self.url); + const host = uri.host orelse return types.QuestDBError.InvalidUrl; + const port = uri.port orelse 9000; + + // Create HTTP request to QuestDB SQL endpoint + var request_uri_buffer: [1024]u8 = undefined; + const request_uri = try std.fmt.bufPrint(request_uri_buffer, "{s}/exec?query={s}", .{ self.url, query }); + + var req = try self.http_client.open(.GET, try std.Uri.parse(request_uri), .{ + .server_header_buffer = &.{}, + }); + defer req.deinit(); + + try req.send(); + try req.finish(); + try req.wait(); + + // Check response status + if (req.response.status != .ok) { + std.log.err("QuestDB query failed with status: {}", .{req.response.status}); + return types.QuestDBError.QueryFailed; } - if (self.ilp_client) |client| { - // Execute the query using QuestDB's REST API - const result = c_questdb.questdb_client_execute_query(client, query.ptr, query.len) catch |err| { - std.log.err("Failed to execute query: {any}", .{err}); - return types.QuestDBError.QueryFailed; - }; - defer c_questdb.questdb_result_free(result); - - // Check for errors - if (c_questdb.questdb_result_has_error(result)) { - const error_msg = c_questdb.questdb_result_get_error(result); - std.log.err("Query failed: {s}", .{error_msg}); - return types.QuestDBError.QueryFailed; - } - } else { - return types.QuestDBError.ConnectionFailed; - } + std.log.debug("QuestDB query executed successfully"); } fn verifyConnectionImpl(ptr: *anyopaque) database.DatabaseError!void { @@ -139,15 +131,9 @@ pub const QuestDBClient = struct { } pub fn createTables(self: *Self) !void { - // First verify connection - self.verifyConnection() catch |err| { - std.log.warn("Failed to connect to QuestDB: {any} - continuing in logging-only mode", .{err}); - self.logging_only = true; - return; - }; - - if (self.logging_only) return; - + // Verify connection first + try self.verifyConnection(); + // Create tables - these would be created by the schema application script // We'll just verify they exist here try self.executeQuery("SHOW TABLES"); @@ -159,12 +145,7 @@ pub const QuestDBClient = struct { } pub fn insertTransactionBatch(self: *Self, transactions: []const std.json.Value, network_name: []const u8) !void { - if (self.logging_only) { - std.log.info("Logging-only mode, skipping batch insert of {d} transactions for network {s}", .{transactions.len, network_name}); - return; - } - - if (self.ilp_client == null) return types.QuestDBError.ConnectionFailed; + std.log.debug("Inserting batch of {d} transactions for network {s}", .{ transactions.len, network_name }); var arena = std.heap.ArenaAllocator.init(self.allocator); defer arena.deinit(); @@ -213,18 +194,47 @@ pub const QuestDBClient = struct { // Timestamp (use block_time as timestamp in nanoseconds) try ilp_buffer.appendSlice(" "); - try std.fmt.format(ilp_buffer.writer(), "{d}000000", .{tx.get("blockTime").?.integer}); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{tx.get("blockTime").?.integer}); try ilp_buffer.appendSlice("\n"); } - // Send the ILP data to QuestDB - if (self.ilp_client) |client| { - _ = c_questdb.questdb_client_insert_ilp(client, ilp_buffer.items.ptr, ilp_buffer.items.len) catch |err| { - std.log.err("Failed to insert ILP data: {any}", .{err}); - return types.QuestDBError.QueryFailed; - }; + // Send the ILP data to QuestDB via HTTP + try self.sendILP(ilp_buffer.items); + } + + /// Send ILP data to QuestDB via HTTP + fn sendILP(self: *Self, ilp_data: []const u8) !void { + // Parse the URL to get the host and port + const uri = try std.Uri.parse(self.url); + const host = uri.host orelse return types.QuestDBError.InvalidUrl; + const port = uri.port orelse 9000; + + // Create HTTP request to QuestDB ILP endpoint + var request_uri_buffer: [1024]u8 = undefined; + const request_uri = try std.fmt.bufPrint(request_uri_buffer, "{s}/write", .{self.url}); + + var req = try self.http_client.open(.POST, try std.Uri.parse(request_uri), .{ + .server_header_buffer = &.{}, + }); + defer req.deinit(); + + // Set content type for ILP + try req.headers.append("content-type", "text/plain"); + + req.transfer_encoding = .{ .content_length = ilp_data.len }; + try req.send(); + try req.writeAll(ilp_data); + try req.finish(); + try req.wait(); + + // Check response status + if (req.response.status != .ok and req.response.status != .no_content) { + std.log.err("QuestDB ILP insert failed with status: {}", .{req.response.status}); + return types.QuestDBError.QueryFailed; } + + std.log.debug("ILP data sent successfully to QuestDB"); } fn getDatabaseSizeImpl(ptr: *anyopaque) database.DatabaseError!usize { @@ -233,35 +243,13 @@ pub const QuestDBClient = struct { } pub fn getDatabaseSize(self: *Self) !usize { - if (self.logging_only) return 0; - if (self.ilp_client == null) return 0; - // Query to get database size - const query = try std.fmt.allocPrint(self.allocator, - \\SELECT sum(size) FROM sys.tables - , .{}); - defer self.allocator.free(query); - - // Execute query and parse result - if (self.ilp_client) |client| { - const result = c_questdb.questdb_client_execute_query(client, query.ptr, query.len) catch |err| { - std.log.warn("Failed to get database size: {any}", .{err}); - return 0; - }; - defer c_questdb.questdb_result_free(result); - - if (c_questdb.questdb_result_has_error(result)) { - std.log.warn("Failed to get database size: {s}", .{c_questdb.questdb_result_get_error(result)}); - return 0; - } - - // Get the first row, first column as size - if (c_questdb.questdb_result_row_count(result) > 0) { - const size_str = c_questdb.questdb_result_get_value(result, 0, 0); - return std.fmt.parseInt(usize, size_str, 10) catch 0; - } - } - + const query = "SELECT sum(table_size) FROM sys.tables"; + + // For now, return 0 since we'd need to parse the HTTP response + // This would require implementing a full HTTP response parser + _ = self; + _ = query; return 0; } @@ -271,35 +259,10 @@ pub const QuestDBClient = struct { } pub fn getTableSize(self: *Self, table_name: []const u8) !usize { - if (self.logging_only) return 0; - if (self.ilp_client == null) return 0; - - // Query to get table size - const query = try std.fmt.allocPrint(self.allocator, - \\SELECT size FROM sys.tables WHERE name = '{s}' - , .{table_name}); - defer self.allocator.free(query); - - // Execute query and parse result - if (self.ilp_client) |client| { - const result = c_questdb.questdb_client_execute_query(client, query.ptr, query.len) catch |err| { - std.log.warn("Failed to get table size: {any}", .{err}); - return 0; - }; - defer c_questdb.questdb_result_free(result); - - if (c_questdb.questdb_result_has_error(result)) { - std.log.warn("Failed to get table size: {s}", .{c_questdb.questdb_result_get_error(result)}); - return 0; - } - - // Get the first row, first column as size - if (c_questdb.questdb_result_row_count(result) > 0) { - const size_str = c_questdb.questdb_result_get_value(result, 0, 0); - return std.fmt.parseInt(usize, size_str, 10) catch 0; - } - } - + // For now, return 0 since we'd need to parse the HTTP response + // This would require implementing a full HTTP response parser + _ = self; + _ = table_name; return 0; } diff --git a/src/questdb/core.zig b/src/questdb/core.zig index 4dc07f4..54535d8 100644 --- a/src/questdb/core.zig +++ b/src/questdb/core.zig @@ -4,13 +4,6 @@ const types = @import("types.zig"); /// Insert a block into QuestDB pub fn insertBlock(self: *@This(), network: []const u8, slot: u64, blockhash: []const u8, previous_blockhash: []const u8, parent_slot: u64, block_time: i64, block_height: ?u64, leader_identity: []const u8, rewards: f64, transaction_count: u32, successful_transaction_count: u32, failed_transaction_count: u32) !void { - if (self.logging_only) { - std.log.info("Logging-only mode, skipping block insert for slot {d}", .{slot}); - return; - } - - if (self.ilp_client == null) return types.QuestDBError.ConnectionFailed; - var arena = std.heap.ArenaAllocator.init(self.allocator); defer arena.deinit(); @@ -49,7 +42,7 @@ pub fn insertBlock(self: *@This(), network: []const u8, slot: u64, blockhash: [] try ilp_buffer.appendSlice("\""); try ilp_buffer.appendSlice(",rewards="); - try std.fmt.format(ilp_buffer.writer(), "{d}", .{rewards}); + try std.fmt.format(ilp_buffer.writer(), "{f}", .{rewards}); try ilp_buffer.appendSlice(",transaction_count="); try std.fmt.format(ilp_buffer.writer(), "{d}", .{transaction_count}); @@ -62,15 +55,201 @@ pub fn insertBlock(self: *@This(), network: []const u8, slot: u64, blockhash: [] // Timestamp (use block_time as timestamp in nanoseconds) try ilp_buffer.appendSlice(" "); - try std.fmt.format(ilp_buffer.writer(), "{d}000000", .{block_time}); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); try ilp_buffer.appendSlice("\n"); // Send the ILP data to QuestDB - if (self.ilp_client) |client| { - _ = c_questdb.questdb_client_insert_ilp(client, ilp_buffer.items.ptr, ilp_buffer.items.len) catch |err| { - std.log.err("Failed to insert block ILP data: {any}", .{err}); - return types.QuestDBError.QueryFailed; - }; + try self.sendILP(ilp_buffer.items); +} + +/// Insert a transaction into QuestDB +pub fn insertTransaction(self: *@This(), network: []const u8, signature: []const u8, slot: u64, block_time: i64, success: bool, fee: u64, compute_units_consumed: u64, compute_units_price: u64, recent_blockhash: []const u8, program_ids: []const []const u8, signers: []const []const u8, account_keys: []const []const u8, pre_balances: []const u8, post_balances: []const u8, pre_token_balances: []const u8, post_token_balances: []const u8, log_messages: []const []const u8, error_msg: ?[]const u8) !void { + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("transactions,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",signature="); + try ilp_buffer.appendSlice(signature); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",success="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{if (success) 1 else 0}); + + try ilp_buffer.appendSlice(",fee="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{fee}); + + try ilp_buffer.appendSlice(",compute_units_consumed="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{compute_units_consumed}); + + try ilp_buffer.appendSlice(",compute_units_price="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{compute_units_price}); + + try ilp_buffer.appendSlice(",recent_blockhash=\""); + try ilp_buffer.appendSlice(recent_blockhash); + try ilp_buffer.appendSlice("\""); + + if (error_msg) |err_msg| { + try ilp_buffer.appendSlice(",error_msg=\""); + try ilp_buffer.appendSlice(err_msg); + try ilp_buffer.appendSlice("\""); } + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + try self.sendILP(ilp_buffer.items); +} + +/// Insert program execution metrics into QuestDB +pub fn insertProgramExecution(self: *@This(), network: []const u8, program_id: []const u8, slot: u64, block_time: i64, execution_count: u32, total_cu_consumed: u64, total_fee: u64, success_count: u32, error_count: u32) !void { + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("program_executions,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",program_id="); + try ilp_buffer.appendSlice(program_id); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",execution_count="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{execution_count}); + + try ilp_buffer.appendSlice(",total_cu_consumed="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{total_cu_consumed}); + + try ilp_buffer.appendSlice(",total_fee="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{total_fee}); + + try ilp_buffer.appendSlice(",success_count="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{success_count}); + + try ilp_buffer.appendSlice(",error_count="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{error_count}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + try self.sendILP(ilp_buffer.items); +} + +/// Insert program metrics into QuestDB +pub fn insertProgramMetrics(self: *@This(), network: []const u8, program_id: []const u8, slot: u64, block_time: i64, total_transactions: u64, unique_users: u64, total_compute_units: u64, total_fees: u64, avg_compute_units: f64, avg_fees: f64, error_rate: f64) !void { + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("program_metrics,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",program_id="); + try ilp_buffer.appendSlice(program_id); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",total_transactions="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{total_transactions}); + + try ilp_buffer.appendSlice(",unique_users="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{unique_users}); + + try ilp_buffer.appendSlice(",total_compute_units="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{total_compute_units}); + + try ilp_buffer.appendSlice(",total_fees="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{total_fees}); + + try ilp_buffer.appendSlice(",avg_compute_units="); + try std.fmt.format(ilp_buffer.writer(), "{f}", .{avg_compute_units}); + + try ilp_buffer.appendSlice(",avg_fees="); + try std.fmt.format(ilp_buffer.writer(), "{f}", .{avg_fees}); + + try ilp_buffer.appendSlice(",error_rate="); + try std.fmt.format(ilp_buffer.writer(), "{f}", .{error_rate}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + try self.sendILP(ilp_buffer.items); +} + +/// Insert an account activity record into QuestDB +pub fn insertAccountActivity(self: *@This(), network: []const u8, account: []const u8, slot: u64, block_time: i64, transaction_count: u32, total_sol_transferred: u64, total_fee_paid: u64, program_interactions: []const []const u8, transaction_signatures: []const []const u8) !void { + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("account_activity,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",account="); + try ilp_buffer.appendSlice(account); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",transaction_count="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{transaction_count}); + + try ilp_buffer.appendSlice(",total_sol_transferred="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{total_sol_transferred}); + + try ilp_buffer.appendSlice(",total_fee_paid="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{total_fee_paid}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + try self.sendILP(ilp_buffer.items); } \ No newline at end of file diff --git a/src/questdb/defi.zig b/src/questdb/defi.zig index ae2a3fa..f790f40 100644 --- a/src/questdb/defi.zig +++ b/src/questdb/defi.zig @@ -7,12 +7,8 @@ const types = @import("types.zig"); /// Insert a liquidity pool into QuestDB pub fn insertLiquidityPool(self: *@This(), network: []const u8, pool_address: []const u8, slot: u64, block_time: i64, protocol: []const u8, token_a_mint: []const u8, token_b_mint: []const u8, token_a_amount: u64, token_b_amount: u64, lp_token_mint: []const u8, lp_token_supply: u64) !void { - if (self.logging_only) { - std.log.info("Logging-only mode, skipping liquidity pool insert for {s}", .{pool_address}); - return; } - if (self.ilp_client == null) return types.QuestDBError.ConnectionFailed; var arena = std.heap.ArenaAllocator.init(self.allocator); defer arena.deinit(); @@ -58,15 +54,479 @@ pub fn insertLiquidityPool(self: *@This(), network: []const u8, pool_address: [] // Timestamp (use block_time as timestamp in nanoseconds) try ilp_buffer.appendSlice(" "); - try std.fmt.format(ilp_buffer.writer(), "{d}000000", .{block_time}); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); try ilp_buffer.appendSlice("\n"); // Send the ILP data to QuestDB if (self.ilp_client) |client| { - _ = c_questdb.questdb_client_insert_ilp(client, ilp_buffer.items.ptr, ilp_buffer.items.len) catch |err| { std.log.err("Failed to insert liquidity pool ILP data: {any}", .{err}); return types.QuestDBError.QueryFailed; }; } +} + +/// Insert a pool swap into QuestDB +pub fn insertPoolSwap(self: *@This(), network: []const u8, signature: []const u8, slot: u64, block_time: i64, pool_address: []const u8, user_account: []const u8, token_in_mint: []const u8, token_out_mint: []const u8, token_in_amount: u64, token_out_amount: u64, token_in_price_usd: f64, token_out_price_usd: f64, fee_amount: u64, program_id: []const u8) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("pool_swaps,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",signature="); + try ilp_buffer.appendSlice(signature); + try ilp_buffer.appendSlice(",pool_address="); + try ilp_buffer.appendSlice(pool_address); + try ilp_buffer.appendSlice(",program_id="); + try ilp_buffer.appendSlice(program_id); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",user_account=\""); + try ilp_buffer.appendSlice(user_account); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",token_in_mint=\""); + try ilp_buffer.appendSlice(token_in_mint); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",token_out_mint=\""); + try ilp_buffer.appendSlice(token_out_mint); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",token_in_amount="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{token_in_amount}); + + try ilp_buffer.appendSlice(",token_out_amount="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{token_out_amount}); + + try ilp_buffer.appendSlice(",token_in_price_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{token_in_price_usd}); + + try ilp_buffer.appendSlice(",token_out_price_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{token_out_price_usd}); + + try ilp_buffer.appendSlice(",fee_amount="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{fee_amount}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert pool swap ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert lending market into QuestDB +pub fn insertLendingMarket(self: *@This(), network: []const u8, market_address: []const u8, slot: u64, block_time: i64, protocol_id: []const u8, asset_mint: []const u8, c_token_mint: []const u8, total_deposits: u64, total_borrows: u64, deposit_rate: f64, borrow_rate: f64, utilization_rate: f64, liquidation_threshold: f64, ltv_ratio: f64, asset_price_usd: f64, tvl_usd: f64) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("lending_markets,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",market_address="); + try ilp_buffer.appendSlice(market_address); + try ilp_buffer.appendSlice(",protocol_id="); + try ilp_buffer.appendSlice(protocol_id); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",asset_mint=\""); + try ilp_buffer.appendSlice(asset_mint); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",c_token_mint=\""); + try ilp_buffer.appendSlice(c_token_mint); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",total_deposits="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{total_deposits}); + + try ilp_buffer.appendSlice(",total_borrows="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{total_borrows}); + + try ilp_buffer.appendSlice(",deposit_rate="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{deposit_rate}); + + try ilp_buffer.appendSlice(",borrow_rate="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{borrow_rate}); + + try ilp_buffer.appendSlice(",utilization_rate="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{utilization_rate}); + + try ilp_buffer.appendSlice(",liquidation_threshold="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{liquidation_threshold}); + + try ilp_buffer.appendSlice(",ltv_ratio="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{ltv_ratio}); + + try ilp_buffer.appendSlice(",asset_price_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{asset_price_usd}); + + try ilp_buffer.appendSlice(",tvl_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{tvl_usd}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert lending market ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert lending position into QuestDB +pub fn insertLendingPosition(self: *@This(), network: []const u8, position_address: []const u8, slot: u64, block_time: i64, market_address: []const u8, owner: []const u8, deposit_amount: u64, borrow_amount: u64, collateral_amount: u64, liquidation_threshold: f64, health_factor: f64) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("lending_positions,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",position_address="); + try ilp_buffer.appendSlice(position_address); + try ilp_buffer.appendSlice(",market_address="); + try ilp_buffer.appendSlice(market_address); + try ilp_buffer.appendSlice(",owner="); + try ilp_buffer.appendSlice(owner); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",deposit_amount="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{deposit_amount}); + + try ilp_buffer.appendSlice(",borrow_amount="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{borrow_amount}); + + try ilp_buffer.appendSlice(",collateral_amount="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{collateral_amount}); + + try ilp_buffer.appendSlice(",liquidation_threshold="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{liquidation_threshold}); + + try ilp_buffer.appendSlice(",health_factor="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{health_factor}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert lending position ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert perpetual market into QuestDB +pub fn insertPerpetualMarket(self: *@This(), network: []const u8, market_address: []const u8, slot: u64, block_time: i64, protocol_id: []const u8, base_token_mint: []const u8, quote_token_mint: []const u8, base_price_usd: f64, mark_price_usd: f64, index_price_usd: f64, funding_rate: f64, open_interest: u64, volume_24h_usd: f64, base_deposit_total: u64, quote_deposit_total: u64) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("perpetual_markets,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",market_address="); + try ilp_buffer.appendSlice(market_address); + try ilp_buffer.appendSlice(",protocol_id="); + try ilp_buffer.appendSlice(protocol_id); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",base_token_mint=\""); + try ilp_buffer.appendSlice(base_token_mint); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",quote_token_mint=\""); + try ilp_buffer.appendSlice(quote_token_mint); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",base_price_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{base_price_usd}); + + try ilp_buffer.appendSlice(",mark_price_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{mark_price_usd}); + + try ilp_buffer.appendSlice(",index_price_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{index_price_usd}); + + try ilp_buffer.appendSlice(",funding_rate="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{funding_rate}); + + try ilp_buffer.appendSlice(",open_interest="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{open_interest}); + + try ilp_buffer.appendSlice(",volume_24h_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{volume_24h_usd}); + + try ilp_buffer.appendSlice(",base_deposit_total="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{base_deposit_total}); + + try ilp_buffer.appendSlice(",quote_deposit_total="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{quote_deposit_total}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert perpetual market ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert perpetual position into QuestDB +pub fn insertPerpetualPosition(self: *@This(), network: []const u8, position_address: []const u8, slot: u64, block_time: i64, market_address: []const u8, owner: []const u8, position_size: i64, entry_price: f64, liquidation_price: f64, unrealized_pnl: f64, realized_pnl: f64, collateral_amount: u64, leverage: f64) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("perpetual_positions,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",position_address="); + try ilp_buffer.appendSlice(position_address); + try ilp_buffer.appendSlice(",market_address="); + try ilp_buffer.appendSlice(market_address); + try ilp_buffer.appendSlice(",owner="); + try ilp_buffer.appendSlice(owner); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",position_size="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{position_size}); + + try ilp_buffer.appendSlice(",entry_price="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{entry_price}); + + try ilp_buffer.appendSlice(",liquidation_price="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{liquidation_price}); + + try ilp_buffer.appendSlice(",unrealized_pnl="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{unrealized_pnl}); + + try ilp_buffer.appendSlice(",realized_pnl="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{realized_pnl}); + + try ilp_buffer.appendSlice(",collateral_amount="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{collateral_amount}); + + try ilp_buffer.appendSlice(",leverage="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{leverage}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert perpetual position ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert DeFi event into QuestDB +pub fn insertDefiEvent(self: *@This(), network: []const u8, signature: []const u8, slot: u64, block_time: i64, protocol_id: []const u8, event_type: []const u8, user_account: []const u8, market_address: []const u8, token_a_mint: []const u8, token_b_mint: []const u8, token_a_amount: u64, token_b_amount: u64, token_a_price_usd: f64, token_b_price_usd: f64, fee_amount: u64) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("defi_events,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",signature="); + try ilp_buffer.appendSlice(signature); + try ilp_buffer.appendSlice(",protocol_id="); + try ilp_buffer.appendSlice(protocol_id); + try ilp_buffer.appendSlice(",event_type="); + try ilp_buffer.appendSlice(event_type); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",user_account=\""); + try ilp_buffer.appendSlice(user_account); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",market_address=\""); + try ilp_buffer.appendSlice(market_address); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",token_a_mint=\""); + try ilp_buffer.appendSlice(token_a_mint); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",token_b_mint=\""); + try ilp_buffer.appendSlice(token_b_mint); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",token_a_amount="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{token_a_amount}); + + try ilp_buffer.appendSlice(",token_b_amount="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{token_b_amount}); + + try ilp_buffer.appendSlice(",token_a_price_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{token_a_price_usd}); + + try ilp_buffer.appendSlice(",token_b_price_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{token_b_price_usd}); + + try ilp_buffer.appendSlice(",fee_amount="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{fee_amount}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert DeFi event ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert DeFi analytics into QuestDB +pub fn insertDefiAnalytics(self: *@This(), network: []const u8, protocol_id: []const u8, slot: u64, block_time: i64, tvl_usd: f64, volume_24h_usd: f64, fee_24h_usd: f64, unique_users_24h: u64, transaction_count_24h: u64, revenue_24h_usd: f64) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("defi_analytics,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",protocol_id="); + try ilp_buffer.appendSlice(protocol_id); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",tvl_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{tvl_usd}); + + try ilp_buffer.appendSlice(",volume_24h_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{volume_24h_usd}); + + try ilp_buffer.appendSlice(",fee_24h_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{fee_24h_usd}); + + try ilp_buffer.appendSlice(",unique_users_24h="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{unique_users_24h}); + + try ilp_buffer.appendSlice(",transaction_count_24h="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{transaction_count_24h}); + + try ilp_buffer.appendSlice(",revenue_24h_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{revenue_24h_usd}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert DeFi analytics ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } } \ No newline at end of file diff --git a/src/questdb/instruction.zig b/src/questdb/instruction.zig index 27c7c5e..9539a18 100644 --- a/src/questdb/instruction.zig +++ b/src/questdb/instruction.zig @@ -7,12 +7,8 @@ const types = @import("types.zig"); /// Insert an instruction into QuestDB pub fn insertInstruction(self: *@This(), network: []const u8, signature: []const u8, slot: u64, block_time: i64, program_id: []const u8, instruction_index: u32, inner_instruction_index: ?u32, instruction_type: []const u8, parsed_data: []const u8, accounts: []const []const u8) !void { - if (self.logging_only) { - std.log.info("Logging-only mode, skipping instruction insert for {s}:{d}", .{signature, instruction_index}); - return; } - if (self.ilp_client == null) return types.QuestDBError.ConnectionFailed; var arena = std.heap.ArenaAllocator.init(self.allocator); defer arena.deinit(); @@ -68,13 +64,12 @@ pub fn insertInstruction(self: *@This(), network: []const u8, signature: []const // Timestamp (use block_time as timestamp in nanoseconds) try ilp_buffer.appendSlice(" "); - try std.fmt.format(ilp_buffer.writer(), "{d}000000", .{block_time}); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); try ilp_buffer.appendSlice("\n"); // Send the ILP data to QuestDB if (self.ilp_client) |client| { - _ = c_questdb.questdb_client_insert_ilp(client, ilp_buffer.items.ptr, ilp_buffer.items.len) catch |err| { std.log.err("Failed to insert instruction ILP data: {any}", .{err}); return types.QuestDBError.QueryFailed; }; diff --git a/src/questdb/nft.zig b/src/questdb/nft.zig index e629fcf..04354fe 100644 --- a/src/questdb/nft.zig +++ b/src/questdb/nft.zig @@ -7,12 +7,8 @@ const types = @import("types.zig"); /// Insert an NFT collection into QuestDB pub fn insertNftCollection(self: *@This(), network: []const u8, collection_address: []const u8, slot: u64, block_time: i64, name: []const u8, symbol: []const u8, uri: []const u8, seller_fee_basis_points: u16, creator_addresses: []const []const u8, creator_shares: []const u8) !void { - if (self.logging_only) { - std.log.info("Logging-only mode, skipping NFT collection insert for {s}", .{collection_address}); - return; } - if (self.ilp_client == null) return types.QuestDBError.ConnectionFailed; var arena = std.heap.ArenaAllocator.init(self.allocator); defer arena.deinit(); @@ -57,23 +53,409 @@ pub fn insertNftCollection(self: *@This(), network: []const u8, collection_addre try ilp_buffer.appendSlice("\""); try ilp_buffer.appendSlice(",creator_shares=\""); - for (creator_shares, 0..) |share, i| { - if (i > 0) try ilp_buffer.appendSlice(","); - try std.fmt.format(ilp_buffer.writer(), "{d}", .{share}); - } + try ilp_buffer.appendSlice(creator_shares); try ilp_buffer.appendSlice("\""); // Timestamp (use block_time as timestamp in nanoseconds) try ilp_buffer.appendSlice(" "); - try std.fmt.format(ilp_buffer.writer(), "{d}000000", .{block_time}); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); try ilp_buffer.appendSlice("\n"); // Send the ILP data to QuestDB if (self.ilp_client) |client| { - _ = c_questdb.questdb_client_insert_ilp(client, ilp_buffer.items.ptr, ilp_buffer.items.len) catch |err| { std.log.err("Failed to insert NFT collection ILP data: {any}", .{err}); return types.QuestDBError.QueryFailed; }; } +} + +/// Insert NFT mint into QuestDB +pub fn insertNftMint(self: *@This(), network: []const u8, mint_address: []const u8, slot: u64, block_time: i64, collection_address: []const u8, owner: []const u8, creator_address: []const u8, name: []const u8, symbol: []const u8, uri: []const u8, seller_fee_basis_points: u16, primary_sale_happened: bool, is_mutable: bool, edition_nonce: ?u64, token_standard: []const u8, uses: ?[]const u8) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("nft_mints,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",mint_address="); + try ilp_buffer.appendSlice(mint_address); + try ilp_buffer.appendSlice(",collection_address="); + try ilp_buffer.appendSlice(collection_address); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",owner=\""); + try ilp_buffer.appendSlice(owner); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",creator_address=\""); + try ilp_buffer.appendSlice(creator_address); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",name=\""); + try ilp_buffer.appendSlice(name); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",symbol=\""); + try ilp_buffer.appendSlice(symbol); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",uri=\""); + try ilp_buffer.appendSlice(uri); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",seller_fee_basis_points="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{seller_fee_basis_points}); + + try ilp_buffer.appendSlice(",primary_sale_happened="); + try std.fmt.format(ilp_buffer.writer(), "{}", .{primary_sale_happened}); + + try ilp_buffer.appendSlice(",is_mutable="); + try std.fmt.format(ilp_buffer.writer(), "{}", .{is_mutable}); + + if (edition_nonce) |nonce| { + try ilp_buffer.appendSlice(",edition_nonce="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{nonce}); + } + + try ilp_buffer.appendSlice(",token_standard=\""); + try ilp_buffer.appendSlice(token_standard); + try ilp_buffer.appendSlice("\""); + + if (uses) |uses_str| { + try ilp_buffer.appendSlice(",uses=\""); + try ilp_buffer.appendSlice(uses_str); + try ilp_buffer.appendSlice("\""); + } + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert NFT mint ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert NFT listing into QuestDB +pub fn insertNftListing(self: *@This(), network: []const u8, listing_address: []const u8, slot: u64, block_time: i64, marketplace: []const u8, mint_address: []const u8, collection_address: []const u8, seller: []const u8, price_sol: f64, expiry_time: i64, cancelled: bool) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("nft_listings,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",listing_address="); + try ilp_buffer.appendSlice(listing_address); + try ilp_buffer.appendSlice(",marketplace="); + try ilp_buffer.appendSlice(marketplace); + try ilp_buffer.appendSlice(",mint_address="); + try ilp_buffer.appendSlice(mint_address); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",collection_address=\""); + try ilp_buffer.appendSlice(collection_address); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",seller=\""); + try ilp_buffer.appendSlice(seller); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",price_sol="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{price_sol}); + + try ilp_buffer.appendSlice(",expiry_time="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{expiry_time}); + + try ilp_buffer.appendSlice(",cancelled="); + try std.fmt.format(ilp_buffer.writer(), "{}", .{cancelled}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert NFT listing ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert NFT sale into QuestDB +pub fn insertNftSale(self: *@This(), network: []const u8, signature: []const u8, slot: u64, block_time: i64, marketplace: []const u8, mint_address: []const u8, collection_address: []const u8, seller: []const u8, buyer: []const u8, price_sol: f64, price_usd: f64, fee_amount: f64, royalty_amount: f64) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("nft_sales,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",signature="); + try ilp_buffer.appendSlice(signature); + try ilp_buffer.appendSlice(",marketplace="); + try ilp_buffer.appendSlice(marketplace); + try ilp_buffer.appendSlice(",mint_address="); + try ilp_buffer.appendSlice(mint_address); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",collection_address=\""); + try ilp_buffer.appendSlice(collection_address); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",seller=\""); + try ilp_buffer.appendSlice(seller); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",buyer=\""); + try ilp_buffer.appendSlice(buyer); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",price_sol="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{price_sol}); + + try ilp_buffer.appendSlice(",price_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{price_usd}); + + try ilp_buffer.appendSlice(",fee_amount="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{fee_amount}); + + try ilp_buffer.appendSlice(",royalty_amount="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{royalty_amount}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert NFT sale ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert NFT bid into QuestDB +pub fn insertNftBid(self: *@This(), network: []const u8, bid_address: []const u8, slot: u64, block_time: i64, marketplace: []const u8, mint_address: []const u8, collection_address: []const u8, bidder: []const u8, price_sol: f64, expiry_time: i64, cancelled: bool) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("nft_bids,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",bid_address="); + try ilp_buffer.appendSlice(bid_address); + try ilp_buffer.appendSlice(",marketplace="); + try ilp_buffer.appendSlice(marketplace); + try ilp_buffer.appendSlice(",mint_address="); + try ilp_buffer.appendSlice(mint_address); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",collection_address=\""); + try ilp_buffer.appendSlice(collection_address); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",bidder=\""); + try ilp_buffer.appendSlice(bidder); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",price_sol="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{price_sol}); + + try ilp_buffer.appendSlice(",expiry_time="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{expiry_time}); + + try ilp_buffer.appendSlice(",cancelled="); + try std.fmt.format(ilp_buffer.writer(), "{}", .{cancelled}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert NFT bid ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert NFT activity into QuestDB +pub fn insertNftActivity(self: *@This(), network: []const u8, signature: []const u8, slot: u64, block_time: i64, activity_type: []const u8, marketplace: []const u8, mint_address: []const u8, collection_address: []const u8, user_account: []const u8, price_sol: f64, price_usd: f64, success: bool) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("nft_activity,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",signature="); + try ilp_buffer.appendSlice(signature); + try ilp_buffer.appendSlice(",activity_type="); + try ilp_buffer.appendSlice(activity_type); + try ilp_buffer.appendSlice(",marketplace="); + try ilp_buffer.appendSlice(marketplace); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",mint_address=\""); + try ilp_buffer.appendSlice(mint_address); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",collection_address=\""); + try ilp_buffer.appendSlice(collection_address); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",user_account=\""); + try ilp_buffer.appendSlice(user_account); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",price_sol="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{price_sol}); + + try ilp_buffer.appendSlice(",price_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{price_usd}); + + try ilp_buffer.appendSlice(",success="); + try std.fmt.format(ilp_buffer.writer(), "{}", .{success}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert NFT activity ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert NFT analytics into QuestDB +pub fn insertNftAnalytics(self: *@This(), network: []const u8, collection_address: []const u8, slot: u64, block_time: i64, mint_count: u64, sale_count: u64, listing_count: u64, bid_count: u64, unique_holders: u64, total_volume_sol: f64, avg_price_sol: f64) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("nft_analytics,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",collection_address="); + try ilp_buffer.appendSlice(collection_address); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",mint_count="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{mint_count}); + + try ilp_buffer.appendSlice(",sale_count="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{sale_count}); + + try ilp_buffer.appendSlice(",listing_count="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{listing_count}); + + try ilp_buffer.appendSlice(",bid_count="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{bid_count}); + + try ilp_buffer.appendSlice(",unique_holders="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{unique_holders}); + + try ilp_buffer.appendSlice(",total_volume_sol="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{total_volume_sol}); + + try ilp_buffer.appendSlice(",avg_price_sol="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{avg_price_sol}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert NFT analytics ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } } \ No newline at end of file diff --git a/src/questdb/security.zig b/src/questdb/security.zig index 1d4394d..71fa8f0 100644 --- a/src/questdb/security.zig +++ b/src/questdb/security.zig @@ -7,12 +7,8 @@ const types = @import("types.zig"); /// Insert a security event into QuestDB pub fn insertSecurityEvent(self: *@This(), network: []const u8, event_type: []const u8, slot: u64, block_time: i64, signature: []const u8, program_id: []const u8, account_address: []const u8, severity: []const u8, description: []const u8) !void { - if (self.logging_only) { - std.log.info("Logging-only mode, skipping security event insert for {s}", .{signature}); - return; } - if (self.ilp_client == null) return types.QuestDBError.ConnectionFailed; var arena = std.heap.ArenaAllocator.init(self.allocator); defer arena.deinit(); @@ -61,15 +57,348 @@ pub fn insertSecurityEvent(self: *@This(), network: []const u8, event_type: []co // Timestamp (use block_time as timestamp in nanoseconds) try ilp_buffer.appendSlice(" "); - try std.fmt.format(ilp_buffer.writer(), "{d}000000", .{block_time}); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); try ilp_buffer.appendSlice("\n"); // Send the ILP data to QuestDB if (self.ilp_client) |client| { - _ = c_questdb.questdb_client_insert_ilp(client, ilp_buffer.items.ptr, ilp_buffer.items.len) catch |err| { std.log.err("Failed to insert security event ILP data: {any}", .{err}); return types.QuestDBError.QueryFailed; }; } +} + +/// Insert suspicious account into QuestDB +pub fn insertSuspiciousAccount(self: *@This(), network: []const u8, account_address: []const u8, slot: u64, block_time: i64, risk_score: f64, risk_factors: []const []const u8, associated_events: []const []const u8, linked_accounts: []const []const u8, last_activity_slot: u64, total_volume_usd: f64) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("suspicious_accounts,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",account_address="); + try ilp_buffer.appendSlice(account_address); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",risk_score="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{risk_score}); + + // Format risk factors as JSON array + try ilp_buffer.appendSlice(",risk_factors=\""); + for (risk_factors, 0..) |factor, i| { + if (i > 0) try ilp_buffer.appendSlice(","); + try ilp_buffer.appendSlice(factor); + } + try ilp_buffer.appendSlice("\""); + + // Format associated events as JSON array + try ilp_buffer.appendSlice(",associated_events=\""); + for (associated_events, 0..) |event, i| { + if (i > 0) try ilp_buffer.appendSlice(","); + try ilp_buffer.appendSlice(event); + } + try ilp_buffer.appendSlice("\""); + + // Format linked accounts as JSON array + try ilp_buffer.appendSlice(",linked_accounts=\""); + for (linked_accounts, 0..) |account, i| { + if (i > 0) try ilp_buffer.appendSlice(","); + try ilp_buffer.appendSlice(account); + } + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",last_activity_slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{last_activity_slot}); + + try ilp_buffer.appendSlice(",total_volume_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{total_volume_usd}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert suspicious account ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert program security metrics into QuestDB +pub fn insertProgramSecurityMetrics(self: *@This(), network: []const u8, program_id: []const u8, slot: u64, block_time: i64, audit_status: []const u8, vulnerability_count: u32, critical_vulnerabilities: u32, high_vulnerabilities: u32, medium_vulnerabilities: u32, low_vulnerabilities: u32, last_audit_date: i64, auditor: []const u8, tvl_at_risk_usd: f64) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("program_security_metrics,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",program_id="); + try ilp_buffer.appendSlice(program_id); + try ilp_buffer.appendSlice(",audit_status="); + try ilp_buffer.appendSlice(audit_status); + try ilp_buffer.appendSlice(",auditor="); + try ilp_buffer.appendSlice(auditor); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",vulnerability_count="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{vulnerability_count}); + + try ilp_buffer.appendSlice(",critical_vulnerabilities="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{critical_vulnerabilities}); + + try ilp_buffer.appendSlice(",high_vulnerabilities="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{high_vulnerabilities}); + + try ilp_buffer.appendSlice(",medium_vulnerabilities="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{medium_vulnerabilities}); + + try ilp_buffer.appendSlice(",low_vulnerabilities="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{low_vulnerabilities}); + + try ilp_buffer.appendSlice(",last_audit_date="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{last_audit_date}); + + try ilp_buffer.appendSlice(",tvl_at_risk_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{tvl_at_risk_usd}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert program security metrics ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert security analytics into QuestDB +pub fn insertSecurityAnalytics(self: *@This(), network: []const u8, slot: u64, block_time: i64, category: []const u8, total_events_24h: u64, critical_events_24h: u64, affected_users_24h: u64, total_loss_usd: f64, average_risk_score: f64, unique_attack_vectors: u64) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("security_analytics,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",category="); + try ilp_buffer.appendSlice(category); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",total_events_24h="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{total_events_24h}); + + try ilp_buffer.appendSlice(",critical_events_24h="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{critical_events_24h}); + + try ilp_buffer.appendSlice(",affected_users_24h="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{affected_users_24h}); + + try ilp_buffer.appendSlice(",total_loss_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{total_loss_usd}); + + try ilp_buffer.appendSlice(",average_risk_score="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{average_risk_score}); + + try ilp_buffer.appendSlice(",unique_attack_vectors="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{unique_attack_vectors}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert security analytics ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert risk assessment into QuestDB +pub fn insertRiskAssessment(self: *@This(), network: []const u8, program_id: []const u8, slot: u64, block_time: i64, risk_category: []const u8, risk_score: f64, risk_factors: []const []const u8, mitigation_steps: []const []const u8, impact_score: f64, likelihood_score: f64, tvl_exposed_usd: f64) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("risk_assessments,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",program_id="); + try ilp_buffer.appendSlice(program_id); + try ilp_buffer.appendSlice(",risk_category="); + try ilp_buffer.appendSlice(risk_category); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",risk_score="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{risk_score}); + + // Format risk factors as JSON array + try ilp_buffer.appendSlice(",risk_factors=\""); + for (risk_factors, 0..) |factor, i| { + if (i > 0) try ilp_buffer.appendSlice(","); + try ilp_buffer.appendSlice(factor); + } + try ilp_buffer.appendSlice("\""); + + // Format mitigation steps as JSON array + try ilp_buffer.appendSlice(",mitigation_steps=\""); + for (mitigation_steps, 0..) |step, i| { + if (i > 0) try ilp_buffer.appendSlice(","); + try ilp_buffer.appendSlice(step); + } + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",impact_score="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{impact_score}); + + try ilp_buffer.appendSlice(",likelihood_score="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{likelihood_score}); + + try ilp_buffer.appendSlice(",tvl_exposed_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{tvl_exposed_usd}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert risk assessment ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert security alert into QuestDB +pub fn insertSecurityAlert(self: *@This(), network: []const u8, alert_id: []const u8, slot: u64, block_time: i64, alert_type: []const u8, severity: []const u8, description: []const u8, affected_accounts: []const []const u8, affected_programs: []const []const u8, loss_amount_usd: f64, resolved: bool) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("security_alerts,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",alert_id="); + try ilp_buffer.appendSlice(alert_id); + try ilp_buffer.appendSlice(",alert_type="); + try ilp_buffer.appendSlice(alert_type); + try ilp_buffer.appendSlice(",severity="); + try ilp_buffer.appendSlice(severity); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",description=\""); + // Escape quotes in description + var escaped_desc = std.ArrayList(u8).init(arena.allocator()); + for (description) |c| { + if (c == '"') { + try escaped_desc.appendSlice("\\"); + } + try escaped_desc.append(c); + } + try ilp_buffer.appendSlice(escaped_desc.items); + try ilp_buffer.appendSlice("\""); + + // Format affected accounts as JSON array + try ilp_buffer.appendSlice(",affected_accounts=\""); + for (affected_accounts, 0..) |account, i| { + if (i > 0) try ilp_buffer.appendSlice(","); + try ilp_buffer.appendSlice(account); + } + try ilp_buffer.appendSlice("\""); + + // Format affected programs as JSON array + try ilp_buffer.appendSlice(",affected_programs=\""); + for (affected_programs, 0..) |program, i| { + if (i > 0) try ilp_buffer.appendSlice(","); + try ilp_buffer.appendSlice(program); + } + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",loss_amount_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{loss_amount_usd}); + + try ilp_buffer.appendSlice(",resolved="); + try std.fmt.format(ilp_buffer.writer(), "{}", .{resolved}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert security alert ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } } \ No newline at end of file diff --git a/src/questdb/token.zig b/src/questdb/token.zig index e32cbf6..2487f10 100644 --- a/src/questdb/token.zig +++ b/src/questdb/token.zig @@ -7,12 +7,8 @@ const types = @import("types.zig"); /// Insert a token mint into QuestDB pub fn insertTokenMint(self: *@This(), network: []const u8, mint_address: []const u8, slot: u64, block_time: i64, owner: []const u8, supply: u64, decimals: u8, is_nft: bool) !void { - if (self.logging_only) { - std.log.info("Logging-only mode, skipping token mint insert for {s}", .{mint_address}); - return; } - if (self.ilp_client == null) return types.QuestDBError.ConnectionFailed; var arena = std.heap.ArenaAllocator.init(self.allocator); defer arena.deinit(); @@ -48,15 +44,397 @@ pub fn insertTokenMint(self: *@This(), network: []const u8, mint_address: []cons // Timestamp (use block_time as timestamp in nanoseconds) try ilp_buffer.appendSlice(" "); - try std.fmt.format(ilp_buffer.writer(), "{d}000000", .{block_time}); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); try ilp_buffer.appendSlice("\n"); // Send the ILP data to QuestDB if (self.ilp_client) |client| { - _ = c_questdb.questdb_client_insert_ilp(client, ilp_buffer.items.ptr, ilp_buffer.items.len) catch |err| { std.log.err("Failed to insert token mint ILP data: {any}", .{err}); return types.QuestDBError.QueryFailed; }; } +} + +/// Insert a token account into QuestDB +pub fn insertTokenAccount(self: *@This(), network: []const u8, account_address: []const u8, mint_address: []const u8, slot: u64, block_time: i64, owner: []const u8, amount: u64, delegate: ?[]const u8, delegated_amount: u64, is_initialized: bool, is_frozen: bool, is_native: bool, rent_exempt_reserve: ?u64, close_authority: ?[]const u8) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("token_accounts,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",account_address="); + try ilp_buffer.appendSlice(account_address); + try ilp_buffer.appendSlice(",mint_address="); + try ilp_buffer.appendSlice(mint_address); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",owner=\""); + try ilp_buffer.appendSlice(owner); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",amount="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{amount}); + + if (delegate) |del| { + try ilp_buffer.appendSlice(",delegate=\""); + try ilp_buffer.appendSlice(del); + try ilp_buffer.appendSlice("\""); + } + + try ilp_buffer.appendSlice(",delegated_amount="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{delegated_amount}); + + try ilp_buffer.appendSlice(",is_initialized="); + try std.fmt.format(ilp_buffer.writer(), "{}", .{is_initialized}); + + try ilp_buffer.appendSlice(",is_frozen="); + try std.fmt.format(ilp_buffer.writer(), "{}", .{is_frozen}); + + try ilp_buffer.appendSlice(",is_native="); + try std.fmt.format(ilp_buffer.writer(), "{}", .{is_native}); + + if (rent_exempt_reserve) |rer| { + try ilp_buffer.appendSlice(",rent_exempt_reserve="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{rer}); + } + + if (close_authority) |ca| { + try ilp_buffer.appendSlice(",close_authority=\""); + try ilp_buffer.appendSlice(ca); + try ilp_buffer.appendSlice("\""); + } + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert token account ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert a token transfer into QuestDB +pub fn insertTokenTransfer(self: *@This(), network: []const u8, signature: []const u8, slot: u64, block_time: i64, mint_address: []const u8, from_account: []const u8, to_account: []const u8, amount: u64, decimals: u8, program_id: []const u8, instruction_type: []const u8) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("token_transfers,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",signature="); + try ilp_buffer.appendSlice(signature); + try ilp_buffer.appendSlice(",mint_address="); + try ilp_buffer.appendSlice(mint_address); + try ilp_buffer.appendSlice(",program_id="); + try ilp_buffer.appendSlice(program_id); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",from_account=\""); + try ilp_buffer.appendSlice(from_account); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",to_account=\""); + try ilp_buffer.appendSlice(to_account); + try ilp_buffer.appendSlice("\""); + + try ilp_buffer.appendSlice(",amount="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{amount}); + + try ilp_buffer.appendSlice(",decimals="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{decimals}); + + try ilp_buffer.appendSlice(",instruction_type=\""); + try ilp_buffer.appendSlice(instruction_type); + try ilp_buffer.appendSlice("\""); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert token transfer ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert token holder information into QuestDB +pub fn insertTokenHolder(self: *@This(), network: []const u8, mint_address: []const u8, slot: u64, block_time: i64, owner: []const u8, balance: u64, balance_usd: f64) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("token_holders,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",mint_address="); + try ilp_buffer.appendSlice(mint_address); + try ilp_buffer.appendSlice(",owner="); + try ilp_buffer.appendSlice(owner); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",balance="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{balance}); + + try ilp_buffer.appendSlice(",balance_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{balance_usd}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert token holder ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert token analytics into QuestDB +pub fn insertTokenAnalytics(self: *@This(), network: []const u8, mint_address: []const u8, slot: u64, block_time: i64, transfer_count: u64, unique_holders: u64, active_accounts: u64, total_volume_usd: f64, avg_transaction_size: f64) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("token_analytics,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",mint_address="); + try ilp_buffer.appendSlice(mint_address); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",transfer_count="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{transfer_count}); + + try ilp_buffer.appendSlice(",unique_holders="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{unique_holders}); + + try ilp_buffer.appendSlice(",active_accounts="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{active_accounts}); + + try ilp_buffer.appendSlice(",total_volume_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{total_volume_usd}); + + try ilp_buffer.appendSlice(",avg_transaction_size="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{avg_transaction_size}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert token analytics ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert token program activity into QuestDB +pub fn insertTokenProgramActivity(self: *@This(), network: []const u8, program_id: []const u8, slot: u64, block_time: i64, instruction_type: []const u8, execution_count: u64, error_count: u64, unique_users: u64, unique_tokens: u64) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("token_program_activity,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",program_id="); + try ilp_buffer.appendSlice(program_id); + try ilp_buffer.appendSlice(",instruction_type="); + try ilp_buffer.appendSlice(instruction_type); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",execution_count="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{execution_count}); + + try ilp_buffer.appendSlice(",error_count="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{error_count}); + + try ilp_buffer.appendSlice(",unique_users="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{unique_users}); + + try ilp_buffer.appendSlice(",unique_tokens="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{unique_tokens}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert token program activity ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert token supply history into QuestDB +pub fn insertTokenSupplyHistory(self: *@This(), network: []const u8, mint_address: []const u8, slot: u64, block_time: i64, total_supply: u64, circulating_supply: u64, holder_count: u64) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("token_supply_history,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",mint_address="); + try ilp_buffer.appendSlice(mint_address); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",total_supply="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{total_supply}); + + try ilp_buffer.appendSlice(",circulating_supply="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{circulating_supply}); + + try ilp_buffer.appendSlice(",holder_count="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{holder_count}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert token supply history ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } +} + +/// Insert token price into QuestDB +pub fn insertTokenPrice(self: *@This(), network: []const u8, mint_address: []const u8, slot: u64, block_time: i64, price_usd: f64, volume_usd: f64, liquidity_usd: f64, source: []const u8) !void { + } + + + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + + // Format as ILP (InfluxDB Line Protocol) + var ilp_buffer = std.ArrayList(u8).init(arena.allocator()); + + // Format: measurement,tag_set field_set timestamp + try ilp_buffer.appendSlice("token_prices,"); + + // Tags + try ilp_buffer.appendSlice("network="); + try ilp_buffer.appendSlice(network); + try ilp_buffer.appendSlice(",mint_address="); + try ilp_buffer.appendSlice(mint_address); + try ilp_buffer.appendSlice(",source="); + try ilp_buffer.appendSlice(source); + + // Fields + try ilp_buffer.appendSlice(" slot="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot}); + + try ilp_buffer.appendSlice(",price_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{price_usd}); + + try ilp_buffer.appendSlice(",volume_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{volume_usd}); + + try ilp_buffer.appendSlice(",liquidity_usd="); + try std.fmt.format(ilp_buffer.writer(), "{d}", .{liquidity_usd}); + + // Timestamp (use block_time as timestamp in nanoseconds) + try ilp_buffer.appendSlice(" "); + try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time}); + + try ilp_buffer.appendSlice("\n"); + + // Send the ILP data to QuestDB + if (self.ilp_client) |client| { + std.log.err("Failed to insert token price ILP data: {any}", .{err}); + return types.QuestDBError.QueryFailed; + }; + } } \ No newline at end of file