Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ffi.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import std/atomics, chronos
import
ffi/internal/[ffi_library, ffi_macro],
ffi/[alloc, ffi_types, ffi_context, ffi_thread_request]

export alloc, ffi_library, ffi_macro, ffi_types, ffi_context, ffi_thread_request
export atomics, chronos
export
atomics, alloc, ffi_library, ffi_macro, ffi_types, ffi_context, ffi_thread_request
44 changes: 24 additions & 20 deletions ffi/ffi_context.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

import std/[options, atomics, os, net, locks, json]
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
import ./ffi_types, ./ffi_thread_request, ./internal/ffi_macro
import ./ffi_types, ./ffi_thread_request, ./internal/ffi_macro, ./logging

type FFIContext* = object
ffiThread: Thread[(ptr FFIContext)]
type FFIContext*[T] = object
myLib*: T
# main library object (e.g., Waku, LibP2P, SDS, the one to be exposed as a library)
ffiThread: Thread[(ptr FFIContext[T])]
# represents the main FFI thread in charge of attending API consumer actions
watchdogThread: Thread[(ptr FFIContext)]
watchdogThread: Thread[(ptr FFIContext[T])]
# monitors the FFI thread and notifies the FFI API consumer if it hangs
lock: Lock
reqChannel: ChannelSPSCSingle[ptr FFIThreadRequest]
Expand All @@ -21,6 +23,7 @@ type FFIContext* = object
eventUserdata*: pointer
running: Atomic[bool] # To control when the threads are running
registeredRequests: ptr Table[cstring, FFIRequestProc]
# Pointer to with the registered requests at compile time

const git_version* {.strdefine.} = "n/a"

Expand Down Expand Up @@ -80,18 +83,18 @@ registerReqFFI(WatchdogReq, foo: ptr Foo):
proc(): Future[Result[string, string]] {.async.} =
return ok("waku thread is not blocked")

type JsonWakuNotRespondingEvent = object
type JsonNotRespondingEvent = object
eventType: string

proc init(T: type JsonWakuNotRespondingEvent): T =
return JsonWakuNotRespondingEvent(eventType: "not_responding")
proc init(T: type JsonNotRespondingEvent): T =
return JsonNotRespondingEvent(eventType: "not_responding")

proc `$`(event: JsonWakuNotRespondingEvent): string =
proc `$`(event: JsonNotRespondingEvent): string =
$(%*event)

proc onWakuNotResponding*(ctx: ptr FFIContext) =
proc onNotResponding*(ctx: ptr FFIContext) =
callEventCallback(ctx, "onWakuNotResponsive"):
$JsonWakuNotRespondingEvent.init()
$JsonNotRespondingEvent.init()

proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} =
## Watchdog thread that monitors the Waku thread and notifies the library user if it hangs.
Expand Down Expand Up @@ -120,15 +123,16 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} =

sendRequestToFFIThread(ctx, WatchdogReq.ffiNewReq(wakuCallback, nilUserData)).isOkOr:
error "Failed to send watchdog request to FFI thread", error = $error
onWakuNotResponding(ctx)
onNotResponding(ctx)

waitFor watchdogRun(ctx)

proc ffiThreadBody[TT](ctx: ptr FFIContext) {.thread.} =
proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
## FFI thread that attends library user API requests

let ffiRun = proc(ctx: ptr FFIContext) {.async.} =
var ffiHandler: TT
logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)

let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} =
while true:
await ctx.reqSignal.wait()

Expand All @@ -144,7 +148,7 @@ proc ffiThreadBody[TT](ctx: ptr FFIContext) {.thread.} =

## Handle the request
asyncSpawn FFIThreadRequest.process(
request, addr ffiHandler, ctx.registeredRequests
request, addr ctx.myLib, ctx.registeredRequests
)

let fireRes = ctx.reqReceivedSignal.fireSync()
Expand All @@ -153,21 +157,21 @@ proc ffiThreadBody[TT](ctx: ptr FFIContext) {.thread.} =

waitFor ffiRun(ctx)

