Skip to content

Commit ad07c95

Browse files
committed
Compatible with MySQL and improve documentation
1 parent cab9d14 commit ad07c95

File tree

1 file changed

+131
-37
lines changed

1 file changed

+131
-37
lines changed

service/sqldriver.lua

Lines changed: 131 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,23 @@ local buffer = require("buffer")
33
local json = require("json")
44
local tbinsert = table.insert
55

6+
-- Module configuration passed via `conf` from service launcher.
7+
-- `conf` expected fields:
8+
-- - name: (optional) when set, this module acts as the provider side
9+
-- (accepting and executing queries using `conf.provider`).
10+
-- - provider: name of the provider module (e.g. "pg"), used when
11+
-- running in provider mode.
12+
-- - opts: provider connection options (host/port/user/password/dbname).
13+
-- - poolsize: number of per-service queues (default 1).
14+
--
15+
-- This file implements two roles depending on `conf.name`:
16+
-- 1) Provider/service side: manages a pool of DB connections/queues and
17+
-- executes queries received from other services. This includes retry
18+
-- logic and reconnection handling.
19+
-- 2) Client side (when `conf.name` is nil): convenience helpers that
20+
-- forward queries to the running sqldriver service via `moon.send`
21+
-- / `moon.call` (remote RPC).
22+
623
local conf = ...
724

825
---@class SqlClient
@@ -15,6 +32,27 @@ if conf.name then
1532

1633
local clone = buffer.to_shared
1734

35+
--- Execute a single query on a DB connection with retry and reconnect.
36+
---
37+
--- Behavior:
38+
--- - If `db` is provided, call `provider[cmd](db, sql)` to execute.
39+
--- - On socket-level errors (res.code == "SOCKET") the connection is
40+
--- closed and the function will attempt to reconnect and retry.
41+
--- - If the provider returns an error and `sessionid == 0` the error
42+
--- is logged with stack info; otherwise the response is sent back
43+
--- to the requester via `moon.response`.
44+
--- - On permanent failures the function returns without responding
45+
--- (the caller handles returning error objects when appropriate).
46+
---
47+
--- Parameters:
48+
--- - db: provider connection object or `nil` if not connected yet.
49+
--- - sql: query buffer (shared pointer) or raw SQL string/table.
50+
--- - sender: service id of the caller (used for `moon.response`).
51+
--- - sessionid: RPC session id (0 means no response expected).
52+
--- - cmd: provider command name, e.g. "query" or "query_params".
53+
---
54+
--- Returns:
55+
--- - db: the (possibly reconnected) db connection to keep in pool.
1856
---@param sql buffer_shr_ptr
1957
local function exec_one(db, sql, sender, sessionid, cmd)
2058
local faild_times = 0
@@ -64,43 +102,57 @@ if conf.name then
64102
end
65103
end
66104

105+
-- Number of per-hash queues (controls concurrency). Incoming requests
106+
-- are assigned to a queue using `hash % db_pool_size + 1`. This helps
107+
-- distribute load across multiple DB connections / workers.
67108
local db_pool_size = conf.poolsize or 1
68109

69110
local traceback = debug.traceback
70111
local xpcall = xpcall
71112

113+
-- Pool holds `db_pool_size` contexts. Each context contains:
114+
-- - queue: a FIFO of pending requests
115+
-- - running: whether a worker coroutine is currently processing the queue
116+
-- - db: the active DB connection for that context (or nil)
72117
local pool = {}
73118

74119
for _ = 1, db_pool_size do
75120
local one = { queue = list.new(), running = false, db = false }
76121
tbinsert(pool, one)
77122
end
78123

124+
-- Enqueue a request and ensure a worker is processing the queue.
125+
-- `args` format: { cmd, hash, sql_or_buffer }
126+
-- Worker loop processes items one-by-one by popping from the queue and
127+
-- calling `exec_one`. Any returned `db` connection is stored back into
128+
-- the context for reuse.
79129
local function execute(sender, sessionid, args)
80-
local hash = args[2]
81-
hash = hash % db_pool_size + 1
82-
--print(moon.name, "db hash", hash, db_pool_size)
130+
local hash = args[2] % db_pool_size + 1
83131
local ctx = pool[hash]
84-
list.push(ctx.queue, { clone(args[3]), sender, sessionid, args[1] })
132+
local sql = type(args[3]) == "userdata" and clone(args[3]) or args[3]
133+
list.push(ctx.queue, { sql, sender, sessionid, args[1] })
134+
85135
if ctx.running then
86136
return
87137
end
88138

89139
ctx.running = true
90-
91140
moon.async(function()
92141
while true do
93-
---{sql, sender, sessionid}
94142
local req = list.pop(ctx.queue)
95143
if not req then
96144
break
97145
end
98146
local ok, db = xpcall(exec_one, traceback, ctx.db, req[1], req[2], req[3], req[4])
99-
if not ok then
100-
---lua error
147+
if ok then
148+
ctx.db = db
149+
else
101150
moon.error(db)
151+
if ctx.db then
152+
ctx.db:disconnect()
153+
end
154+
ctx.db = nil
102155
end
103-
ctx.db = db
104156
end
105157
ctx.running = false
106158
end)
@@ -123,33 +175,40 @@ if conf.name then
123175
function command.save_then_quit()
124176
moon.async(function()
125177
while true do
126-
local all = true
127-
for _, v in ipairs(pool) do
178+
local all_empty = true
179+
for i, v in ipairs(pool) do
128180
if list.front(v.queue) then
129-
all = false
130-
print("wait_all_send", _, list.size(v.queue))
181+
all_empty = false
182+
print("wait_all_send", i, list.size(v.queue))
131183
break
132184
end
133185
end
134-
135-
if not all then
136-
moon.sleep(1000)
137-
else
186+
if all_empty then
138187
break
139188
end
189+
moon.sleep(1000)
140190
end
141-
142191
moon.quit()
143192
end)
144193
end
145194

