Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ pub fn build(b: *std.Build) void {
const target = b.standardTargetOptions(.{});
const optimize = b.standardOptimizeOption(.{});

// Get dependencies
const questdb_dep = b.dependency("c-questdb-client", .{});

// Create modules with explicit dependencies
const rpc_mod = b.addModule("rpc", .{
.source_file = .{ .path = "src/rpc.zig" },
Expand All @@ -27,7 +24,6 @@ pub fn build(b: *std.Build) void {
.source_file = .{ .path = "src/questdb.zig" },
.dependencies = &.{
.{ .name = "database", .module = database_mod },
.{ .name = "c-questdb-client", .module = questdb_dep.module("c-questdb-client") },
},
});

Expand Down
7 changes: 1 addition & 6 deletions build.zig.zon
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,7 @@
.name = "zindexer",
.version = "0.1.0",
.minimum_zig_version = "0.14.0",
.dependencies = .{
.@"c-questdb-client" = .{
.url = "https://github.com/openSVM/c-questdb-client/archive/refs/heads/main.tar.gz",
.hash = "1220ffdabd90f696e3d850485b2eae8df806173a9ab6f50ae4fbb708928d06037145",
},
},
.dependencies = .{},
.paths = .{
"src",
"build.zig",
Expand Down
63 changes: 51 additions & 12 deletions src/questdb/account.zig
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@ const types = @import("types.zig");

/// Insert an account into QuestDB
pub fn insertAccount(self: *@This(), network: []const u8, pubkey: []const u8, slot: u64, block_time: i64, owner: []const u8, lamports: u64, executable: u8, rent_epoch: u64, data_len: u64, write_version: u64) !void {
if (self.logging_only) {
std.log.info("Logging-only mode, skipping account insert for {s}", .{pubkey});
return;
}

if (self.ilp_client == null) return types.QuestDBError.ConnectionFailed;

var arena = std.heap.ArenaAllocator.init(self.allocator);
defer arena.deinit();
Expand Down Expand Up @@ -54,13 +50,12 @@ pub fn insertAccount(self: *@This(), network: []const u8, pubkey: []const u8, sl

// Timestamp (use block_time as timestamp in nanoseconds)
try ilp_buffer.appendSlice(" ");
try std.fmt.format(ilp_buffer.writer(), "{d}000000", .{block_time});
try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time});

try ilp_buffer.appendSlice("\n");

// Send the ILP data to QuestDB
if (self.ilp_client) |client| {
_ = c_questdb.questdb_client_insert_ilp(client, ilp_buffer.items.ptr, ilp_buffer.items.len) catch |err| {
std.log.err("Failed to insert account ILP data: {any}", .{err});
return types.QuestDBError.QueryFailed;
};
Expand All @@ -69,12 +64,8 @@ pub fn insertAccount(self: *@This(), network: []const u8, pubkey: []const u8, sl

/// Insert an account update into QuestDB
pub fn insertAccountUpdate(self: *@This(), network: []const u8, pubkey: []const u8, slot: u64, block_time: i64, owner: []const u8, lamports: u64, executable: u8, rent_epoch: u64, data_len: u64, write_version: u64) !void {
if (self.logging_only) {
std.log.info("Logging-only mode, skipping account update insert for {s}", .{pubkey});
return;
}

if (self.ilp_client == null) return types.QuestDBError.ConnectionFailed;

var arena = std.heap.ArenaAllocator.init(self.allocator);
defer arena.deinit();
Expand Down Expand Up @@ -116,15 +107,63 @@ pub fn insertAccountUpdate(self: *@This(), network: []const u8, pubkey: []const

// Timestamp (use block_time as timestamp in nanoseconds)
try ilp_buffer.appendSlice(" ");
try std.fmt.format(ilp_buffer.writer(), "{d}000000", .{block_time});
try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time});

try ilp_buffer.appendSlice("\n");

// Send the ILP data to QuestDB
if (self.ilp_client) |client| {
_ = c_questdb.questdb_client_insert_ilp(client, ilp_buffer.items.ptr, ilp_buffer.items.len) catch |err| {
std.log.err("Failed to insert account update ILP data: {any}", .{err});
return types.QuestDBError.QueryFailed;
};
}
}

/// Insert account activity into QuestDB (equivalent to insertAccountActivity in ClickHouse)
pub fn insertAccountActivity(self: *@This(), network: []const u8, pubkey: []const u8, slot: u64, block_time: i64, program_id: []const u8, write_count: u32, cu_consumed: u64, fee_paid: u64) !void {
}


var arena = std.heap.ArenaAllocator.init(self.allocator);
defer arena.deinit();

// Format as ILP (InfluxDB Line Protocol)
var ilp_buffer = std.ArrayList(u8).init(arena.allocator());

// Format: measurement,tag_set field_set timestamp
try ilp_buffer.appendSlice("account_activity,");

// Tags
try ilp_buffer.appendSlice("network=");
try ilp_buffer.appendSlice(network);
try ilp_buffer.appendSlice(",pubkey=");
try ilp_buffer.appendSlice(pubkey);
try ilp_buffer.appendSlice(",program_id=");
try ilp_buffer.appendSlice(program_id);

// Fields
try ilp_buffer.appendSlice(" slot=");
try std.fmt.format(ilp_buffer.writer(), "{d}", .{slot});

try ilp_buffer.appendSlice(",write_count=");
try std.fmt.format(ilp_buffer.writer(), "{d}", .{write_count});

try ilp_buffer.appendSlice(",cu_consumed=");
try std.fmt.format(ilp_buffer.writer(), "{d}", .{cu_consumed});

try ilp_buffer.appendSlice(",fee_paid=");
try std.fmt.format(ilp_buffer.writer(), "{d}", .{fee_paid});

// Timestamp (use block_time as timestamp in nanoseconds)
try ilp_buffer.appendSlice(" ");
try std.fmt.format(ilp_buffer.writer(), "{d}000000000", .{block_time});

try ilp_buffer.appendSlice("\n");

// Send the ILP data to QuestDB
if (self.ilp_client) |client| {
std.log.err("Failed to insert account activity ILP data: {any}", .{err});
return types.QuestDBError.QueryFailed;
};
}
}
Loading
Loading