proc createFFIContext*[T](tt: typedesc[T]): Result[ptr FFIContext, string] =
proc createFFIContext*[T](): Result[ptr FFIContext[T], string] =
## This proc is called from the main thread and it creates
## the FFI working thread.
var ctx = createShared(FFIContext, 1)
var ctx = createShared(FFIContext[T], 1)
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqSignal ThreadSignalPtr")
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqReceivedSignal ThreadSignalPtr")
ctx.lock.initLock()
ctx.registeredRequests = addr ffi_macro.registeredRequests
ctx.registeredRequests = addr ffi_types.registeredRequests

ctx.running.store(true)

try:
createThread(ctx.ffiThread, ffiThreadBody[tt], ctx)
createThread(ctx.ffiThread, ffiThreadBody[T], ctx)
except ValueError, ResourceExhaustedError:
freeShared(ctx)
return err("failed to create the Waku thread: " & getCurrentExceptionMsg())
Expand All @@ -180,7 +184,7 @@ proc createFFIContext*[T](tt: typedesc[T]): Result[ptr FFIContext, string] =

return ok(ctx)

proc destroyFFIContext*(ctx: ptr FFIContext): Result[void, string] =
proc destroyFFIContext*[T](ctx: ptr FFIContext[T]): Result[void, string] =
ctx.running.store(false)

let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
Expand Down
4 changes: 4 additions & 0 deletions ffi/ffi_types.nim
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import std/tables
import chronos

################################################################################
Expand Down Expand Up @@ -31,5 +32,8 @@ template foreignThreadGc*(body: untyped) =

type onDone* = proc()

## Registered requests table populated at compile time
var registeredRequests* {.threadvar.}: Table[cstring, FFIRequestProc]

### End of FFI utils
################################################################################
16 changes: 1 addition & 15 deletions ffi/internal/ffi_library.nim
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ macro declareLibrary*(libraryName: static[string]): untyped =
let nimMainName = ident("lib" & libraryName & "NimMain")

let initializeLibraryProc = quote:
proc `procName`() {.exported.} =
proc `procName`*() {.exported.} =
if not initialized.exchange(true):
## Every Nim library needs to call `<yourprefix>NimMain` once exactly,
## to initialize the Nim runtime.
Expand All @@ -78,18 +78,4 @@ macro declareLibrary*(libraryName: static[string]): untyped =

res.add(initializeLibraryProc)

## Generate the exported C-callable callback setter
let setCallbackProc = quote:
proc set_event_callback(
ctx: ptr FFIContext, callback: FFICallBack, userData: pointer
) {.dynlib, exportc.} =
initializeLibrary()
ctx[].eventCallback = cast[pointer](callback)
ctx[].eventUserData = userData

res.add(setCallbackProc)

# echo result.repr
return res


109 changes: 89 additions & 20 deletions ffi/internal/ffi_macro.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ import std/[macros, tables]
import chronos
import ../ffi_types

var registeredRequests* {.threadvar.}: Table[cstring, FFIRequestProc]

proc extractFieldsFromLambda(body: NimNode): seq[NimNode] =
## Extracts the fields (params) from the given lambda body.
var procNode = body
if procNode.kind == nnkStmtList and procNode.len == 1:
procNode = procNode[0]
if procNode.kind != nnkLambda:
if procNode.kind != nnkLambda and procNode.kind != nnkProcDef:
error "registerReqFFI expects a lambda proc, found: " & $procNode.kind

let params = procNode[3] # parameters list
Expand All @@ -26,7 +24,7 @@ proc buildRequestType(reqTypeName: NimNode, body: NimNode): NimNode =
var procNode = body
if procNode.kind == nnkStmtList and procNode.len == 1:
procNode = procNode[0]
if procNode.kind != nnkLambda:
if procNode.kind != nnkLambda and procNode.kind != nnkProcDef:
error "registerReqFFI expects a lambda proc, found: " & $procNode.kind

let params = procNode[3] # formal params of the lambda
Expand Down Expand Up @@ -62,7 +60,7 @@ proc buildFfiNewReqProc(reqTypeName, body: NimNode): NimNode =
else:
procNode = body

if procNode.kind != nnkLambda:
if procNode.kind != nnkLambda and procNode.kind != nnkProcDef:
error "registerReqFFI expects a lambda definition. Found: " & $procNode.kind