146195
local function xpcall_ret(ok, ...)
147-
if ok then
148-
return moon.pack(...)
149-
end
150-
return moon.pack(false, ...)
196+
return moon.pack(ok and ... or false, ...)
151197
end
152198

199+
-- Raw dispatcher for incoming 'lua' messages.
200+
-- Message format decoded by `moon.decode(msg, "SEC")` yields
201+
-- sender, sessionid, sz, len
202+
-- where `sz,len` are used by `moon.unpack` to reconstruct the args
203+
-- table. `args[1]` is the command name.
204+
--
205+
-- If `provider[cmd]` exists we treat it as a DB operation (query,
206+
-- query_params, pipe, etc.) and forward to `execute()` to enqueue
207+
-- and process it in the pool.
208+
--
209+
-- Otherwise, if `command[cmd]` exists we invoke it directly. If the
210+
-- caller expects a response (`sessionid ~= 0`) we run the handler in
211+
-- an async coroutine and return the packed result via `moon.raw_send`.
153212
moon.raw_dispatch('lua', function(msg)
154213
local sender, sessionid, sz, len = moon.decode(msg, "SEC")
155214

@@ -179,37 +238,72 @@ if conf.name then
179238
end
180239
end)
181240
else
182-
183-
local json = require("json")
184-
241+
-- Client-side helpers: when this module is required without `conf.name`
242+
-- it exports lightweight functions that send/call the sqldriver service
243+
-- to perform queries. These are convenience wrappers used by other
244+
-- services to interact with the DB without embedding DB driver logic.
245+
--
246+
-- Note on `hash` argument: a numeric `hash` (defaults to 1) is used by
247+
-- the sqldriver to choose which queue/worker (pool slot) will process
248+
-- the request: `index = hash % db_pool_size + 1`. This allows callers
249+
-- to route related requests to the same worker (affinity) if needed.
185250
local concat = json.concat
186251

252+
--- Send a query (no response expected).
253+
---@param db integer service ID of the sqldriver
254+
---@param sql string|table SQL text or table that `json.concat` accepts
255+
---@param hash? integer optional routing hash (default 1)
187256
function client.execute(db, sql, hash)
188257
moon.send("lua", db, "query", hash or 1, concat(sql))
189258
end
190259

260+
--- Call a query and wait for the result (synchronous RPC).
261+
---@param db integer service ID of the sqldriver
262+
---@param sql string|table SQL text or buffer
263+
---@param hash? integer optional routing hash (default 1)
264+
---@return table provider result (rows/error table)
191265
function client.query(db, sql, hash)
192266
return moon.call("lua", db, "query", hash or 1, concat(sql))
193267
end
194268

195-
-- SQL with parameters binding support
196-
---@param db integer @ service ID
197-
---@param sql string @ SQL statement with placeholders : `select * from table where id = $1`
198-
---@param ... any @ parameters to bind to the SQL statement
269+
--- Query with parameter binding (Postgres-style $1, $2 placeholders) or (Mysql-style ? placeholders).
270+
--- Accepts either `client.query_params(db, "select ... $1", param1)` or
271+
--- a prebuilt table form `{sql, param1, param2, ...}`.
272+
--- If the SQL contains `$` placeholders it will be converted with
273+
--- `json.pq_query`; otherwise params are forwarded as-is.
274+
---@param db integer service ID
275+
---@param sql string|table SQL string or table {sql, ...}
276+
---@param ... any parameters to bind
199277
function client.query_params(db, sql, ...)
200-
if string.find(sql, "$") ~= nil then -- PostgreSQL style parameter binding
201-
return moon.call("lua", db, "query_params", 1, json.pq_query({sql, ...}))
202-
else -- MySQL style parameter binding
203-
return moon.call("lua", db, "query_params", 1, {sql, ...})
204-
end
278+
local params = type(sql) == "string" and {sql, ...} or sql
279+
local query_data = string.find(params[1], "$", 1, true) and json.pq_query(params) or params
280+
return moon.call("lua", db, "query_params", 1, query_data)
205281
end
206282

207-
-- PostgreSQL pipe support, with transaction
208-
---@param db integer @ service ID
209-
---@param req table @ {{sql, param1, param2, ...}, {sql, param1, param2, ...}, ...}
283+
--- Like `query_params` but fire-and-forget (no response expected).
284+
---@param db integer service ID
285+
---@param sql string|table SQL string or table {sql, ...}
286+
---@param ... any parameters to bind
287+
function client.execute_params(db, sql, ...)
288+
local params = type(sql) == "string" and {sql, ...} or sql
289+
local query_data = string.find(params[1], "$", 1, true) and json.pq_query(params) or params
290+
moon.send("lua", db, "query_params", 1, query_data)
291+
end
292+
293+
--- PostgreSQL Pipe support: execute multiple statements in a single transaction.
294+
--- `req` should be a list of statements in the form
295+
--- `{{sql, param1, ...}, {sql, param1, ...}, ...}`. Returns provider
296+
--- result table on success or error on failure.
210297
function client.pipe(db, req)
211298
return moon.call("lua", db, "pipe", 1, json.pq_pipe(req))
212299
end
300+
301+
--- PostgreSQL pipe support, with transaction. Fire-and-forget version of `pipe`.
302+
---@param db integer @ service ID
303+
---@param req table @ {{sql, param1, param2, ...}, {sql, param1, param2, ...}, ...}
304+
function client.execute_pipe(db, req)
305+
moon.send("lua", db, "pipe", 1, json.pq_pipe(req))
306+
end
213307
end
214308

215309
return client

0 commit comments

Comments
 (0)