Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions stigma/cellindex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package stigma

import (
"errors"
"math"
)

// CellIndex maps lat/lon coordinates to deterministic CellIDs.
type CellIndex struct{}

// NewCellIndex constructs a CellIndex.
func NewCellIndex() *CellIndex {
return &CellIndex{}
}

// CellForCoord converts a coordinate into a CellID for the given resolution.
// This is a placeholder grid; the API mirrors an H3-based implementation.
func (c *CellIndex) CellForCoord(lat, lon float64, res Resolution) (CellID, error) {
if math.IsNaN(lat) || math.IsNaN(lon) || math.IsInf(lat, 0) || math.IsInf(lon, 0) {
return 0, errors.New("invalid coordinate")
}

factor := math.Pow(2, float64(res)+1) // higher resolution => finer grid
x := int64(math.Floor((lon + 180.0) * factor))
y := int64(math.Floor((lat + 90.0) * factor))
return CellID(uint64(y)<<32 | uint64(x&0xffffffff)), nil
}

// CellsInBBox returns all cells intersecting the bounding box for the resolution.
func (c *CellIndex) CellsInBBox(minLat, minLon, maxLat, maxLon float64, res Resolution) ([]CellID, error) {
if minLat > maxLat || minLon > maxLon {
return nil, errors.New("invalid bounding box")
}

factor := math.Pow(2, float64(res)+1)
minX := int64(math.Floor((minLon + 180.0) * factor))
maxX := int64(math.Floor((maxLon + 180.0) * factor))
minY := int64(math.Floor((minLat + 90.0) * factor))
maxY := int64(math.Floor((maxLat + 90.0) * factor))

var cells []CellID
for y := minY; y <= maxY; y++ {
for x := minX; x <= maxX; x++ {
cells = append(cells, CellID(uint64(y)<<32|uint64(x&0xffffffff)))
}
}

return cells, nil
}
190 changes: 190 additions & 0 deletions stigma/engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package stigma

import (
"errors"
"fmt"
"math"
"time"
)

// EngineConfig configures the stigma engine.
type EngineConfig struct {
DefaultResolution Resolution
MinResolution Resolution
MaxResolution Resolution

WALPath string
SnapshotPath string
FlushInterval time.Duration
}

// Engine aggregates ingest, query, and persistence logic.
// Engine is NOT safe for concurrent use by multiple goroutines in v1.
type Engine struct {
cfg EngineConfig
cellIndex *CellIndex
nodes *NodeStore
transitions *TransitionStore
states *EntityStateStore
wal *WAL
}

// NewEngine creates a new Engine and replays any WAL if present.
func NewEngine(cfg EngineConfig) (*Engine, error) {
e := &Engine{
cfg: cfg,
cellIndex: NewCellIndex(),
nodes: NewNodeStore(),
transitions: NewTransitionStore(),
states: NewEntityStateStore(),
}

if cfg.WALPath != "" {
wal, err := OpenWAL(cfg.WALPath)
if err != nil {
return nil, fmt.Errorf("open wal: %w", err)
}
e.wal = wal
if err := e.replayWAL(); err != nil {
return nil, err
}
}

return e, nil
}

// Close flushes and closes any persistence layers.
func (e *Engine) Close() error {
if e.wal != nil {
return e.wal.Close()
}
return nil
}

func (e *Engine) clampResolution(res Resolution) Resolution {
if e.cfg.MinResolution != 0 && res < e.cfg.MinResolution {
return e.cfg.MinResolution
}
if e.cfg.MaxResolution != 0 && res > e.cfg.MaxResolution {
return e.cfg.MaxResolution
}
return res
}

// IngestSample processes a single sample.
func (e *Engine) IngestSample(sample LocationSample) error {
return e.ingest([]LocationSample{sample}, true)
}

