diff --git a/ffi.nim b/ffi.nim index ce1c7ec..ed14bad 100644 --- a/ffi.nim +++ b/ffi.nim @@ -1,5 +1,8 @@ +import std/atomics, chronos import ffi/internal/[ffi_library, ffi_macro], ffi/[alloc, ffi_types, ffi_context, ffi_thread_request] -export alloc, ffi_library, ffi_macro, ffi_types, ffi_context, ffi_thread_request +export atomics, chronos +export + atomics, alloc, ffi_library, ffi_macro, ffi_types, ffi_context, ffi_thread_request diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index 0acb17d..491f135 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -4,12 +4,14 @@ import std/[options, atomics, os, net, locks, json] import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results -import ./ffi_types, ./ffi_thread_request, ./internal/ffi_macro +import ./ffi_types, ./ffi_thread_request, ./internal/ffi_macro, ./logging -type FFIContext* = object - ffiThread: Thread[(ptr FFIContext)] +type FFIContext*[T] = object + myLib*: T + # main library object (e.g., Waku, LibP2P, SDS, the one to be exposed as a library) + ffiThread: Thread[(ptr FFIContext[T])] # represents the main FFI thread in charge of attending API consumer actions - watchdogThread: Thread[(ptr FFIContext)] + watchdogThread: Thread[(ptr FFIContext[T])] # monitors the FFI thread and notifies the FFI API consumer if it hangs lock: Lock reqChannel: ChannelSPSCSingle[ptr FFIThreadRequest] @@ -21,6 +23,7 @@ type FFIContext* = object eventUserdata*: pointer running: Atomic[bool] # To control when the threads are running registeredRequests: ptr Table[cstring, FFIRequestProc] + # Pointer to with the registered requests at compile time const git_version* {.strdefine.} = "n/a" @@ -80,18 +83,18 @@ registerReqFFI(WatchdogReq, foo: ptr Foo): proc(): Future[Result[string, string]] {.async.} = return ok("waku thread is not blocked") -type JsonWakuNotRespondingEvent = object +type JsonNotRespondingEvent = object eventType: string -proc init(T: type JsonWakuNotRespondingEvent): T = - return JsonWakuNotRespondingEvent(eventType: "not_responding") +proc init(T: type JsonNotRespondingEvent): T = + return JsonNotRespondingEvent(eventType: "not_responding") -proc `$`(event: JsonWakuNotRespondingEvent): string = +proc `$`(event: JsonNotRespondingEvent): string = $(%*event) -proc onWakuNotResponding*(ctx: ptr FFIContext) = +proc onNotResponding*(ctx: ptr FFIContext) = callEventCallback(ctx, "onWakuNotResponsive"): - $JsonWakuNotRespondingEvent.init() + $JsonNotRespondingEvent.init() proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = ## Watchdog thread that monitors the Waku thread and notifies the library user if it hangs. @@ -120,15 +123,16 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = sendRequestToFFIThread(ctx, WatchdogReq.ffiNewReq(wakuCallback, nilUserData)).isOkOr: error "Failed to send watchdog request to FFI thread", error = $error - onWakuNotResponding(ctx) + onNotResponding(ctx) waitFor watchdogRun(ctx) -proc ffiThreadBody[TT](ctx: ptr FFIContext) {.thread.} = +proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = ## FFI thread that attends library user API requests - let ffiRun = proc(ctx: ptr FFIContext) {.async.} = - var ffiHandler: TT + logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT) + + let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} = while true: await ctx.reqSignal.wait() @@ -144,7 +148,7 @@ proc ffiThreadBody[TT](ctx: ptr FFIContext) {.thread.} = ## Handle the request asyncSpawn FFIThreadRequest.process( - request, addr ffiHandler, ctx.registeredRequests + request, addr ctx.myLib, ctx.registeredRequests ) let fireRes = ctx.reqReceivedSignal.fireSync() @@ -153,21 +157,21 @@ proc ffiThreadBody[TT](ctx: ptr FFIContext) {.thread.} = waitFor ffiRun(ctx) -proc createFFIContext*[T](tt: typedesc[T]): Result[ptr FFIContext, string] = +proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = ## This proc is called from the main thread and it creates ## the FFI working thread. - var ctx = createShared(FFIContext, 1) + var ctx = createShared(FFIContext[T], 1) ctx.reqSignal = ThreadSignalPtr.new().valueOr: return err("couldn't create reqSignal ThreadSignalPtr") ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: return err("couldn't create reqReceivedSignal ThreadSignalPtr") ctx.lock.initLock() - ctx.registeredRequests = addr ffi_macro.registeredRequests + ctx.registeredRequests = addr ffi_types.registeredRequests ctx.running.store(true) try: - createThread(ctx.ffiThread, ffiThreadBody[tt], ctx) + createThread(ctx.ffiThread, ffiThreadBody[T], ctx) except ValueError, ResourceExhaustedError: freeShared(ctx) return err("failed to create the Waku thread: " & getCurrentExceptionMsg()) @@ -180,7 +184,7 @@ proc createFFIContext*[T](tt: typedesc[T]): Result[ptr FFIContext, string] = return ok(ctx) -proc destroyFFIContext*(ctx: ptr FFIContext): Result[void, string] = +proc destroyFFIContext*[T](ctx: ptr FFIContext[T]): Result[void, string] = ctx.running.store(false) let signaledOnTime = ctx.reqSignal.fireSync().valueOr: diff --git a/ffi/ffi_types.nim b/ffi/ffi_types.nim index 9bd91a7..f62a428 100644 --- a/ffi/ffi_types.nim +++ b/ffi/ffi_types.nim @@ -1,3 +1,4 @@ +import std/tables import chronos ################################################################################ @@ -31,5 +32,8 @@ template foreignThreadGc*(body: untyped) = type onDone* = proc() +## Registered requests table populated at compile time +var registeredRequests* {.threadvar.}: Table[cstring, FFIRequestProc] + ### End of FFI utils ################################################################################ diff --git a/ffi/internal/ffi_library.nim b/ffi/internal/ffi_library.nim index 9df1cc3..3f1b757 100644 --- a/ffi/internal/ffi_library.nim +++ b/ffi/internal/ffi_library.nim @@ -62,7 +62,7 @@ macro declareLibrary*(libraryName: static[string]): untyped = let nimMainName = ident("lib" & libraryName & "NimMain") let initializeLibraryProc = quote: - proc `procName`() {.exported.} = + proc `procName`*() {.exported.} = if not initialized.exchange(true): ## Every Nim library needs to call `NimMain` once exactly, ## to initialize the Nim runtime. @@ -78,18 +78,4 @@ macro declareLibrary*(libraryName: static[string]): untyped = res.add(initializeLibraryProc) - ## Generate the exported C-callable callback setter - let setCallbackProc = quote: - proc set_event_callback( - ctx: ptr FFIContext, callback: FFICallBack, userData: pointer - ) {.dynlib, exportc.} = - initializeLibrary() - ctx[].eventCallback = cast[pointer](callback) - ctx[].eventUserData = userData - - res.add(setCallbackProc) - - # echo result.repr return res - - diff --git a/ffi/internal/ffi_macro.nim b/ffi/internal/ffi_macro.nim index 460c4f3..88fc75b 100644 --- a/ffi/internal/ffi_macro.nim +++ b/ffi/internal/ffi_macro.nim @@ -2,14 +2,12 @@ import std/[macros, tables] import chronos import ../ffi_types -var registeredRequests* {.threadvar.}: Table[cstring, FFIRequestProc] - proc extractFieldsFromLambda(body: NimNode): seq[NimNode] = ## Extracts the fields (params) from the given lambda body. var procNode = body if procNode.kind == nnkStmtList and procNode.len == 1: procNode = procNode[0] - if procNode.kind != nnkLambda: + if procNode.kind != nnkLambda and procNode.kind != nnkProcDef: error "registerReqFFI expects a lambda proc, found: " & $procNode.kind let params = procNode[3] # parameters list @@ -26,7 +24,7 @@ proc buildRequestType(reqTypeName: NimNode, body: NimNode): NimNode = var procNode = body if procNode.kind == nnkStmtList and procNode.len == 1: procNode = procNode[0] - if procNode.kind != nnkLambda: + if procNode.kind != nnkLambda and procNode.kind != nnkProcDef: error "registerReqFFI expects a lambda proc, found: " & $procNode.kind let params = procNode[3] # formal params of the lambda @@ -62,7 +60,7 @@ proc buildFfiNewReqProc(reqTypeName, body: NimNode): NimNode = else: procNode = body - if procNode.kind != nnkLambda: + if procNode.kind != nnkLambda and procNode.kind != nnkProcDef: error "registerReqFFI expects a lambda definition. Found: " & $procNode.kind # T: typedesc[CreateNodeRequest] @@ -188,7 +186,7 @@ proc buildProcessFFIRequestProc(reqTypeName, reqHandler, body: NimNode): NimNode var procNode = body if procNode.kind == nnkStmtList and procNode.len == 1: procNode = procNode[0] - if procNode.kind != nnkLambda: + if procNode.kind != nnkLambda and procNode.kind != nnkProcDef: error "registerReqFFI expects a lambda definition. Found: " & $procNode.kind let typedescParam = @@ -317,31 +315,102 @@ macro registerReqFFI*(reqTypeName, reqHandler, body: untyped): untyped = let deleteProc = buildFfiDeleteReqProc(reqTypeName, fields) result = newStmtList(typeDef, ffiNewReqProc, deleteProc, processProc, addNewReqToReg) - # echo "Registered FFI request: " & result.repr - -macro processReq*(reqType: typed, args: varargs[untyped]): untyped = - ## Expands T.processReq(a,b,...) into the sendRequest boilerplate. - - # Collect the passed arguments as NimNodes - var callArgs = @[reqType, ident("callback"), ident("userData")] +macro processReq*( + reqType, ctx, callback, userData: untyped, args: varargs[untyped] +): untyped = + ## Expands T.processReq(ctx, callback, userData, a, b, ...) + var callArgs = @[reqType, callback, userData] for a in args: callArgs.add a - # Build: ffiNewReq(reqType, callback, userData, arg1, arg2, ...) let newReqCall = newCall(ident("ffiNewReq"), callArgs) - # Build: ffi_context.sendRequestToFFIThread(ctx, ) let sendCall = newCall( - newDotExpr(ident("ffi_context"), ident("sendRequestToFFIThread")), - ident("ctx"), - newReqCall, + newDotExpr(ident("ffi_context"), ident("sendRequestToFFIThread")), ctx, newReqCall ) result = quote: block: let res = `sendCall` - if res.isErr: + if res.isErr(): let msg = "error in sendRequestToFFIThread: " & res.error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) + `callback`(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), `userData`) return RET_ERR return RET_OK + +macro ffi*(prc: untyped): untyped = + let procName = prc[0] + let formalParams = prc[3] + let bodyNode = prc[^1] + + if formalParams.len < 2: + error("`.ffi.` procs require at least 1 parameter") + + let firstParam = formalParams[1] + let paramIdent = firstParam[0] + let paramType = firstParam[1] + + let reqName = ident($procName & "Req") + let returnType = ident("cint") + + # Build parameter list (skip return type) + var newParams = newSeq[NimNode]() + newParams.add(returnType) + for i in 1 ..< formalParams.len: + newParams.add(newIdentDefs(formalParams[i][0], formalParams[i][1])) + + # Build Future[Result[string, string]] return type + let futReturnType = quote: + Future[Result[string, string]] + + var userParams = newSeq[NimNode]() + userParams.add(futReturnType) + if formalParams.len > 3: + for i in 4 ..< formalParams.len: + userParams.add(newIdentDefs(formalParams[i][0], formalParams[i][1])) + + # Build argument list for processReq + var argsList = newSeq[NimNode]() + for i in 1 ..< formalParams.len: + argsList.add(formalParams[i][0]) + + # 1. Build the dot expression. e.g.: waku_is_onlineReq.processReq + let dotExpr = newTree(nnkDotExpr, reqName, ident"processReq") + + # 2. Build the call node with dotExpr as callee + let callNode = newTree(nnkCall, dotExpr) + for arg in argsList: + callNode.add(arg) + + # Proc body + let ffiBody = newStmtList( + quote do: + initializeLibrary() + if not isNil(ctx): + ctx[].userData = userData + if isNil(callback): + return RET_MISSING_CALLBACK + ) + + ffiBody.add(callNode) + + let ffiProc = newProc( + name = procName, + params = newParams, + body = ffiBody, + pragmas = newTree(nnkPragma, ident "dynlib", ident "exportc", ident "cdecl"), + ) + + var anonymousProcNode = newProc( + name = newEmptyNode(), # anonymous proc + params = userParams, + body = newStmtList(bodyNode), + pragmas = newTree(nnkPragma, ident"async"), + ) + + # registerReqFFI wrapper + let registerReq = quote: + registerReqFFI(`reqName`, `paramIdent`: `paramType`): + `anonymousProcNode` + + result = newStmtList(registerReq, ffiProc) diff --git a/ffi/logging.nim b/ffi/logging.nim new file mode 100644 index 0000000..b82ec11 --- /dev/null +++ b/ffi/logging.nim @@ -0,0 +1,106 @@ +## This code has been copied and addapted from `status-im/nimbu-eth2` project. +## Link: https://github.com/status-im/nimbus-eth2/blob/c585b0a5b1ae4d55af38ad7f4715ad455e791552/beacon_chain/nimbus_binary_common.nim +## This is also copied in logos-messaging-nim repository (2025-12-10) +import + std/[typetraits, os, strutils, syncio], + chronicles, + chronicles/log_output, + chronicles/topics_registry + +export chronicles.LogLevel + +{.push raises: [].} + +type LogFormat* = enum + TEXT + JSON + +## Utils + +proc stripAnsi(v: string): string = + ## Copied from: https://github.com/status-im/nimbus-eth2/blob/stable/beacon_chain/nimbus_binary_common.nim#L41 + ## Silly chronicles, colors is a compile-time property + var + res = newStringOfCap(v.len) + i: int + + while i < v.len: + let c = v[i] + if c == '\x1b': + var + x = i + 1 + found = false + + while x < v.len: # look for [..m + let c2 = v[x] + if x == i + 1: + if c2 != '[': + break + else: + if c2 in {'0' .. '9'} + {';'}: + discard # keep looking + elif c2 == 'm': + i = x + 1 + found = true + break + else: + break + inc x + + if found: # skip adding c + continue + res.add c + inc i + + res + +proc writeAndFlush(f: syncio.File, s: LogOutputStr) = + try: + f.write(s) + f.flushFile() + except CatchableError: + logLoggingFailure(cstring(s), getCurrentException()) + +## Setup + +proc setupLogLevel(level: LogLevel) = + # TODO: Support per topic level configuratio + topics_registry.setLogLevel(level) + +proc setupLogFormat(format: LogFormat, color = true) = + proc noOutputWriter(logLevel: LogLevel, msg: LogOutputStr) = + discard + + proc stdoutOutputWriter(logLevel: LogLevel, msg: LogOutputStr) = + writeAndFlush(syncio.stdout, msg) + + proc stdoutNoColorOutputWriter(logLevel: LogLevel, msg: LogOutputStr) = + writeAndFlush(syncio.stdout, stripAnsi(msg)) + + when defaultChroniclesStream.outputs.type.arity == 2: + case format + of LogFormat.Text: + defaultChroniclesStream.outputs[0].writer = + if color: stdoutOutputWriter else: stdoutNoColorOutputWriter + defaultChroniclesStream.outputs[1].writer = noOutputWriter + of LogFormat.Json: + defaultChroniclesStream.outputs[0].writer = noOutputWriter + defaultChroniclesStream.outputs[1].writer = stdoutOutputWriter + else: + {. + warning: + "the present module should be compiled with '-d:chronicles_default_output_device=dynamic' " & + "and '-d:chronicles_sinks=\"textlines,json\"' options" + .} + +proc setupLog*(level: LogLevel, format: LogFormat) = + ## Logging setup + # Adhere to NO_COLOR initiative: https://no-color.org/ + let color = + try: + not parseBool(os.getEnv("NO_COLOR", "false")) + except CatchableError: + true + + setupLogLevel(level) + setupLogFormat(format, color)