55 * Proprietary and confidential
66 */
77
8+ const concurrentSafeWrite = require ( '../sqliteUtils/concurrentSafeWrite' ) ;
89const SQLite3 = require ( 'better-sqlite3' ) ;
910const { Readable } = require ( 'stream' ) ;
1011
@@ -22,8 +23,6 @@ const tables = {
2223
2324const ALL_EVENTS_TAG = events . ALL_EVENTS_TAG ;
2425
25- const WAIT_LIST_MS = [ 1 , 2 , 5 , 10 , 15 , 20 , 25 , 25 , 25 , 50 , 50 , 100 ] ;
26-
2726/**
2827 * TODO: refactor the structure of tables and queries
2928 * (currently not consistent, either internally or with the Mongo code)
@@ -54,10 +53,7 @@ class UserDatabase {
5453 }
5554
5655 async init ( ) {
57- this . db . pragma ( 'journal_mode = WAL' ) ;
58- this . db . pragma ( 'busy_timeout = 0' ) ; // We take care of busy timeout ourselves as long as current driver does not go below the second
59- this . db . unsafeMode ( true ) ;
60-
56+ await concurrentSafeWrite . initWALAndConcurrentSafeWriteCapabilities ( this . db ) ;
6157 // here we might want to skip DB initialization if version is not null
6258
6359 this . create = { } ;
@@ -66,7 +62,7 @@ class UserDatabase {
6662 this . delete = { } ;
6763
6864 // create all tables
69- Object . keys ( tables ) . forEach ( ( tableName ) => {
65+ for ( const tableName of Object . keys ( tables ) ) {
7066 const columnsTypes = [ ] ;
7167 const indexes = [ ] ;
7268 const columnNames = Object . keys ( tables [ tableName ] ) ;
@@ -76,20 +72,24 @@ class UserDatabase {
7672 if ( column . index ) { indexes . push ( columnName ) ; }
7773 } ) ;
7874
79- this . db . prepare ( 'CREATE TABLE IF NOT EXISTS events ( ' +
80- columnsTypes . join ( ', ' ) +
81- ');' ) . run ( ) ;
82-
83- indexes . forEach ( ( columnName ) => {
84- this . db . prepare ( `CREATE INDEX IF NOT EXISTS ${ tableName } _${ columnName } ON ${ tableName } (${ columnName } )` ) . run ( ) ;
75+ await concurrentSafeWrite . execute ( ( ) => {
76+ this . db . prepare ( 'CREATE TABLE IF NOT EXISTS events ( ' +
77+ columnsTypes . join ( ', ' ) +
78+ ');' ) . run ( ) ;
8579 } ) ;
8680
81+ for ( const columnName of indexes ) {
82+ await concurrentSafeWrite . execute ( ( ) => {
83+ this . db . prepare ( `CREATE INDEX IF NOT EXISTS ${ tableName } _${ columnName } ON ${ tableName } (${ columnName } )` ) . run ( ) ;
84+ } ) ;
85+ }
86+
8787 this . create [ tableName ] = this . db . prepare ( `INSERT INTO ${ tableName } (` +
8888 columnNames . join ( ', ' ) + ') VALUES (@' +
8989 columnNames . join ( ', @' ) + ')' ) ;
9090
9191 this . getAll [ tableName ] = this . db . prepare ( `SELECT * FROM ${ tableName } ` ) ;
92- } ) ;
92+ }
9393
9494 // -- create FTS for streamIds on events
9595 createFTSFor ( this . db , 'events' , tables . events , [ 'streamIds' ] ) ;
@@ -114,9 +114,9 @@ class UserDatabase {
114114 eventForDb . eventid = eventId ;
115115 const update = this . db . prepare ( queryString ) ;
116116
117- await this . concurentSafeWriteStatement ( ( ) => {
117+ await concurrentSafeWrite . execute ( ( ) => {
118118 const res = update . run ( eventForDb ) ;
119- this . logger . debug ( ' UPDATE events changes:' + res . changes + ' eventId:' + eventId + ' event:' + JSON . stringify ( eventForDb ) ) ;
119+ this . logger . debug ( ` UPDATE events changes: ${ res . changes } eventId: ${ eventId } event: ${ JSON . stringify ( eventForDb ) } ` ) ;
120120 if ( res . changes !== 1 ) {
121121 throw new Error ( 'Event not found' ) ;
122122 }
@@ -132,14 +132,14 @@ class UserDatabase {
132132 */
133133 createEventSync ( event ) {
134134 const eventForDb = eventSchemas . eventToDB ( event ) ;
135- this . logger . debug ( ' (sync) CREATE event:' + JSON . stringify ( eventForDb ) ) ;
135+ this . logger . debug ( ` (sync) CREATE event: ${ JSON . stringify ( eventForDb ) } ` ) ;
136136 this . create . events . run ( eventForDb ) ;
137137 }
138138
139139 async createEvent ( event ) {
140140 const eventForDb = eventSchemas . eventToDB ( event ) ;
141- await this . concurentSafeWriteStatement ( ( ) => {
142- this . logger . debug ( '(async) CREATE event:' + JSON . stringify ( eventForDb ) ) ;
141+ this . logger . debug ( `(async) CREATE event: ${ JSON . stringify ( eventForDb ) } ` ) ;
142+ await concurrentSafeWrite . execute ( ( ) => {
143143 this . create . events . run ( eventForDb ) ;
144144 } ) ;
145145 }
@@ -153,47 +153,47 @@ class UserDatabase {
153153 }
154154
155155 async deleteEventsHistory ( eventId ) {
156- await this . concurentSafeWriteStatement ( ( ) => {
157- this . logger . debug ( '(async) DELETE event history for eventId:' + eventId ) ;
156+ this . logger . debug ( `(async) DELETE event history for eventId: ${ eventId } ` ) ;
157+ await concurrentSafeWrite . execute ( ( ) => {
158158 return this . delete . eventsByHeadId . run ( eventId ) ;
159159 } ) ;
160160 }
161161
162162 async minimizeEventHistory ( eventId , fieldsToRemove ) {
163163 const minimizeHistoryStatement = `UPDATE events SET ${ fieldsToRemove . map ( field => `${ field } = ${ field === 'streamIds' ? '\'' + ALL_EVENTS_TAG + '\'' : 'NULL' } ` ) . join ( ', ' ) } WHERE headId = ?` ;
164- await this . concurentSafeWriteStatement ( ( ) => {
165- this . logger . debug ( '(async) Minimize event history :' + minimizeHistoryStatement ) ;
164+ this . logger . debug ( `(async) Minimize event history: ${ minimizeHistoryStatement } ` ) ;
165+ await concurrentSafeWrite . execute ( ( ) => {
166166 this . db . prepare ( minimizeHistoryStatement ) . run ( eventId ) ;
167167 } ) ;
168168 }
169169
170170 async deleteEvents ( params ) {
171171 const queryString = prepareEventsDeleteQuery ( params ) ;
172172 if ( queryString . indexOf ( 'MATCH' ) > 0 ) {
173- this . logger . debug ( ' DELETE events one by one as queryString includes MATCH: ' + queryString ) ;
173+ this . logger . debug ( ` DELETE events one by one as queryString includes MATCH: ${ queryString } ` ) ;
174174 // SQLite does not know how to delete with "MATCH" statement
175175 // going by the doddgy task of getting events that matches the query and deleting them one by one
176176 const selectEventsToBeDeleted = prepareEventsGetQuery ( params ) ;
177177
178178 for ( const event of this . db . prepare ( selectEventsToBeDeleted ) . iterate ( ) ) {
179- await this . concurentSafeWriteStatement ( ( ) => {
180- this . logger . debug ( ' > DELETE event: ' + event . eventid ) ;
179+ this . logger . debug ( ` > DELETE event: ${ event . eventid } ` ) ;
180+ await concurrentSafeWrite . execute ( ( ) => {
181181 this . delete . eventById . run ( event . eventid ) ;
182182 } ) ;
183183 }
184184 return null ;
185185 }
186186 // else
187187 let res = null ;
188- await this . concurentSafeWriteStatement ( ( ) => {
189- this . logger . debug ( 'DELETE events: ' + queryString ) ;
188+ this . logger . debug ( `DELETE events: ${ queryString } ` ) ;
189+ await concurrentSafeWrite . execute ( ( ) => {
190190 res = this . db . prepare ( queryString ) . run ( ) ;
191191 } ) ;
192192 return res ;
193193 }
194194
195195 getOneEvent ( eventId ) {
196- this . logger . debug ( ' GET ONE event: ' + eventId ) ;
196+ this . logger . debug ( ` GET ONE event: ${ eventId } ` ) ;
197197 const event = this . get . eventById . get ( eventId ) ;
198198 if ( event == null ) return null ;
199199 return eventSchemas . eventFromDB ( event ) ;
@@ -202,7 +202,7 @@ class UserDatabase {
202202 getEvents ( params ) {
203203 const queryString = prepareEventsGetQuery ( params ) ;
204204
205- this . logger . debug ( ' GET Events:' + queryString ) ;
205+ this . logger . debug ( ` GET Events: ${ queryString } ` ) ;
206206 const res = this . db . prepare ( queryString ) . all ( ) ;
207207 if ( res != null ) {
208208 return res . map ( eventSchemas . eventFromDB ) ;
@@ -212,18 +212,18 @@ class UserDatabase {
212212
213213 getEventsStream ( params ) {
214214 const queryString = prepareEventsGetQuery ( params ) ;
215- this . logger . debug ( ' GET Events Stream: ' + queryString ) ;
215+ this . logger . debug ( ` GET Events Stream: ${ queryString } ` ) ;
216216 const query = this . db . prepare ( queryString ) ;
217217 return this . readableEventsStreamForIterator ( query . iterate ( ) ) ;
218218 }
219219
220220 getEventsDeletionsStream ( deletedSince ) {
221- this . logger . debug ( ' GET Events Deletions since: ' + deletedSince ) ;
221+ this . logger . debug ( ` GET Events Deletions since: ${ deletedSince } ` ) ;
222222 return this . readableEventsStreamForIterator ( this . get . eventsDeletedSince . iterate ( deletedSince ) ) ;
223223 }
224224
225225 getEventsHistory ( eventId ) {
226- this . logger . debug ( ' GET Events History for: ' + eventId ) ;
226+ this . logger . debug ( ` GET Events History for: ${ eventId } ` ) ;
227227 return this . get . eventHistory . all ( eventId ) . map ( eventSchemas . historyEventFromDB ) ;
228228 }
229229
@@ -254,31 +254,10 @@ class UserDatabase {
254254 close ( ) {
255255 this . db . close ( ) ;
256256 }
257-
258- /**
259- * Will look "retries" times, in case of "SQLITE_BUSY".
260- * This is CPU intensive, but tests have shown this solution to be efficient
261- */
262- async concurentSafeWriteStatement ( statement , retries = 100 ) {
263- for ( let i = 0 ; i < retries ; i ++ ) {
264- try {
265- statement ( ) ;
266- return ;
267- } catch ( error ) {
268- if ( error . code !== 'SQLITE_BUSY' ) { // ignore
269- throw error ;
270- }
271- const waitTime = i > ( WAIT_LIST_MS . length - 1 ) ? 100 : WAIT_LIST_MS [ i ] ;
272- await new Promise ( ( resolve ) => setTimeout ( resolve , waitTime ) ) ;
273- this . logger . debug ( 'SQLITE_BUSY, retrying in ' + waitTime + 'ms' ) ;
274- }
275- }
276- throw new Error ( 'Failed write action on Audit after ' + retries + ' retries' ) ;
277- }
278257}
279258
280259function prepareEventsDeleteQuery ( params ) {
281- if ( params . streams ) { throw new Error ( 'events DELETE with stream query not supported yet: ' + JSON . stringify ( params ) ) ; }
260+ if ( params . streams ) { throw new Error ( `Events DELETE with stream query not supported yet: ${ JSON . stringify ( params ) } ` ) ; }
282261 return 'DELETE FROM events ' + prepareQuery ( params , true ) ;
283262}
284263
0 commit comments