44
55#include < cstdio> // For FILE, fopen, fread, fwrite, fclose, fseek, ftell, remove, rename
66#include < cstdlib> // For additional standard library functions
7+ #include < cstring> // For strrchr
8+ #include < unistd.h> // For fsync, close
9+ #include < fcntl.h> // For open, O_RDONLY
10+ #include < mutex> // For std::mutex, std::lock_guard
711
812extern " C" {
913#include < turso.h>
@@ -13,11 +17,32 @@ extern "C" {
1317namespace turso
1418{
1519
20+ /* *
21+ * Durable fsync: on Apple, fsync() only flushes to disk cache,
22+ * so we need F_FULLFSYNC for true persistence. On Linux/Android,
23+ * plain fsync() is sufficient.
24+ */
25+ static int durable_fsync (int fd)
26+ {
27+ #ifdef __APPLE__
28+ return fcntl (fd, F_FULLFSYNC);
29+ #else
30+ return fsync (fd);
31+ #endif
32+ }
33+
1634 using namespace facebook ;
1735
1836 // Global base path for database files
1937 static std::string g_basePath;
2038
39+ // Logger callback state — the Rust tracing subscriber may fire from any thread,
40+ // so we copy log data in the C callback and schedule JS execution via CallInvoker.
41+ static jsi::Runtime* g_runtime = nullptr ;
42+ static std::shared_ptr<react::CallInvoker> g_callInvoker;
43+ static std::shared_ptr<jsi::Function> g_loggerFn;
44+ static std::mutex g_loggerMutex;
45+
2146 /* *
2247 * Normalize a database path:
2348 * - If path is absolute (starts with '/'), use as-is
@@ -49,12 +74,79 @@ namespace turso
4974 }
5075 }
5176
77+ /* *
78+ * Map turso_tracing_level_t enum to JS-friendly string.
79+ */
80+ static const char * tracingLevelToString (turso_tracing_level_t level)
81+ {
82+ switch (level)
83+ {
84+ case TURSO_TRACING_LEVEL_ERROR: return " error" ;
85+ case TURSO_TRACING_LEVEL_WARN: return " warn" ;
86+ case TURSO_TRACING_LEVEL_INFO: return " info" ;
87+ case TURSO_TRACING_LEVEL_DEBUG: return " debug" ;
88+ case TURSO_TRACING_LEVEL_TRACE: return " trace" ;
89+ default : return " error" ;
90+ }
91+ }
92+
93+ /* *
94+ * C callback invoked by the Rust tracing subscriber (possibly from any thread).
95+ * Copies all string data synchronously, then schedules a JS call on the JS thread.
96+ */
97+ static void turso_logger_callback (const turso_log_t *log)
98+ {
99+ std::lock_guard<std::mutex> lock (g_loggerMutex);
100+ if (!g_loggerFn || !g_callInvoker || !g_runtime)
101+ {
102+ return ;
103+ }
104+
105+ // Copy all data — the turso_log_t fields are only valid during this callback.
106+ std::string message = log->message ? log->message : " " ;
107+ std::string target = log->target ? log->target : " " ;
108+ std::string file = log->file ? log->file : " " ;
109+ uint64_t timestamp = log->timestamp ;
110+ size_t line = log->line ;
111+ const char * level = tracingLevelToString (log->level );
112+ std::string levelStr (level);
113+
114+ // Prevent captures from preventing cleanup — capture shared_ptr copies
115+ auto callInvoker = g_callInvoker;
116+ auto loggerFn = g_loggerFn;
117+
118+ callInvoker->invokeAsync (
119+ [loggerFn, message = std::move (message), target = std::move (target),
120+ file = std::move (file), timestamp, line, levelStr = std::move (levelStr)]
121+ (jsi::Runtime &rt)
122+ {
123+ try
124+ {
125+ jsi::Object logObj (rt);
126+ logObj.setProperty (rt, " message" , jsi::String::createFromUtf8 (rt, message));
127+ logObj.setProperty (rt, " target" , jsi::String::createFromUtf8 (rt, target));
128+ logObj.setProperty (rt, " file" , jsi::String::createFromUtf8 (rt, file));
129+ logObj.setProperty (rt, " timestamp" , static_cast <double >(timestamp));
130+ logObj.setProperty (rt, " line" , static_cast <double >(line));
131+ logObj.setProperty (rt, " level" , jsi::String::createFromUtf8 (rt, levelStr));
132+
133+ loggerFn->call (rt, logObj);
134+ }
135+ catch (...)
136+ {
137+ // Logger must never crash the app — swallow all exceptions.
138+ }
139+ });
140+ }
141+
52142 void install (
53143 jsi::Runtime &rt,
54144 const std::shared_ptr<react::CallInvoker> &invoker,
55145 const char *basePath)
56146 {
57147 g_basePath = basePath ? basePath : " " ;
148+ g_runtime = &rt;
149+ g_callInvoker = invoker;
58150
59151 // Create the module object
60152 jsi::Object module (rt);
@@ -163,7 +255,7 @@ namespace turso
163255 sync_config.path = normalizedPath.c_str ();
164256
165257 // remoteUrl (optional)
166- static std::string remoteUrl;
258+ std::string remoteUrl;
167259 if (syncConfigObj.hasProperty (rt, " remoteUrl" ))
168260 {
169261 jsi::Value remoteUrlVal = syncConfigObj.getProperty (rt, " remoteUrl" );
@@ -183,7 +275,7 @@ namespace turso
183275 }
184276
185277 // clientName (optional)
186- static std::string clientName;
278+ std::string clientName;
187279 if (syncConfigObj.hasProperty (rt, " clientName" ))
188280 {
189281 jsi::Value clientNameVal = syncConfigObj.getProperty (rt, " clientName" );
@@ -274,7 +366,7 @@ namespace turso
274366 sync_config.partial_bootstrap_strategy_prefix = 0 ;
275367 }
276368
277- static std::string partialBootstrapStrategyQuery;
369+ std::string partialBootstrapStrategyQuery;
278370 if (syncConfigObj.hasProperty (rt, " partialBootstrapStrategyQuery" ))
279371 {
280372 jsi::Value queryVal = syncConfigObj.getProperty (rt, " partialBootstrapStrategyQuery" );
@@ -328,7 +420,7 @@ namespace turso
328420 }
329421
330422 // Remote encryption options
331- static std::string remoteEncryptionKey;
423+ std::string remoteEncryptionKey;
332424 if (syncConfigObj.hasProperty (rt, " remoteEncryptionKey" ))
333425 {
334426 jsi::Value keyVal = syncConfigObj.getProperty (rt, " remoteEncryptionKey" );
@@ -347,7 +439,7 @@ namespace turso
347439 sync_config.remote_encryption_key = nullptr ;
348440 }
349441
350- static std::string remoteEncryptionCipher;
442+ std::string remoteEncryptionCipher;
351443 if (syncConfigObj.hasProperty (rt, " remoteEncryptionCipher" ))
352444 {
353445 jsi::Value cipherVal = syncConfigObj.getProperty (rt, " remoteEncryptionCipher" );
@@ -409,8 +501,7 @@ namespace turso
409501
410502 jsi::Object options = args[0 ].asObject (rt);
411503
412- // Store log level in a static variable to ensure lifetime
413- static std::string logLevelStr;
504+ std::string logLevelStr;
414505
415506 // Get log level if provided
416507 if (options.hasProperty (rt, " logLevel" ))
@@ -424,6 +515,21 @@ namespace turso
424515
425516 turso_config_t config = {nullptr , logLevelStr.empty () ? nullptr : logLevelStr.c_str ()};
426517
518+ // Wire up logger callback if provided
519+ if (options.hasProperty (rt, " logger" ))
520+ {
521+ jsi::Value loggerVal = options.getProperty (rt, " logger" );
522+ if (loggerVal.isObject () && loggerVal.asObject (rt).isFunction (rt))
523+ {
524+ {
525+ std::lock_guard<std::mutex> lock (g_loggerMutex);
526+ g_loggerFn = std::make_shared<jsi::Function>(
527+ loggerVal.asObject (rt).asFunction (rt));
528+ }
529+ config.logger = turso_logger_callback;
530+ }
531+ }
532+
427533 // Call turso_setup
428534 const char *error = nullptr ;
429535 turso_status_code_t status = turso_setup (&config, &error);
@@ -504,33 +610,35 @@ namespace turso
504610 std::string path = args[0 ].asString (rt).utf8 (rt);
505611 jsi::ArrayBuffer buffer = args[1 ].asObject (rt).getArrayBuffer (rt);
506612
507- // Write atomically using temporary file + rename
613+ // Write atomically: write to temp, fsync, rename, fsync dir
508614 std::string tempPath = path + " .tmp" ;
509615
510- // Open temp file for writing
511616 FILE* file = fopen (tempPath.c_str (), " wb" );
512617 if (!file)
513618 {
514619 throw jsi::JSError (rt, " Failed to open file for writing" );
515620 }
516621
517- // Write data
518622 size_t size = buffer.size (rt);
519623 if (size > 0 )
520624 {
521625 size_t written = fwrite (buffer.data (rt), 1 , size, file);
522- fclose (file);
523-
524626 if (written != size)
525627 {
628+ fclose (file);
526629 remove (tempPath.c_str ());
527630 throw jsi::JSError (rt, " Failed to write complete file" );
528631 }
529632 }
530- else
633+
634+ // Flush to OS and sync to disk before rename
635+ if (fflush (file) != 0 || durable_fsync (fileno (file)) != 0 )
531636 {
532637 fclose (file);
638+ remove (tempPath.c_str ());
639+ throw jsi::JSError (rt, " Failed to sync file to disk" );
533640 }
641+ fclose (file);
534642
535643 // Atomic rename (replaces old file)
536644 if (rename (tempPath.c_str (), path.c_str ()) != 0 )
@@ -539,6 +647,20 @@ namespace turso
539647 throw jsi::JSError (rt, " Failed to rename temp file" );
540648 }
541649
650+ // Fsync parent directory to ensure rename is durable
651+ std::string dirPath = path;
652+ auto lastSlash = dirPath.rfind (' /' );
653+ if (lastSlash != std::string::npos)
654+ {
655+ dirPath.resize (lastSlash);
656+ int dirFd = open (dirPath.c_str (), O_RDONLY);
657+ if (dirFd >= 0 )
658+ {
659+ durable_fsync (dirFd);
660+ close (dirFd);
661+ }
662+ }
663+
542664 return jsi::Value::undefined ();
543665 });
544666
@@ -555,7 +677,10 @@ namespace turso
555677
556678 void invalidate ()
557679 {
558- // Cleanup if needed
680+ std::lock_guard<std::mutex> lock (g_loggerMutex);
681+ g_loggerFn.reset ();
682+ g_callInvoker.reset ();
683+ g_runtime = nullptr ;
559684 }
560685
561686} // namespace turso
0 commit comments