This document covers the new features added post-MVP:
- Sentinel Integration — Predictive fire spread & triangulation
- WebSocket Control — Bidirectional valve commands
- SQLite Persistence — Durable data storage
Ensure the Sentinel API is running on your network:
# In Sentinel repo
docker compose -f infra/docker/docker-compose.dev.yml up -d
# API available at http://localhost:8000

Set the Sentinel endpoint in your environment:
export SENTINEL_API_URL=http://localhost:8000

When FlowIQ detects anomalies, it automatically queries Sentinel for spread predictions:
curl http://localhost:4000/flow/prediction

Response:
{
"hasPrediction": true,
"prediction": {
"simulation_id": "sim_12345",
"total_area_hectares": 100.0,
"max_spread_rate_mph": 5.5,
"isochrones": [
{
"hours_from_start": 6,
"area_hectares": 50.0,
"perimeter_km": 2.5
}
],
"confidence": {
"overall_confidence": 0.8,
"weather_confidence": 0.9,
"fuel_confidence": 0.7,
"terrain_confidence": 0.8
}
},
"anomalyCount": 3,
"cached": false
}

When multiple cameras detect the same object, call triangulation:
// In your module
import { sentinelClient } from '../sentinel-client';
const results = await sentinelClient.triangulate({
observations: [
{
device_id: "cam-101",
timestamp: "2024-01-01T00:00:00Z",
device_latitude: 40.0,
device_longitude: -120.0,
device_altitude: 1000.0,
camera_heading: 0,
camera_pitch: 0,
bearing: 45,
confidence: 0.9,
detection_id: "det_001"
}
]
});

Sentinel prediction results are emitted on SSE:
curl http://localhost:4000/events
# Listen for:
event: anomaly.detected
data: {"type":"anomaly.detected","assetId":"flow-system","anomalyType":"spread_prediction","confidence":0.8,"at":1701000000000}

const ws = new WebSocket("ws://localhost:4000/control");
ws.onopen = () => {
console.log("Connected to Plainview control channel");
};
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
if (msg.type === "connected") {
console.log("Features:", msg.features);
// Send commands
}
};

ws.send(JSON.stringify({
type: "valve.actuate",
valveId: "v-101"
}));
// Response:
// {
// "type": "ack",
// "command": { "type": "valve.actuate", "valveId": "v-101" },
// "timestamp": "2024-01-01T00:00:00Z"
// }

After connecting, receive live events:
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
if (msg.type === "event") {
console.log("Event:", msg.data);
// {
// "type": "valve.actuation.completed",
// "valveId": "v-101",
// "torqueNm": 52,
// "completedAt": 1701000000000
// }
}
};

Plainview now uses SQLite with Drizzle ORM for persistent storage.
# Database file: data/plainview.db
npm run dev -w @plainview/api
# On startup, schema is auto-created
# ✓ Database initialized successfully

# Use a custom database location
export DATABASE_PATH=/path/to/plainview.db
npm run dev -w @plainview/api

All domain entities are persisted:
- valves — equipment with health metrics
- valve_actuations — historical actuation records
- detections — camera/sensor observations
- alerts — system alerts with status
- incidents — correlated events with timeline
- flow_metrics — time-series flow data
- anomalies — detected deviations
- leak_detections — pipeline leaks with history
import { eq, desc } from 'drizzle-orm';
// Assumes './db' also exports the incidents and flowMetrics tables
import { db, valves, incidents, flowMetrics } from './db';
// Get valve by ID
const valve = await db.query.valves.findFirst({
  where: eq(valves.id, "v-101")
});
// List all active incidents
const activeIncidents = await db.query.incidents.findMany({
  where: eq(incidents.status, "active")
});
// Get the 100 most recent metrics, newest first
const metrics = await db.query.flowMetrics.findMany({
  orderBy: desc(flowMetrics.createdAt),
  limit: 100
});

Data is automatically persisted when events occur:
- Valve actuation → saved to valve_actuations
- Detection generated → saved to detections
- Alert created → saved to alerts
- Incident correlated → saved to incidents
Dashboard queries fetch from DB for historical analysis.
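The event-to-table persistence described above can be pictured with a small sketch. Everything here is an assumption made for illustration: the `persistEvent` helper, the mapping from event types to tables, and the in-memory `store` that stands in for Drizzle inserts. It is not Plainview's actual persistence code.

```typescript
// Hypothetical sketch: route domain events to the table that stores them.
// The in-memory "store" stands in for real database inserts.

type TableName = "valve_actuations" | "detections" | "alerts" | "incidents";

interface DomainEvent {
  type: string;
  [field: string]: unknown;
}

// Assumed mapping from event type to destination table.
const EVENT_TABLE = new Map<string, TableName>([
  ["valve.actuation.completed", "valve_actuations"],
  ["detection.made", "detections"],
  ["alert.created", "alerts"],
  ["incident.created", "incidents"],
]);

// One row array per table, in place of SQLite.
const store: Record<TableName, DomainEvent[]> = {
  valve_actuations: [],
  detections: [],
  alerts: [],
  incidents: [],
};

// Persist an event if its type maps to a table.
// Returns the table name, or null when the event is not persisted.
function persistEvent(event: DomainEvent): TableName | null {
  const table = EVENT_TABLE.get(event.type);
  if (table === undefined) return null;
  store[table].push(event);
  return table;
}
```

Calling `persistEvent({ type: "alert.created", id: "a-1" })` appends the event to `store.alerts`; event types without a mapping are ignored rather than treated as errors.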
┌─────────────────────────────────────────────────────┐
│               Plainview API (Fastify)               │
├─────────────────────────────────────────────────────┤
│                                                     │
│   ┌─────────────────────────────────────────────┐   │
│   │               Domain Modules                │   │
│   │  ┌──────────┐  ┌──────────┐  ┌──────────┐   │   │
│   │  │ ValveOps │  │PipelineG │  │  FlowIQ  │   │   │
│   │  └──────────┘  └──────────┘  └──────────┘   │   │
│   └─────────────────────────────────────────────┘   │
│                          ↓                          │
│   ┌─────────────────────────────────────────────┐   │
│   │         Event Bus (SSE + WebSocket)         │   │
│   └─────────────────────────────────────────────┘   │
│              ↙                       ↘              │
│     ┌─────────────────┐     ┌─────────────────┐     │
│     │   SSE /events   │     │   WS /control   │     │
│     │     (read)      │     │ (bidirectional) │     │
│     └─────────────────┘     └─────────────────┘     │
│                                                     │
│   ┌─────────────────────────────────────────────┐   │
│   │ Sentinel Client (Prediction/Triangulation)  │   │
│   │               ↓ HTTP calls ↓                │   │
│   │            Sentinel API (:8000)             │   │
│   └─────────────────────────────────────────────┘   │
│                                                     │
│   ┌─────────────────────────────────────────────┐   │
│   │        SQLite DB (data/plainview.db)        │   │
│   │               via Drizzle ORM               │   │
│   └─────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────┘
↓ ↓ ↓
Dashboard Mobile CLI External Integrations
- ROS2 bridge for hardware nodes (Roustabout / Rigsight)
- TimescaleDB connector for high-volume metrics
- Mission replay UI with AI co-pilot
- RBAC authentication (JWT)
- GraphQL API layer
- Multi-site federation
- Edge agent software stack
- Real-time anomaly models (ML)
- Custom webhook/IFTTT actions
If Sentinel is down, prediction requests fail gracefully:
{
"hasPrediction": false,
"reason": "Sentinel unavailable"
}

No production impact; local modules continue normally.
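A minimal sketch of this kind of graceful fallback follows. The `getPrediction` function, the `/predict` path, and the two-second timeout are all assumptions made for illustration; the real Sentinel client may be structured differently. The transport is injected so the behaviour can be exercised without a running Sentinel instance.

```typescript
interface PredictionResult {
  hasPrediction: boolean;
  prediction?: unknown;
  reason?: string;
}

// Minimal fetch-like signature; the global fetch satisfies it.
type Fetcher = (url: string) => Promise<{ ok: boolean; json(): Promise<unknown> }>;

// Reject if the work does not settle within `ms` milliseconds.
async function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error("timeout")), ms);
  });
  try {
    return await Promise.race([work, timeout]);
  } finally {
    clearTimeout(timer); // never leave the timer pending
  }
}

// Any network error, timeout, or non-2xx status collapses into the same
// "Sentinel unavailable" payload, so callers never see an exception.
async function getPrediction(
  baseUrl: string,
  fetcher: Fetcher,
  timeoutMs = 2000,
): Promise<PredictionResult> {
  const unavailable: PredictionResult = {
    hasPrediction: false,
    reason: "Sentinel unavailable",
  };
  try {
    // "/predict" is a hypothetical path used only for this sketch.
    const res = await withTimeout(fetcher(`${baseUrl}/predict`), timeoutMs);
    if (!res.ok) return unavailable;
    return { hasPrediction: true, prediction: await res.json() };
  } catch {
    return unavailable;
  }
}
```

In a Node 18+ process this could be called as `getPrediction(process.env.SENTINEL_API_URL ?? "http://localhost:8000", fetch)`. Returning a value instead of throwing keeps the failure local to the prediction feature.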
Check that firewall rules allow the WebSocket upgrade:
# Test WebSocket handshake
curl -i -N \
-H "Connection: Upgrade" \
-H "Upgrade: websocket" \
-H "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==" \
-H "Sec-WebSocket-Version: 13" \
http://localhost:4000/control

SQLite can lock if multiple processes write simultaneously. Use WAL mode (default):
# Check database
file data/plainview.db*
# data/plainview.db: SQLite 3.x database
# data/plainview.db-shm: shared memory WAL file
# data/plainview.db-wal: write-ahead log

- GET /valves — list all
- GET /valves/:id — detail
- GET /valves/:id/health — health report
- POST /valves/:id/actuate — actuate valve

- GET /pipeline/alerts — active leaks
- GET /pipeline/health — integrity score
- GET /pipeline/sections/:section — section detail
- POST /pipeline/alerts/:id/resolve — resolve leak

- GET /flow/health — current metrics + anomalies
- GET /flow/metrics — history + stats
- GET /flow/anomalies — filtered anomalies
- GET /flow/prediction — Sentinel spread prediction

- GET /rig/cameras — list cameras
- GET /rig/cameras/:id — detail
- GET /rig/detections — historical detections

- GET /incidents — active + recent
- GET /incidents/:id — detail
- POST /incidents/:id/update — update status
- GET /incidents/:id/timeline — chronological view
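For callers that build these URLs in code, a small route helper keeps path parameters escaped consistently. The `routes` object and `buildUrl` function below are hypothetical names for this sketch; only the paths themselves come from the endpoint list above.

```typescript
// Path builders for a few of the endpoints listed above.
// IDs are URL-encoded so unusual characters cannot break the path.
const routes = {
  valves: () => "/valves",
  valve: (id: string) => `/valves/${encodeURIComponent(id)}`,
  valveHealth: (id: string) => `/valves/${encodeURIComponent(id)}/health`,
  actuateValve: (id: string) => `/valves/${encodeURIComponent(id)}/actuate`,
  resolveAlert: (id: string) => `/pipeline/alerts/${encodeURIComponent(id)}/resolve`,
  incidentTimeline: (id: string) => `/incidents/${encodeURIComponent(id)}/timeline`,
};

// Join a base URL and a path without producing a double slash.
function buildUrl(baseUrl: string, path: string): string {
  return baseUrl.replace(/\/+$/, "") + path;
}
```

For example, `buildUrl("http://localhost:4000/", routes.actuateValve("v-101"))` yields the URL to send a POST request to when actuating valve `v-101`.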
Message format:
{
"type": "valve.actuate" | "detection.ack" | "alert.resolve",
"valveId": "v-101",
"...": "other fields"
}

Response:
{
"type": "ack" | "event",
"timestamp": "ISO-8601",
"data": {}
}

Connect: curl http://localhost:4000/events
Event types:
- valve.actuation.requested
- valve.actuation.completed
- alert.created
- alert.acknowledged
- detection.made
- flow.metrics.updated
- anomaly.detected
- incident.created
- incident.updated
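Clients that read the `/events` stream without a browser `EventSource` need to split each frame into its event name and payload. The parser below is a simplified sketch: `parseSseFrame` is a hypothetical helper, it handles only the `event` and `data` fields, and it assumes every payload is JSON, as in the examples in this guide.

```typescript
interface SseEvent {
  event: string;
  data: unknown;
}

// Parse one SSE frame (the lines between two blank lines).
// Returns null when the frame carries no data lines.
function parseSseFrame(frame: string): SseEvent | null {
  let event = "message"; // SSE default when no "event:" line is present
  const dataLines: string[] = [];
  for (const line of frame.split(/\r?\n/)) {
    if (line.charAt(0) === ":") continue; // comment or keep-alive line
    const colon = line.indexOf(":");
    if (colon === -1) continue; // simplified: ignore lines without a colon
    const field = line.slice(0, colon);
    let value = line.slice(colon + 1);
    if (value.charAt(0) === " ") value = value.slice(1); // drop one leading space
    if (field === "event") event = value;
    else if (field === "data") dataLines.push(value);
  }
  if (dataLines.length === 0) return null;
  // Multiple data lines are joined with newlines before decoding.
  return { event, data: JSON.parse(dataLines.join("\n")) };
}
```

A consumer can then switch on `event` (for example `anomaly.detected`) and treat `data` as the corresponding payload after validating its shape.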
To add new features:
- Define schema in src/db/schema.ts
- Create Drizzle operations in module
- Emit events via bus.emit('event', ...)
- Update this guide
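To make the emit step concrete, here is a minimal publish/subscribe sketch. The `EventBus` class and the `pump.started` event are hypothetical and only illustrate the `bus.emit('event', ...)` call shape; Plainview's real bus may expose a different API.

```typescript
interface BusEvent {
  type: string;
  [field: string]: unknown;
}

type Listener = (event: BusEvent) => void;

class EventBus {
  private readonly listeners = new Map<string, Set<Listener>>();

  // Register a listener; the returned function unsubscribes it.
  on(channel: string, listener: Listener): () => void {
    const set = this.listeners.get(channel) ?? new Set<Listener>();
    set.add(listener);
    this.listeners.set(channel, set);
    return () => {
      set.delete(listener);
    };
  }

  // Deliver the event to every listener on the channel.
  // Returns the number of listeners that received it.
  emit(channel: string, event: BusEvent): number {
    const set = this.listeners.get(channel);
    if (!set) return 0;
    set.forEach((listener) => listener(event));
    return set.size;
  }
}

const bus = new EventBus();
```

A new module would call something like `bus.emit("event", { type: "pump.started", pumpId: "p-1" })`, and transports such as SSE or WebSocket would subscribe with `bus.on("event", ...)` to forward it.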
Built with ❤️ for oil & gas autonomy.