Skip to content

Commit c31a7ed

Browse files
authored
[docs] Add implementation flow details (#68)
#### Type of change - Documentation update #### Description - Add implementation details and flow to the query service. #### Related issues - resolves #59 --------- Signed-off-by: Liran Funaro <liran.funaro@gmail.com>
1 parent 7792455 commit c31a7ed

3 files changed

Lines changed: 314 additions & 0 deletions

File tree

docs/query-service.md

Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ SPDX-License-Identifier: Apache-2.0
2121
- [Batching Strategy](#batching-strategy)
2222
- [Lock-Free Design](#lock-free-design)
2323
5. [Configuration](#5-configuration)
24+
6. [Implementation](#6-implementation)
25+
- [Implementation Flow](#implementation-flow)
26+
- [Lock-Free Design](#lock-free-design)
27+
- [Error Handling and Recovery](#error-handling-and-recovery)
2428

2529
## 1. Overview
2630

@@ -191,3 +195,307 @@ type Config struct {
191195
MaxViewTimeout time.Duration
192196
}
193197
```
198+
199+
## 6. Implementation
200+
201+
The Query Service consists of several key components:
202+
203+
* **Service**: The main entry point that implements the gRPC `QueryServiceServer` interface. It handles incoming requests, manages views, and coordinates query execution.
204+
* **ViewsBatcher**: The component that manages views and batches queries for efficient database access. It uses lock-free data structures to minimize contention and maximize throughput.
205+
* **ViewHolder**: Represents a client's view of the database with specific isolation requirements. Each view has a unique ID and parameters that determine its behavior.
206+
* **Batcher**: Aggregates queries with similar parameters to optimize database access. Multiple views with the same parameters may share a batcher.
207+
* **NamespaceQueryBatch**: Collects keys to be queried within a specific namespace. Once a batch reaches a certain size or age, it is executed as a single database query.
208+
209+
### Implementation Flow
210+
211+
The Query Service operates with the following three stages.
212+
213+
#### Stage 1. View Management
214+
215+
View management provides clients with configurable isolation levels to the database.
216+
This stage is optional as the client can call `GetRows()` with `nil` view.
217+
Using `nil` view does not offer any consistency between future calls to `GetRows()`.
218+
219+
**a. View Creation:**
220+
221+
When a client calls `BeginView()`, the service:
222+
223+
1. Validates the requested timeout, capping it at the configured maximum if necessary
224+
2. Generates a unique UUID for the view
225+
3. Creates a new `viewHolder` with the specified parameters
226+
4. Stores the view in the `viewIDToViewHolder` map
227+
228+
```go
229+
func (q *Service) BeginView(
230+
ctx context.Context, params *protoqueryservice.ViewParameters,
231+
) (*protoqueryservice.View, error) {
232+
// ...
233+
// Validate and cap timeout.
234+
if params.TimeoutMilliseconds == 0 ||
235+
int64(params.TimeoutMilliseconds) > q.config.MaxViewTimeout.Milliseconds() {
236+
params.TimeoutMilliseconds = uint64(q.config.MaxViewTimeout.Milliseconds())
237+
}
238+
239+
// Generate unique view ID and create view.
240+
// We try again if we have view-id collision.
241+
for ctx.Err() == nil {
242+
viewID, err := getUUID()
243+
if err != nil { return nil, err }
244+
if q.batcher.makeView(viewID, params) {
245+
return &protoqueryservice.View{Id: viewID}, nil
246+
}
247+
}
248+
return nil, ctx.Err()
249+
}
250+
```
251+
252+
**b. View Parameters:**
253+
254+
Each view is configured with specific parameters:
255+
256+
1. **Isolation Level**: Determines the consistency guarantees for the view
257+
258+
- Read Uncommitted (0): Lowest isolation, may read uncommitted changes
259+
- Read Committed (1): Only reads committed data
260+
- Repeatable Read (2): Ensures the same data is returned for repeated reads
261+
- Serializable (3): (default) Highest isolation, ensures transactions are completely isolated
262+
263+
2. **Deferred Status**: Controls when isolation level violations are checked
264+
265+
- Non-deferred (false): Checks happen immediately for each operation
266+
- Deferred (true): (default) Checks happen only at transaction commit time
267+
268+
3. **Timeout**: Maximum lifetime for the view
269+
270+
- If omitted, it defaults to the maximum value defined in the service configuration
271+
272+
**c. View Aggregation:**
273+
274+
To optimize resource usage, views with identical parameters created within the `ViewAggregationWindow` are batched together:
275+
276+
1. The service calculates a parameter key based on the isolation level and deferred status
277+
2. It checks if a batcher already exists for these parameters
278+
3. If a batcher exists and is within the aggregation window, the view is associated with it
279+
4. Otherwise, a new batcher is created
280+
281+
**d. View Termination:**
282+
283+
When a client calls `EndView()` or when a view times out:
284+
285+
1. The view is removed from the `viewIDToViewHolder` map
286+
2. If the view was the last reference to its batcher, the batcher is cleaned up
287+
3. Any resources associated with the view are released
288+
289+
#### Stage 2. Query Batching and Execution
290+
291+
Query batching is an optimization that allows the service to minimize database access by combining multiple queries into a single operation.
292+
293+
**a. Query Assignment:**
294+
295+
When a client calls `GetRows()`, the service:
296+
297+
1. Retrieves the view associated with the provided view ID
298+
2. For each namespace in the query:
299+
- Gets or creates a `namespaceQueryBatch` for that namespace
300+
- Adds the requested keys to the batch
301+
- Returns a reference to the batch to the client
302+
303+
```go
304+
func (q *Service) assignRequest(
305+
ctx context.Context, query *protoqueryservice.Query,
306+
) ([]*namespaceQueryBatch, error) {
307+
// ...
308+
batcher, err := q.batcher.getBatcher(ctx, query.View)
309+
if err != nil { return nil, err }
310+
311+
batches := make([]*namespaceQueryBatch, len(query.Namespaces))
312+
for i, ns := range query.Namespaces {
313+
batches[i], err = batcher.addNamespaceKeys(ctx, ns.NsId, ns.Keys)
314+
if err != nil { return nil, err }
315+
}
316+
return batches, nil
317+
}
318+
```
319+
320+
**b. Batch Execution Triggers:**
321+
322+
A batch is executed when either of these conditions is met:
323+
324+
1. The batch reaches `MinBatchKeys` keys
325+
2. The batch has been waiting for `MaxBatchWait` duration
326+
327+
**c. Database Query Execution:**
328+
329+
When a batch is ready for execution:
330+
331+
1. The batcher acquires a database connection from the pool
332+
2. It constructs a SQL query that retrieves all keys in the batch in a single operation
333+
3. The query is executed with the appropriate isolation level
334+
4. Results are stored in the batch for distribution to clients
335+
336+
**d. Concurrent Batch Management:**
337+
338+
Multiple batches can be in different stages simultaneously:
339+
340+
1. Some batches may be collecting keys
341+
2. Others may be waiting to reach the minimum batch size
342+
3. Some may be executing database queries
343+
4. Others may be distributing results to clients
344+
345+
The service uses lock-free data structures to manage this concurrency efficiently.
346+
347+
#### Stage 3. Result Distribution
348+
349+
Once a batch query completes, the results need to be distributed to the clients that requested them.
350+
351+
**a. Result Storage:**
352+
353+
When a batch query completes:
354+
355+
1. The results are parsed from the database response
356+
2. They are stored in a map keyed by the key bytes
357+
3. The batch is marked as finalized
358+
359+
**b. Client Notification:**
360+
361+
Clients waiting for results are notified through a combination of:
362+
363+
1. Mutex-protected state changes
364+
2. Context cancellation for error cases
365+
3. Direct result access for successful queries
366+
367+
**c. Result Retrieval:**
368+
369+
When `waitForRows()` is called on a batch:
370+
371+
1. If the batch is already finalized, results are returned immediately
372+
2. Otherwise, the client waits until the batch is finalized or the context is canceled
373+
3. Once finalized, the client extracts only the results for the keys it requested
374+
375+
```go
376+
func (b *namespaceQueryBatch) waitForRows(ctx context.Context, keys [][]byte) ([]*protoqueryservice.Row, error) {
377+
// Wait for batch to be finalized or context to be canceled.
378+
select {
379+
case <-ctx.Done():
380+
return nil, ctx.Err()
381+
case <-b.ctx.Done():
382+
// Query completed.
383+
}
384+
385+
// ...
386+
387+
// Extract results for requested keys
388+
res := make([]*protoqueryservice.Row, len(keys))
389+
for i, key := range keys {
390+
// Get result for this key from the batch results.
391+
if row, ok := q.result[string(key)]; ok && row != nil {
392+
res = append(res, row)
393+
}
394+
}
395+
return rows, nil
396+
}
397+
```
398+
399+
### Lock-Free Design
400+
401+
The Query Service uses lock-free data structures to minimize contention and maximize throughput.
402+
403+
**a. Concurrent Maps:**
404+
405+
The service uses specialized concurrent maps for:
406+
407+
1. Mapping view IDs to view holders
408+
2. Mapping view parameters to batchers
409+
3. Mapping namespaces to query batches
410+
411+
These maps allow multiple goroutines to access and modify the maps concurrently without blocking.
412+
413+
**b. Update-or-Create Pattern:**
414+
415+
A key pattern used throughout the service is the `mapUpdateOrCreate` function, which:
416+
417+
1. Attempts to load an existing value from a map
418+
2. If found, tries to update it using a provided function
419+
3. If not found or update fails, creates a new value
420+
4. Uses atomic compare-and-swap operations to ensure consistency
421+
422+
```go
423+
func mapUpdateOrCreate[K, V any](
424+
ctx context.Context, m *utils.SyncMap[K, *V], key K, methods updateOrCreate[V],
425+
) (*V, error) {
426+
val, loaded := m.Load(key)
427+
for ctx.Err() == nil {
428+
// If there is a value, and we can update it, then return it.
429+
if loaded && val != nil {
430+
if methods.update(val) {
431+
return val, nil
432+
}
433+
}
434+
435+
// Otherwise, let's try to assign a new value.
436+
newVal := methods.create()
437+
var assigned bool
438+
if !loaded {
439+
val, loaded = m.LoadOrStore(key, newVal)
440+
assigned = !loaded
441+
} else if assigned = m.CompareAndSwap(key, val, newVal); !assigned {
442+
// If the CAS failed, we need to load the new value.
443+
val, loaded = m.Load(key)
444+
}
445+
446+
if assigned {
447+
methods.post(newVal)
448+
val = newVal
449+
loaded = true
450+
}
451+
}
452+
return nil, ctx.Err()
453+
}
454+
```
455+
456+
**c. Minimal Locking:**
457+
458+
When locks are necessary, they are used with minimal scope:
459+
460+
1. Fine-grained locks protect only the specific data being modified
461+
2. Operations that don't modify shared state are performed outside lock sections
462+
3. Long-running operations like database queries are performed without holding locks
463+
464+
### Error Handling and Recovery
465+
466+
The Query Service is designed to handle various error conditions gracefully.
467+
468+
**a. View Timeout Handling:**
469+
470+
Each view has a context with a timeout based on the client's request:
471+
472+
1. When a view times out, its context is canceled
473+
2. All operations associated with the view receive context cancellation errors
474+
3. Resources associated with the view are cleaned up
475+
476+
**b. Database Connection Failures:**
477+
478+
If a database connection fails:
479+
480+
1. The affected batch is marked as failed
481+
2. Clients waiting for results receive an error
482+
3. The service continues processing other batches
483+
4. New connection attempts are made for subsequent operations
484+
485+
**c. Invalid View Handling:**
486+
487+
If a client attempts to use an invalid or expired view:
488+
489+
1. The service returns `ErrInvalidOrStaleView`
490+
2. The client can create a new view and retry the operation
491+
492+
**d. Graceful Shutdown:**
493+
494+
When the service is shutting down:
495+
496+
1. The main context is canceled
497+
2. All view contexts are canceled
498+
3. In-flight operations are allowed to complete or time out
499+
4. Database connections are properly closed
500+
501+
This comprehensive error handling ensures the service remains stable and recovers gracefully from failures.

service/query/batcher.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,10 +421,12 @@ func (q *namespaceQueryBatch) waitForRows(
421421
ctx context.Context,
422422
keys [][]byte,
423423
) ([]*protoqueryservice.Row, error) {
424+
// Wait for batch to be finalized or context to be canceled.
424425
select {
425426
case <-ctx.Done():
426427
return nil, ctx.Err()
427428
case <-q.ctx.Done():
429+
// Query completed.
428430
}
429431

430432
if errors.Is(q.ctx.Err(), context.DeadlineExceeded) {
@@ -435,8 +437,10 @@ func (q *namespaceQueryBatch) waitForRows(
435437
return nil, err
436438
}
437439

440+
// Extract results for requested keys.
438441
res := make([]*protoqueryservice.Row, 0, len(keys))
439442
for _, key := range keys {
443+
// Get result for this key from the batch results.
440444
if row, ok := q.result[string(key)]; ok && row != nil {
441445
res = append(res, row)
442446
}

service/query/query_service.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,13 @@ func (q *Service) BeginView(
8787
q.metrics.requests.WithLabelValues(grpcBeginView).Inc()
8888
defer q.requestLatency(grpcBeginView, time.Now())
8989

90+
// Validate and cap timeout.
9091
if params.TimeoutMilliseconds == 0 ||
9192
int64(params.TimeoutMilliseconds) > q.config.MaxViewTimeout.Milliseconds() { //nolint:gosec
9293
params.TimeoutMilliseconds = uint64(q.config.MaxViewTimeout.Milliseconds()) //nolint:gosec
9394
}
9495

96+
// Generate unique view ID and create view.
9597
// We try again if we have view-id collision.
9698
for ctx.Err() == nil {
9799
viewID, err := getUUID()

0 commit comments

Comments
 (0)