-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathengine.go
More file actions
190 lines (162 loc) · 4.43 KB
/
engine.go
File metadata and controls
190 lines (162 loc) · 4.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
package stigma
import (
"errors"
"fmt"
"math"
"time"
)
// EngineConfig configures the stigma engine.
//
// It is passed by value to NewEngine, which stores it on the Engine.
type EngineConfig struct {
// DefaultResolution is the resolution used for ingest, and for queries that
// do not specify one. It is clamped to the Min/Max bounds before use.
DefaultResolution Resolution
// MinResolution is the lower clamp bound. 0 disables the lower bound.
MinResolution Resolution
// MaxResolution is the upper clamp bound. 0 disables the upper bound.
MaxResolution Resolution
// WALPath is the write-ahead log location. When non-empty, NewEngine opens
// the WAL there and replays it; when empty, the engine runs without a WAL.
WALPath string
// SnapshotPath is not referenced by the engine code in this file.
// NOTE(review): presumably consumed by snapshot logic elsewhere — confirm.
SnapshotPath string
// FlushInterval is not referenced by the engine code in this file.
// NOTE(review): presumably a periodic flush period — confirm.
FlushInterval time.Duration
}
// Engine aggregates ingest, query, and persistence logic.
// Engine is NOT safe for concurrent use by multiple goroutines in v1.
//
// Construct one with NewEngine and release it with Close.
type Engine struct {
// cfg is the configuration supplied to NewEngine.
cfg EngineConfig
// cellIndex maps coordinates and bounding boxes to cell IDs.
cellIndex *CellIndex
// nodes holds per-cell aggregates; it is updated for every ingested sample.
nodes *NodeStore
// transitions holds cell-to-cell aggregates; it is updated when an entity's
// cell differs from the cell of its previous sample.
transitions *TransitionStore
// states tracks, per entity, the cell and timestamp of its latest sample.
states *EntityStateStore
// wal is the write-ahead log; it stays nil when EngineConfig.WALPath is empty.
wal *WAL
}
// NewEngine creates a new Engine and replays any WAL if present.
//
// The in-memory stores always start empty. When cfg.WALPath is non-empty the
// WAL at that path is opened and its entries are re-ingested so the stores
// reflect previously logged samples; when it is empty the engine runs without
// a WAL.
//
// It returns an error if the WAL cannot be opened or replayed. On a replay
// failure the WAL is closed before returning, so a failed construction does
// not leak the handle.
func NewEngine(cfg EngineConfig) (*Engine, error) {
	e := &Engine{
		cfg:         cfg,
		cellIndex:   NewCellIndex(),
		nodes:       NewNodeStore(),
		transitions: NewTransitionStore(),
		states:      NewEntityStateStore(),
	}
	if cfg.WALPath == "" {
		return e, nil
	}

	wal, err := OpenWAL(cfg.WALPath)
	if err != nil {
		return nil, fmt.Errorf("open wal: %w", err)
	}
	e.wal = wal

	if err := e.replayWAL(); err != nil {
		// The caller never receives the engine, so nothing else can close the
		// WAL. Closing is best-effort: the replay error is the one worth
		// reporting, so a secondary close error is deliberately dropped.
		_ = wal.Close()
		return nil, fmt.Errorf("replay wal: %w", err)
	}
	return e, nil
}
// Close releases the engine's persistence resources.
//
// It closes the WAL when one is attached and returns that result. An engine
// without a WAL has nothing to release, so Close returns nil.
func (e *Engine) Close() error {
	if e.wal == nil {
		return nil
	}
	return e.wal.Close()
}
// clampResolution restricts res to the configured resolution bounds.
//
// A bound of 0 is treated as unset and is not applied. The lower bound is
// checked first, so it takes precedence if the two bounds are configured so
// that they cross.
func (e *Engine) clampResolution(res Resolution) Resolution {
	lo, hi := e.cfg.MinResolution, e.cfg.MaxResolution
	switch {
	case lo != 0 && res < lo:
		return lo
	case hi != 0 && res > hi:
		return hi
	default:
		return res
	}
}
// IngestSample ingests one location sample.
//
// The sample is handled as a batch of one and is logged to the WAL when one
// is attached. Any validation, WAL, or cell lookup error is returned.
func (e *Engine) IngestSample(sample LocationSample) error {
	single := []LocationSample{sample}
	return e.ingest(single, true)
}
// validateSample checks that s carries the minimum data needed for ingest.
//
// It returns an error when the entity ID is empty, when either coordinate is
// NaN or outside latitude [-90, 90] / longitude [-180, 180], or when the
// timestamp is the zero value. The checks run in that order and only the
// first failure is reported. It returns nil for an acceptable sample.
func validateSample(s LocationSample) error {
	// NaN fails every comparison, so it is ruled out explicitly.
	latOK := !math.IsNaN(s.Lat) && s.Lat >= -90 && s.Lat <= 90
	lonOK := !math.IsNaN(s.Lon) && s.Lon >= -180 && s.Lon <= 180

	switch {
	case s.EntityID == "":
		return errors.New("entity id is required")
	case !latOK || !lonOK:
		return errors.New("invalid coordinates")
	case s.Timestamp.IsZero():
		return errors.New("timestamp is required")
	}
	return nil
}
// IngestBatch ingests a slice of location samples as one batch.
//
// Every sample is validated before anything is logged or applied, so a batch
// containing an invalid sample is rejected as a whole. The batch is logged to
// the WAL when one is attached.
func (e *Engine) IngestBatch(samples []LocationSample) error {
	const logToWAL = true
	return e.ingest(samples, logToWAL)
}
// ingest validates, optionally logs, and applies a batch of samples.
//
// The work is staged so that a batch is either rejected untouched or applied
// in full. First every sample is validated; then every sample is resolved to
// a cell at the clamped default resolution; then the batch is appended to the
// WAL; finally the node, transition, and entity-state stores are updated in
// sample order. Only the first two stages inspect individual samples, and
// both finish before anything is logged or mutated. A batch that cannot be
// indexed is therefore never persisted (which would make every later replay
// fail) and never leaves the stores half-updated.
//
// logToWAL selects whether the batch is appended to the WAL; replay passes
// false so replayed entries are not logged a second time. The append is also
// skipped when the engine has no WAL. An empty batch is a no-op and is not
// logged.
//
// It returns the first validation, cell lookup, or WAL append error.
func (e *Engine) ingest(samples []LocationSample, logToWAL bool) error {
	if len(samples) == 0 {
		return nil
	}
	for i := range samples {
		if err := validateSample(samples[i]); err != nil {
			return err
		}
	}

	res := e.clampResolution(e.cfg.DefaultResolution)

	// Resolve every cell before touching the WAL or the stores: this is the
	// last step that can reject a sample.
	cells := make([]CellID, len(samples))
	for i := range samples {
		cellID, err := e.cellIndex.CellForCoord(samples[i].Lat, samples[i].Lon, res)
		if err != nil {
			return err
		}
		cells[i] = cellID
	}

	if logToWAL && e.wal != nil {
		if err := e.wal.Append(samples); err != nil {
			return err
		}
	}

	for i := range samples {
		sample, cellID := samples[i], cells[i]
		e.nodes.Update(cellID, res, sample)

		// A transition is recorded only when the entity has a previous sample
		// and that sample was in a different cell.
		if prev, ok := e.states.Get(sample.EntityID); ok && prev.LastCell != cellID {
			delta := sample.Timestamp.Sub(prev.LastTime)
			e.transitions.Update(prev.LastCell, cellID, res, delta, sample.Timestamp)
		}
		e.states.Update(sample.EntityID, cellID, sample.Timestamp)
	}
	return nil
}
// QueryStigmaMap returns the node aggregates, and optionally the transitions,
// selected by the query's bounding box and time window.
//
// The query's resolution is used when it is set, otherwise the engine
// default; either value is clamped to the configured bounds. Nodes are kept
// when their cell lies in the bounding box and, if q.MinSampleCount is
// positive, their sample count is at least that threshold. Transitions are
// populated only when q.IncludeTransitions is set.
//
// An error from the bounding-box lookup is returned together with a zero
// StigmaMap.
func (e *Engine) QueryStigmaMap(q StigmaMapQuery) (StigmaMap, error) {
	requested := e.cfg.DefaultResolution
	if q.Resolution != nil {
		requested = *q.Resolution
	}
	res := e.clampResolution(requested)

	bboxCells, err := e.cellIndex.CellsInBBox(q.MinLat, q.MinLon, q.MaxLat, q.MaxLon, res)
	if err != nil {
		return StigmaMap{}, err
	}

	// Set form of the bounding-box cells for constant-time membership tests.
	inBBox := make(map[CellID]struct{}, len(bboxCells))
	for _, id := range bboxCells {
		inBBox[id] = struct{}{}
	}

	candidates := e.nodes.TimeFiltered(q.StartTime, q.EndTime)
	kept := make([]NodeAggregate, 0, len(candidates))
	for _, node := range candidates {
		_, inside := inBBox[node.CellID]
		tooSparse := q.MinSampleCount > 0 && node.SampleCount < q.MinSampleCount
		if inside && !tooSparse {
			kept = append(kept, node)
		}
	}

	out := StigmaMap{Nodes: kept}
	if q.IncludeTransitions {
		out.Transitions = e.transitions.FilterByCells(inBBox, q.StartTime, q.EndTime)
	}
	return out, nil
}
// GetCellStats looks up the aggregate stored for cellID.
//
// Both results come straight from the node store's Get; the boolean
// presumably reports whether an aggregate exists for the cell (confirm
// against NodeStore).
func (e *Engine) GetCellStats(cellID CellID) (NodeAggregate, bool) {
	agg, found := e.nodes.Get(cellID)
	return agg, found
}
// GetEntityPathSummary looks up the path summary kept for entityID.
//
// Both results come straight from the entity state store's PathSummary; the
// boolean presumably reports whether the entity is known (confirm against
// EntityStateStore).
func (e *Engine) GetEntityPathSummary(entityID string) (EntityPathSummary, bool) {
	summary, found := e.states.PathSummary(entityID)
	return summary, found
}
// replayWAL rebuilds in-memory state from the WAL.
//
// Everything read from the WAL is ingested as a single batch with WAL logging
// turned off, so replayed samples are not appended to the log again. A read
// error or an ingest error is returned unchanged.
//
// e.wal is used without a nil check; NewEngine calls this only after it has
// opened the WAL.
func (e *Engine) replayWAL() error {
	logged, readErr := e.wal.ReadAll()
	if readErr != nil {
		return readErr
	}
	const logToWAL = false
	return e.ingest(logged, logToWAL)
}