func validateSample(s LocationSample) error {
if s.EntityID == "" {
return errors.New("entity id is required")
}
if math.IsNaN(s.Lat) || math.IsNaN(s.Lon) || s.Lat < -90 || s.Lat > 90 || s.Lon < -180 || s.Lon > 180 {
return errors.New("invalid coordinates")
}
if s.Timestamp.IsZero() {
return errors.New("timestamp is required")
}
return nil
}

// IngestBatch processes multiple samples efficiently.
func (e *Engine) IngestBatch(samples []LocationSample) error {
return e.ingest(samples, true)
}

func (e *Engine) ingest(samples []LocationSample, logToWAL bool) error {
for i := range samples {
if err := validateSample(samples[i]); err != nil {
return err
}
}

if logToWAL && e.wal != nil {
if err := e.wal.Append(samples); err != nil {
return err
}
}

res := e.cfg.DefaultResolution
res = e.clampResolution(res)

for i := range samples {
sample := samples[i]
cellID, err := e.cellIndex.CellForCoord(sample.Lat, sample.Lon, res)
if err != nil {
return err
}

e.nodes.Update(cellID, res, sample)

prev, ok := e.states.Get(sample.EntityID)
if ok {
if prev.LastCell != cellID {
delta := sample.Timestamp.Sub(prev.LastTime)
e.transitions.Update(prev.LastCell, cellID, res, delta, sample.Timestamp)
}
}
e.states.Update(sample.EntityID, cellID, sample.Timestamp)
}

return nil
}

// QueryStigmaMap returns aggregates for the query bounds.
func (e *Engine) QueryStigmaMap(q StigmaMapQuery) (StigmaMap, error) {
res := e.cfg.DefaultResolution
if q.Resolution != nil {
res = e.clampResolution(*q.Resolution)
} else {
res = e.clampResolution(res)
}

cells, err := e.cellIndex.CellsInBBox(q.MinLat, q.MinLon, q.MaxLat, q.MaxLon, res)
if err != nil {
return StigmaMap{}, err
}

cellSet := make(map[CellID]struct{}, len(cells))
for _, c := range cells {
cellSet[c] = struct{}{}
}

nodes := e.nodes.TimeFiltered(q.StartTime, q.EndTime)
filteredNodes := make([]NodeAggregate, 0, len(nodes))
for _, n := range nodes {
if _, ok := cellSet[n.CellID]; !ok {
continue
}
if q.MinSampleCount > 0 && n.SampleCount < q.MinSampleCount {
continue
}
filteredNodes = append(filteredNodes, n)
}

result := StigmaMap{Nodes: filteredNodes}
if q.IncludeTransitions {
result.Transitions = e.transitions.FilterByCells(cellSet, q.StartTime, q.EndTime)
}

return result, nil
}

// GetCellStats returns aggregate for a single cell.
func (e *Engine) GetCellStats(cellID CellID) (NodeAggregate, bool) {
return e.nodes.Get(cellID)
}

// GetEntityPathSummary returns visitation summary for an entity.
func (e *Engine) GetEntityPathSummary(entityID string) (EntityPathSummary, bool) {
return e.states.PathSummary(entityID)
}

func (e *Engine) replayWAL() error {
entries, err := e.wal.ReadAll()
if err != nil {
return err
}
return e.ingest(entries, false)
}
119 changes: 119 additions & 0 deletions stigma/engine_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package stigma

import (
"os"
"path/filepath"
"testing"
"time"
)

func TestIngestSingleSampleUpdatesNode(t *testing.T) {
engine, err := NewEngine(EngineConfig{DefaultResolution: 1})
if err != nil {
t.Fatalf("failed to create engine: %v", err)
}
now := time.Now().UTC()
sample := LocationSample{EntityID: "e1", Lat: 1.0, Lon: 1.0, Timestamp: now, SpeedMps: 2.5}
if err := engine.IngestSample(sample); err != nil {
t.Fatalf("ingest sample: %v", err)
}

cell, _ := engine.cellIndex.CellForCoord(sample.Lat, sample.Lon, engine.cfg.DefaultResolution)
agg, ok := engine.GetCellStats(cell)
if !ok {
t.Fatalf("expected cell aggregate present")
}
if agg.SampleCount != 1 || agg.UniqueEntities != 1 {
t.Fatalf("unexpected counts: %+v", agg)
}
if agg.MaxSpeedMps != sample.SpeedMps || agg.AvgSpeedMps == 0 {
t.Fatalf("unexpected speed aggregates: %+v", agg)
}
}

