@@ -27,17 +27,24 @@ enum DatabaseError: Error {
27
27
}
28
28
}
29
29
30
+
31
+ func subscription_cb( ctx: UnsafeMutableRawPointer ? , subid: UInt64 ) -> Void {
32
+ guard let ctx else { return }
33
+ let ndb = Unmanaged < Ndb > . fromOpaque ( ctx) . takeUnretainedValue ( )
34
+ ndb. sub_cb ? ( subid)
35
+ }
36
+
30
37
class Ndb {
31
38
var ndb : ndb_t
32
39
let path : String ?
33
40
let owns_db : Bool
34
41
var generation : Int
42
+ let sub_cb : ( ( UInt64 ) -> ( ) ) ?
35
43
private var closed : Bool
36
44
37
45
var is_closed : Bool {
38
46
self . closed || self . ndb. ndb == nil
39
47
}
40
-
41
48
static func safemode( ) -> Ndb ? {
42
49
guard let path = db_path ?? old_db_path else { return nil }
43
50
@@ -49,7 +56,8 @@ class Ndb {
49
56
}
50
57
}
51
58
52
- guard let ndb = Ndb ( path: path) else {
59
+ let ndb = Ndb ( path: path)
60
+ guard let _ = ndb. open ( ) else {
53
61
return nil
54
62
}
55
63
@@ -79,14 +87,14 @@ class Ndb {
79
87
print ( " txn: NOSTRDB EMPTY " )
80
88
return Ndb ( ndb: ndb_t ( ndb: nil ) )
81
89
}
82
-
83
- static func open( path : String ? = nil , owns_db_file : Bool = true ) -> ndb_t ? {
90
+
91
+ func open( ) -> ndb_t ? {
84
92
var ndb_p : OpaquePointer ? = nil
85
93
86
94
let ingest_threads : Int32 = 4
87
95
var mapsize : Int = 1024 * 1024 * 1024 * 32
88
96
89
- if path == nil && owns_db_file {
97
+ if path == nil && owns_db {
90
98
// `nil` path indicates the default path will be used.
91
99
// The default path changed over time, so migrate the database to the new location if needed
92
100
do {
@@ -99,7 +107,7 @@ class Ndb {
99
107
}
100
108
101
109
guard let db_path = Self . db_path,
102
- owns_db_file || Self . db_files_exist ( path: db_path) else {
110
+ owns_db || Self . db_files_exist ( path: db_path) else {
103
111
return nil // If the caller claims to not own the DB file, and the DB files do not exist, then we should not initialize Ndb
104
112
}
105
113
@@ -109,8 +117,9 @@ class Ndb {
109
117
110
118
let ok = path. withCString { testdir in
111
119
var ok = false
120
+ let ctx = Unmanaged . passUnretained ( self ) . toOpaque ( )
112
121
while !ok && mapsize > 1024 * 1024 * 700 {
113
- var cfg = ndb_config ( flags: 0 , ingester_threads: ingest_threads, mapsize: mapsize, filter_context: nil , ingest_filter: nil , sub_cb_ctx: nil , sub_cb: nil )
122
+ var cfg = ndb_config ( flags: 0 , ingester_threads: ingest_threads, mapsize: mapsize, filter_context: nil , ingest_filter: nil , sub_cb_ctx: ctx , sub_cb: subscription_cb )
114
123
ok = ndb_init ( & ndb_p, testdir, & cfg) != 0
115
124
if !ok {
116
125
mapsize /= 2
@@ -123,19 +132,17 @@ class Ndb {
123
132
return nil
124
133
}
125
134
126
- return ndb_t ( ndb: ndb_p)
135
+ self . ndb = ndb_t ( ndb: ndb_p)
136
+ return self . ndb
127
137
}
128
138
129
- init ? ( path: String ? = nil , owns_db_file: Bool = true ) {
130
- guard let db = Self . open ( path: path, owns_db_file: owns_db_file) else {
131
- return nil
132
- }
133
-
139
+ init ( path: String ? = nil , owns_db_file: Bool = true , sub_cb: ( ( UInt64 ) -> ( ) ) ? = nil ) {
134
140
self . generation = 0
135
141
self . path = path
136
142
self . owns_db = owns_db_file
137
- self . ndb = db
138
143
self . closed = false
144
+ self . sub_cb = sub_cb
145
+ self . ndb = ndb_t ( )
139
146
}
140
147
141
148
private static func migrate_db_location_if_needed( ) throws {
@@ -181,6 +188,7 @@ class Ndb {
181
188
self . path = nil
182
189
self . owns_db = true
183
190
self . closed = false
191
+ self . sub_cb = nil
184
192
}
185
193
186
194
func close( ) {
@@ -193,8 +201,8 @@ class Ndb {
193
201
}
194
202
195
203
func reopen( ) -> Bool {
196
- guard self . is_closed ,
197
- let db = Self . open ( path : self . path , owns_db_file : self . owns_db ) else {
204
+ let ctx = Unmanaged . passUnretained ( self ) . toOpaque ( )
205
+ guard self . is_closed , let db = self . open ( ) else {
198
206
return false
199
207
}
200
208
@@ -205,6 +213,20 @@ class Ndb {
205
213
return true
206
214
}
207
215
216
+ func poll_for_notes( subid: Int64 , capacity: Int ) -> [ NoteKey ] {
217
+ var buf = Array< UInt64> . init( repeating: 0 , count: capacity)
218
+
219
+ let r = buf. withUnsafeMutableBufferPointer { bytes in
220
+ return ndb_poll_for_notes ( self . ndb. ndb, UInt64 ( subid) , bytes. baseAddress, Int32 ( capacity) )
221
+ }
222
+
223
+ guard r != 0 else {
224
+ return [ ]
225
+ }
226
+
227
+ return Array ( buf. prefix ( Int ( r) ) )
228
+ }
229
+
208
230
func lookup_blocks_by_key_with_txn< Y> ( _ key: NoteKey , txn: NdbTxn < Y > ) -> NdbBlocks ? {
209
231
guard let blocks = ndb_get_blocks_by_key ( self . ndb. ndb, & txn. txn, key) else {
210
232
return nil
0 commit comments