# T: typedesc[CreateNodeRequest]
Expand Down Expand Up @@ -188,7 +186,7 @@ proc buildProcessFFIRequestProc(reqTypeName, reqHandler, body: NimNode): NimNode
var procNode = body
if procNode.kind == nnkStmtList and procNode.len == 1:
procNode = procNode[0]
if procNode.kind != nnkLambda:
if procNode.kind != nnkLambda and procNode.kind != nnkProcDef:
error "registerReqFFI expects a lambda definition. Found: " & $procNode.kind

let typedescParam =
Expand Down Expand Up @@ -317,31 +315,102 @@ macro registerReqFFI*(reqTypeName, reqHandler, body: untyped): untyped =
let deleteProc = buildFfiDeleteReqProc(reqTypeName, fields)
result = newStmtList(typeDef, ffiNewReqProc, deleteProc, processProc, addNewReqToReg)

# echo "Registered FFI request: " & result.repr

macro processReq*(reqType: typed, args: varargs[untyped]): untyped =
## Expands T.processReq(a,b,...) into the sendRequest boilerplate.

# Collect the passed arguments as NimNodes
var callArgs = @[reqType, ident("callback"), ident("userData")]
macro processReq*(
reqType, ctx, callback, userData: untyped, args: varargs[untyped]
): untyped =
## Expands T.processReq(ctx, callback, userData, a, b, ...)
var callArgs = @[reqType, callback, userData]
for a in args:
callArgs.add a

# Build: ffiNewReq(reqType, callback, userData, arg1, arg2, ...)
let newReqCall = newCall(ident("ffiNewReq"), callArgs)

# Build: ffi_context.sendRequestToFFIThread(ctx, <newReqCall>)
let sendCall = newCall(
newDotExpr(ident("ffi_context"), ident("sendRequestToFFIThread")),
ident("ctx"),
newReqCall,
newDotExpr(ident("ffi_context"), ident("sendRequestToFFIThread")), ctx, newReqCall
)

result = quote:
block:
let res = `sendCall`
if res.isErr:
if res.isErr():
let msg = "error in sendRequestToFFIThread: " & res.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData)
`callback`(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), `userData`)
return RET_ERR
return RET_OK

macro ffi*(prc: untyped): untyped =
let procName = prc[0]
let formalParams = prc[3]
let bodyNode = prc[^1]

if formalParams.len < 2:
error("`.ffi.` procs require at least 1 parameter")

let firstParam = formalParams[1]
let paramIdent = firstParam[0]
let paramType = firstParam[1]

let reqName = ident($procName & "Req")
let returnType = ident("cint")

# Build parameter list (skip return type)
var newParams = newSeq[NimNode]()
newParams.add(returnType)
for i in 1 ..< formalParams.len:
newParams.add(newIdentDefs(formalParams[i][0], formalParams[i][1]))

# Build Future[Result[string, string]] return type
let futReturnType = quote:
Future[Result[string, string]]

var userParams = newSeq[NimNode]()
userParams.add(futReturnType)
if formalParams.len > 3:
for i in 4 ..< formalParams.len:
userParams.add(newIdentDefs(formalParams[i][0], formalParams[i][1]))

# Build argument list for processReq
var argsList = newSeq[NimNode]()
for i in 1 ..< formalParams.len:
argsList.add(formalParams[i][0])

# 1. Build the dot expression. e.g.: waku_is_onlineReq.processReq
let dotExpr = newTree(nnkDotExpr, reqName, ident"processReq")

# 2. Build the call node with dotExpr as callee
let callNode = newTree(nnkCall, dotExpr)
for arg in argsList:
callNode.add(arg)

# Proc body
let ffiBody = newStmtList(
quote do:
initializeLibrary()
if not isNil(ctx):
ctx[].userData = userData
if isNil(callback):
return RET_MISSING_CALLBACK
)

ffiBody.add(callNode)

let ffiProc = newProc(
name = procName,
params = newParams,
body = ffiBody,
pragmas = newTree(nnkPragma, ident "dynlib", ident "exportc", ident "cdecl"),
)

var anonymousProcNode = newProc(
name = newEmptyNode(), # anonymous proc
params = userParams,
body = newStmtList(bodyNode),
pragmas = newTree(nnkPragma, ident"async"),
)

# registerReqFFI wrapper
let registerReq = quote:
registerReqFFI(`reqName`, `paramIdent`: `paramType`):
`anonymousProcNode`

result = newStmtList(registerReq, ffiProc)
Loading