func TestTransitionComputedAcrossCells(t *testing.T) {
engine, _ := NewEngine(EngineConfig{DefaultResolution: 1})
now := time.Now().UTC()
samples := []LocationSample{
{EntityID: "e1", Lat: 0, Lon: 0, Timestamp: now},
{EntityID: "e1", Lat: 10, Lon: 10, Timestamp: now.Add(10 * time.Second)},
}
if err := engine.IngestBatch(samples); err != nil {
t.Fatalf("ingest batch: %v", err)
}

transitions := engine.transitions.All()
if len(transitions) != 1 {
t.Fatalf("expected one transition, got %d", len(transitions))
}
tr := transitions[0]
if tr.TransitionCount != 1 || tr.AvgTravelTime != 10*time.Second || tr.MinTravelTime != 10*time.Second || tr.MaxTravelTime != 10*time.Second {
t.Fatalf("unexpected transition aggregate: %+v", tr)
}
}

func TestQueryStigmaMapFiltersBySampleCount(t *testing.T) {
engine, _ := NewEngine(EngineConfig{DefaultResolution: 1})
now := time.Now().UTC()
_ = engine.IngestBatch([]LocationSample{
{EntityID: "a", Lat: 0, Lon: 0, Timestamp: now},
{EntityID: "a", Lat: 0, Lon: 0, Timestamp: now.Add(time.Second)},
{EntityID: "b", Lat: 1, Lon: 1, Timestamp: now},
})

query := StigmaMapQuery{MinLat: -1, MinLon: -1, MaxLat: 2, MaxLon: 2, MinSampleCount: 2}
result, err := engine.QueryStigmaMap(query)
if err != nil {
t.Fatalf("query failed: %v", err)
}

if len(result.Nodes) != 1 {
t.Fatalf("expected one node after filtering, got %d", len(result.Nodes))
}
}

func TestWALReplayRestoresAggregates(t *testing.T) {
dir := t.TempDir()
walPath := filepath.Join(dir, "wal.log")

cfg := EngineConfig{DefaultResolution: 1, WALPath: walPath}
engine, err := NewEngine(cfg)
if err != nil {
t.Fatalf("create engine: %v", err)
}
now := time.Now().UTC()
if err := engine.IngestSample(LocationSample{EntityID: "x", Lat: 1, Lon: 1, Timestamp: now}); err != nil {
t.Fatalf("ingest with wal: %v", err)
}
_ = engine.Close()

// reopen and ensure sample is replayed
engine2, err := NewEngine(cfg)
if err != nil {
t.Fatalf("recreate engine: %v", err)
}
defer engine2.Close()

cell, _ := engine2.cellIndex.CellForCoord(1, 1, cfg.DefaultResolution)
if agg, ok := engine2.GetCellStats(cell); !ok || agg.SampleCount != 1 {
t.Fatalf("expected aggregate restored, got %#v", agg)
}

// ensure wal file exists
if _, err := os.Stat(walPath); err != nil {
t.Fatalf("wal file missing: %v", err)
}
}

func TestInvalidSamplesReturnErrors(t *testing.T) {
engine, _ := NewEngine(EngineConfig{DefaultResolution: 1})
err := engine.IngestSample(LocationSample{})
if err == nil {
t.Fatalf("expected validation error")
}

err = engine.IngestSample(LocationSample{EntityID: "a", Lat: -91, Lon: 0, Timestamp: time.Now()})
if err == nil {
t.Fatalf("expected invalid coordinate error")
}
}
Loading