22import * as util from '../content/shared.js' ;
33
44// TODO: App-token for 'auth'?
5+ // TODO: More of this could be handled in SQL.
56
6- // Samples are consolidated after they are this age.
7- const DEF_CONSOLIDATE_AGE = 0.5 ;
7+ // Samples are consolidated after they are this age in days .
8+ const DEF_CONSOLIDATE_AGE = 1 ;
89
910// Only the N-newest samples are kept so that
1011// recent samples can eventually flip a coverage tile.
@@ -30,29 +31,27 @@ function consolidateSamples(samples, cutoffTime) {
3031 // Build the uber sample.
3132 samples . forEach ( s => {
3233 // Was this sample handled in a previous batch?
33- if ( s . metadata . time <= cutoffTime )
34+ if ( s . time <= cutoffTime )
3435 return ;
3536
36- const path = s . metadata . path ?? [ ] ;
37- const observed = s . metadata . observed ?? path . length > 0 ;
37+ uberSample . time = Math . max ( s . time , uberSample . time ) ;
38+ uberSample . snr = util . definedOr ( Math . max , s . snr , uberSample . snr ) ;
39+ uberSample . rssi = util . definedOr ( Math . max , s . rssi , uberSample . rssi ) ;
3840
39- uberSample . time = Math . max ( s . metadata . time , uberSample . time ) ;
40- uberSample . snr = util . definedOr ( Math . max , s . metadata . snr , uberSample . snr ) ;
41- uberSample . rssi = util . definedOr ( Math . max , s . metadata . rssi , uberSample . rssi ) ;
42-
43- if ( observed ) {
41+ if ( s . observed ) {
4442 uberSample . observed ++ ;
45- uberSample . lastObserved = Math . max ( s . metadata . time , uberSample . lastObserved ) ;
43+ uberSample . lastObserved = Math . max ( s . time , uberSample . lastObserved ) ;
4644 }
4745
48- if ( path . length > 0 ) {
46+ if ( s . observed || s . repeaters . length > 0 ) {
4947 uberSample . heard ++ ;
50- uberSample . lastHeard = Math . max ( s . metadata . time , uberSample . lastHeard ) ;
48+ uberSample . lastHeard = Math . max ( s . time , uberSample . lastHeard ) ;
5149 } else {
5250 uberSample . lost ++ ;
5351 }
5452
55- path . forEach ( p => {
53+ const repeaters = JSON . parse ( s . repeaters ) ;
54+ repeaters . forEach ( p => {
5655 if ( ! uberSample . repeaters . includes ( p ) )
5756 uberSample . repeaters . push ( p ) ;
5857 } ) ;
@@ -108,7 +107,7 @@ async function mergeCoverage(key, samples, store) {
108107 // Sort and keep the N-newest.
109108 value = value . toSorted ( ( a , b ) => a . time - b . time ) . slice ( - MAX_SAMPLES_PER_COVERAGE ) ;
110109 }
111-
110+
112111 // Compute new metadata stats, but keep the existing repeater list (for now).
113112 const metadata = {
114113 observed : 0 ,
@@ -139,52 +138,47 @@ async function mergeCoverage(key, samples, store) {
139138
140139export async function onRequest ( context ) {
141140 const coverageStore = context . env . COVERAGE ;
142- const sampleStore = context . env . SAMPLES ;
143- const archiveStore = context . env . ARCHIVE ;
144141
145142 const url = new URL ( context . request . url ) ;
146143 let maxAge = url . searchParams . get ( 'maxAge' ) ?? DEF_CONSOLIDATE_AGE ; // Days
147144 if ( maxAge <= 0 )
148145 maxAge = DEF_CONSOLIDATE_AGE ;
149146
150147 const result = {
151- coverage_entites_to_update : 0 ,
148+ coverage_to_update : 0 ,
152149 samples_to_update : 0 ,
153150 merged_ok : 0 ,
154151 merged_fail : 0 ,
155- archive_ok : 0 ,
156- archive_fail : 0 ,
157- delete_ok : 0 ,
158- delete_fail : 0 ,
159- delete_skip : 0
152+ merged_skip : 0 ,
160153 } ;
154+ const now = Date . now ( ) ;
161155 const hashToSamples = new Map ( ) ;
162- let cursor = null ;
163-
164- // Build index of old samples.
165- do {
166- const samplesList = await sampleStore . list ( { cursor : cursor } ) ;
167- cursor = samplesList . cursor ?? null ;
168-
169- // Group samples by 6-digit hash
170- samplesList . keys . forEach ( s => {
171- // Ignore recent samples.
172- if ( util . ageInDays ( s . metadata . time ) < maxAge ) return ;
173-
174- result . samples_to_update ++ ;
175- const key = s . name . substring ( 0 , 6 ) ;
176- util . pushMap ( hashToSamples , key , {
177- key : s . name ,
178- metadata : s . metadata
179- } ) ;
180- } ) ;
181- } while ( cursor !== null ) ;
182156
183- result . coverage_entites_to_update = hashToSamples . size
157+ // Get old samples.
158+ const { results : samples } = await context . env . DB
159+ . prepare ( "SELECT * FROM samples WHERE time < ?" )
160+ . bind ( now - ( maxAge * util . dayInMillis ) )
161+ . all ( ) ;
162+ console . log ( `Old samples:${ samples . length } ` ) ;
163+ result . samples_to_update = samples . length ;
164+
165+ // Build index of old samples - group by 6-digit hash.
166+ samples . forEach ( s => {
167+ const key = s . hash . substring ( 0 , 6 ) ;
168+ util . pushMap ( hashToSamples , key , s ) ;
169+ } ) ;
170+ console . log ( `Coverage to update:${ hashToSamples . size } ` ) ;
171+ result . coverage_to_update = hashToSamples . size
172+
184173 const mergedKeys = [ ] ;
174+ let mergeCount = 0 ;
185175
186176 // Merge old samples into coverage items.
187- await Promise . all ( hashToSamples . entries ( ) . map ( async ( [ k , v ] ) => {
177+ for ( const [ k , v ] of hashToSamples . entries ( ) ) {
178+ // To prevent hitting KV limits, only handle first N.
179+ if ( ++ mergeCount > 500 )
180+ break ;
181+
188182 try {
189183 await mergeCoverage ( k , v , coverageStore ) ;
190184 result . merged_ok ++ ;
@@ -193,31 +187,24 @@ export async function onRequest(context) {
193187 console . log ( `Merge failed. ${ e } ` ) ;
194188 result . merged_fail ++ ;
195189 }
196- } ) ) ;
190+ }
191+ result . merged_skip = hashToSamples . size - ( result . merged_ok + result . merged_fail ) ;
197192
198193 // Archive and delete the old samples.
199- await Promise . all ( mergedKeys . map ( async k => {
194+ const cleanupStmts = [ ] ;
195+ mergedKeys . forEach ( k => {
200196 const v = hashToSamples . get ( k ) ;
201197 for ( const sample of v ) {
202- try {
203- await archiveStore . put ( sample . key , "" , {
204- metadata : sample . metadata
205- } ) ;
206- result . archive_ok ++ ;
207- try {
208- await sampleStore . delete ( sample . key ) ;
209- result . delete_ok ++ ;
210- } catch ( e ) {
211- console . log ( `Delete failed. ${ e } ` ) ;
212- result . delete_fail ++ ;
213- }
214- } catch ( e ) {
215- console . log ( `Archive failed. ${ e } ` ) ;
216- result . archive_fail ++ ;
217- result . delete_skip ++ ;
218- }
198+ cleanupStmts . push ( context . env . DB
199+ . prepare ( "INSERT INTO sample_archive (time, data) VALUES (?, ?)" )
200+ . bind ( now , JSON . stringify ( sample ) ) ) ;
201+ cleanupStmts . push ( context . env . DB
202+ . prepare ( "DELETE FROM samples WHERE hash == ?" )
203+ . bind ( sample . hash ) ) ;
219204 }
220- } ) ) ;
205+ } ) ;
206+ if ( cleanupStmts . length > 0 )
207+ await context . env . DB . batch ( cleanupStmts ) ;
221208
222209 return Response . json ( result ) ;
223210}
0 commit comments