Skip to content

Commit ecc4eb2

Browse files
ingestion: OTel decode/write histograms, per-partition Kafka concurrency, redactJSON early-exit (#412)
* ingestion: OTel histograms for decode + write phase

  Initialises a global MeterProvider in Start.startApp (nested inside the existing TracerProvider bracket so shutdown order is meter→tracer, both flushing pending exports via shutdown*Provider). Picks up the same OTEL_EXPORTER_OTLP_ENDPOINT / headers as traces by default, with the usual per-signal overrides. Adds Pkg.Metrics with two top-level NOINLINE instruments (monoscope.ingest.decode.duration, monoscope.ingest.write.duration, ms) plus a `timed` helper that brackets the action so latency is recorded on exception too — useful for the dual-write where a timeout is still latency we want in the distribution. ProcessMessage.processMessages now records the decode phase manually and wraps insertAndHandOff in `timed`.

* Auto-format code with fourmolu

* ingestion: per-partition Kafka group concurrency

  Group records by (topic, partition) instead of just topic and process groups via pooledForConcurrentlyN with a new kafkaGroupConcurrency knob (default 4). Each tpsFor still yields exactly one TP, so a failed group only stalls its own partition's commits. Bound stays well under the hasql pool (shared with web): the failure mode to avoid is AcquisitionTimeout → Hasql.isTransientException → no commit → redelivery storm. Single committer (the poll thread) — no concurrent commitPartitionsOffsets.

* ingestion: symmetric Metrics.timed for decode + write, redactJSON elem idiom

  Wraps the decode block in Metrics.timed (replacing manual getMonotonicTime + recordMs) so both phases use the same bracket-on-exception path. Drops the GHC.Clock import. redactJSON: switches `any T.null ps` to `"" \`elem\` ps` — equivalent but reads as "have we consumed all path segments?" which matches intent. Removes TODO.md — belongs in PR/issue tracker, not the repo.
* hpack: pick up Pkg.Metrics in test-dev other-modules

  After merging master (which added the test-dev stanza in #411), the auto-generated other-modules list was missing Pkg.Metrics — the stanza was hpack-generated before this branch's new module existed.

* queue: use HashMap consistently for byTP + tpsFor, drop Data.Map import

  The rest of Pkg/Queue.hs uses HM throughout; (Text, Int32) and Int32 keys are both Hashable, so HashMap is a drop-in. tpsFor's group is always single-partition, so the map has size 1 — ordering doesn't matter.

* queue: revert HashMap swap, KP.PartitionId has no Hashable instance

  The previous 79bd1bc assumed crPartition was Int32 (Hashable), but it's KP.PartitionId (Ord-only). Data.Map.Strict was load-bearing, not an inconsistency. Comment added to prevent the next reviewer from trying the same swap.

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 5bac013 commit ecc4eb2

6 files changed

Lines changed: 167 additions & 63 deletions

File tree

monoscope.cabal

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ library
200200
Pkg.ErrorMetrics
201201
Pkg.ExtractionWorker
202202
Pkg.Mail
203+
Pkg.Metrics
203204
Pkg.Parser
204205
Pkg.Parser.Expr
205206
Pkg.Parser.Stats
@@ -801,6 +802,7 @@ test-suite test-dev
801802
Pkg.ErrorMetrics
802803
Pkg.ExtractionWorker
803804
Pkg.Mail
805+
Pkg.Metrics
804806
Pkg.Parser
805807
Pkg.Parser.Expr
806808
Pkg.Parser.Stats

src/Pkg/Metrics.hs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
-- | Shared OpenTelemetry metric instruments.
2+
--
3+
-- Instruments are spec-safe to cache once and reuse globally, so we hold
4+
-- them as top-level 'unsafePerformIO' thunks rather than threading a
5+
-- 'Meter' through 'AuthContext'. The global 'MeterProvider' is initialised
6+
-- in 'Start.startApp' before any of these are forced; before init they
7+
-- silently no-op via the SDK's 'noopMeterProvider'.
8+
module Pkg.Metrics (
9+
ingestDecodeHist,
10+
ingestWriteHist,
11+
recordMs,
12+
timed,
13+
) where
14+
15+
import Effectful (Eff, IOE, (:>))
16+
import Effectful.Exception (bracket)
17+
import GHC.Clock (getMonotonicTime)
18+
import OpenTelemetry.Attributes (Attribute, Attributes, emptyAttributes, unsafeAttributesFromListIgnoringLimits)
19+
import OpenTelemetry.Metric.Core (
20+
Histogram (..),
21+
Meter (..),
22+
defaultAdvisoryParameters,
23+
getGlobalMeterProvider,
24+
getMeter,
25+
)
26+
import Relude
27+
import System.IO.Unsafe (unsafePerformIO)
28+
29+
30+
monoscopeMeter :: Meter
31+
monoscopeMeter = unsafePerformIO $ getGlobalMeterProvider >>= flip getMeter "monoscope"
32+
{-# NOINLINE monoscopeMeter #-}
33+
34+
35+
-- | Wall-time spent decoding/redacting/building span vectors in
36+
-- 'ProcessMessage.processMessages', per batch; the timed region also covers the per-project cache lookups.
37+
ingestDecodeHist :: Histogram
38+
ingestDecodeHist =
39+
unsafePerformIO
40+
$ meterCreateHistogram monoscopeMeter "monoscope.ingest.decode.duration" (Just "ms") Nothing defaultAdvisoryParameters
41+
{-# NOINLINE ingestDecodeHist #-}
42+
43+
44+
-- | Wall-time spent in the PG + TF dual write
45+
-- ('Telemetry.insertAndHandOff'), per batch.
46+
ingestWriteHist :: Histogram
47+
ingestWriteHist =
48+
unsafePerformIO
49+
$ meterCreateHistogram monoscopeMeter "monoscope.ingest.write.duration" (Just "ms") Nothing defaultAdvisoryParameters
50+
{-# NOINLINE ingestWriteHist #-}
51+
52+
53+
recordMs :: MonadIO m => Histogram -> Double -> [(Text, Attribute)] -> m ()
54+
recordMs h v attrs = liftIO $ histogramRecord h v (toAttrs attrs)
55+
56+
57+
-- | Time an action and record its wall-time (ms) on @hist@. Uses
58+
-- 'bracket' so the measurement fires on both success and exception —
59+
-- useful when you want failures represented in the distribution (e.g.
60+
-- DB write latency where a timeout is still latency). Pass @[]@ for
61+
-- no attributes.
62+
timed :: IOE :> es => Histogram -> [(Text, Attribute)] -> Eff es a -> Eff es a
63+
timed hist attrs act =
64+
bracket
65+
(liftIO getMonotonicTime)
66+
(\t0 -> liftIO getMonotonicTime >>= \t1 -> recordMs hist ((t1 - t0) * 1000) attrs)
67+
(const act)
68+
69+
70+
toAttrs :: [(Text, Attribute)] -> Attributes
71+
toAttrs [] = emptyAttributes
72+
toAttrs xs = unsafeAttributesFromListIgnoringLimits xs
73+
{-# INLINE toAttrs #-}

src/Pkg/Queue.hs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ module Pkg.Queue (pubsubService, kafkaService, publishJSONToKafka, publishToDead
33
import Control.Exception.Annotated (checkpoint)
44
import Control.Lens ((^?), _Just)
55
import Control.Lens qualified as L
6-
import Control.Monad.Extra (concatMapM)
76
import Control.Monad.Trans.Resource (runResourceT)
87
import Data.Aeson qualified as AE
98
import Data.Annotation (toAnnotation)
@@ -35,6 +34,7 @@ import System.IO.Unsafe (unsafePerformIO)
3534
import System.Tracing (SpanStatus (..), addEvent, setStatus, withSpan)
3635
import System.Types (ATBackgroundCtx, runBackground)
3736
import UnliftIO (throwIO)
37+
import UnliftIO.Async (pooledForConcurrentlyN)
3838
import UnliftIO.Exception (bracket, try, tryAny)
3939
import UnliftIO.MVar (modifyMVar)
4040

@@ -239,16 +239,12 @@ kafkaService appLogger appCtx tp kafkaTopics batchSize fn = checkpoint "kafkaSer
239239
$ LogBase.logAttention "Kafka poll returned errors"
240240
$ AE.object ["error_count" AE..= length leftRecords, "errors" AE..= map show leftRecords]
241241

242-
-- Group by topic: a single poll can interleave records from multiple
243-
-- subscribed topics; each group needs its own ce-type and attributes.
244-
-- Per-partition commit: only topic-groups whose batch succeeded
245-
-- (or whose poison content was successfully DLQ'd) contribute to
246-
-- the offset commit. Failed groups leave their partitions alone;
247-
-- librdkafka redelivers them next poll, surfacing the outage as
248-
-- consumer-group lag on those partitions only — healthy
249-
-- partitions advance unaffected.
250-
let byTopic = foldr (\r -> HM.insertWith (<>) r.crTopic.unTopicName (r :| [])) HM.empty rightRecords
251-
processGroup (topic, neRecords@(recc :| _)) = do
242+
-- Group per (topic, partition) so groups can run concurrently
243+
-- below while each tpsFor still yields exactly one TP — failed
244+
-- groups stall only their own partition's commits.
245+
-- Map (not HashMap): KP.PartitionId has Ord but no Hashable instance.
246+
let byTP = foldr (\r -> Map.insertWith (<>) (r.crTopic.unTopicName, r.crPartition) (r :| [])) Map.empty rightRecords
247+
processGroup ((topic, _partition), neRecords@(recc :| _)) = do
252248
let headers = consumerRecordHeadersToHashMap recc
253249
ceType = ceTypeFor appCtx.config.kafkaDeadLetterTopic topic headers
254250
attributes = HM.insert "ce-type" ceType headers
@@ -293,7 +289,11 @@ kafkaService appLogger appCtx tp kafkaTopics batchSize fn = checkpoint "kafkaSer
293289
pure $ case dlqRes of
294290
Right _ -> successTps
295291
Left _ -> []
296-
tps <- concatMapM processGroup (HM.toList byTopic)
292+
-- Bound must stay well under the hasql pool (shared with web) —
293+
-- AcquisitionTimeout (→ Hasql.isTransientException → no commit →
294+
-- redelivery storm) is the failure mode to avoid. Single committer
295+
-- (this thread) below — no concurrent commitPartitionsOffsets.
296+
tps <- concat <$> pooledForConcurrentlyN appCtx.config.kafkaGroupConcurrency (Map.toList byTP) processGroup
297297

298298
-- No poll-thread backoff: lag growth is the outage signal under
299299
-- per-partition commits. A reintroduced `threadDelay` here races

src/ProcessMessage.hs

Lines changed: 64 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ import Models.Projects.Projects qualified as Projects
5757
import Models.Telemetry.Telemetry (Context (trace_state), OtelLogsAndSpans (..), generateSummary)
5858
import Models.Telemetry.Telemetry qualified as Telemetry
5959
import Pkg.DeriveUtils (AesonText (..), UUIDId (..), unAesonTextMaybe)
60+
import Pkg.Metrics qualified as Metrics
6061
import Pkg.SchemaLearning.Catalog qualified as Catalog
6162
import Pkg.SchemaLearning.Catalog qualified as Fields
6263
import Pkg.SchemaLearning.Hot qualified as SchemaHot
@@ -138,45 +139,40 @@ processMessages
138139
processMessages [] _ = pure []
139140
processMessages msgs attrs = do
140141
appCtx <- Eff.ask @AuthContext
141-
rMsgs <-
142-
catMaybes <$> forM msgs \(ackId, msg) -> case AE.eitherDecodeStrict (BS.filter (/= 0) msg) of
143-
Left err -> Nothing <$ Log.logAttention "Error parsing json msgs" (object ["AckId" .= ackId, "Error" .= err, "OriginalMsg" .= decodeUtf8 @Text msg])
144-
Right m -> pure $ Just (ackId, BS.length msg, m)
145-
146-
if null rMsgs
147-
then pure []
148-
else do
149-
projectCaches <-
150-
HM.fromList <$> liftIO do
151-
-- Use parallel evaluation for cache fetching
152-
let projectIds = (\(_, _, m) -> UUIDId m.projectId) <$> rMsgs
153-
cachePairs <- forM projectIds $ \pid ->
154-
Cache.fetchWithCache appCtx.projectCache pid $ \pid' -> do
155-
mpjCache <- Projects.projectCacheByIdIO appCtx.hasqlJobsPool pid'
156-
pure $! fromMaybe Projects.defaultProjectCache mpjCache
157-
pure $! zip projectIds cachePairs
158-
159-
spans <- forM rMsgs \(rmAckId, rawSize, msg) -> do
160-
let pid = UUIDId msg.projectId
161-
!msgSize = fromIntegral rawSize
162-
case HM.lookup pid projectCaches of
163-
Just cache ->
164-
let !totalDailyEvents = fromIntegral cache.dailyEventCount + fromIntegral cache.dailyMetricCount
165-
!isFreeTier = cache.paymentPlan == "Free"
166-
!hasExceededLimit = isFreeTier && totalDailyEvents >= freeTierDailyMaxEvents
167-
in if hasExceededLimit
168-
then pure Nothing
169-
else do
170-
spanId <- UUID.genUUID
171-
trId <- UUID.toText <$> UUID.genUUID
172-
let !span' = convertRequestMessageToSpan msg msgSize (spanId, trId)
173-
pure (Just span')
174-
Nothing -> pure Nothing
175-
176-
let !spanVec = V.fromList (catMaybes spans)
177-
Telemetry.insertAndHandOff appCtx.env.enableTimefusionWrites appCtx.extractionWorker projectCaches spanVec
178-
179-
pure $ map fst3 rMsgs
142+
(rMsgs, mWrite) <- Metrics.timed Metrics.ingestDecodeHist [] do
143+
rMsgs <-
144+
catMaybes <$> forM msgs \(ackId, msg) -> case AE.eitherDecodeStrict (BS.filter (/= 0) msg) of
145+
Left err -> Nothing <$ Log.logAttention "Error parsing json msgs" (object ["AckId" .= ackId, "Error" .= err, "OriginalMsg" .= decodeUtf8 @Text msg])
146+
Right m -> pure $ Just (ackId, BS.length msg, m)
147+
if null rMsgs
148+
then pure (rMsgs, Nothing)
149+
else do
150+
projectCaches <-
151+
liftIO $ HM.fromList <$> forM (ordNub $ (\(_, _, m) -> UUIDId m.projectId) <$> rMsgs) \pid -> do
152+
cache <-
153+
Cache.fetchWithCache appCtx.projectCache pid
154+
$ fmap (fromMaybe Projects.defaultProjectCache)
155+
. Projects.projectCacheByIdIO appCtx.hasqlJobsPool
156+
pure (pid, cache)
157+
158+
spans <- forM rMsgs \(rmAckId, rawSize, msg) -> runMaybeT do
159+
let pid = UUIDId msg.projectId
160+
!msgSize = fromIntegral rawSize
161+
cache <- hoistMaybe $ HM.lookup pid projectCaches
162+
let !totalDailyEvents = fromIntegral cache.dailyEventCount + fromIntegral cache.dailyMetricCount
163+
!isFreeTier = cache.paymentPlan == "Free"
164+
guard $ not (isFreeTier && totalDailyEvents >= freeTierDailyMaxEvents)
165+
spanId <- lift UUID.genUUID
166+
trId <- lift $ UUID.toText <$> UUID.genUUID
167+
pure $! convertRequestMessageToSpan msg msgSize (spanId, trId)
168+
169+
pure (rMsgs, Just (projectCaches, V.fromList (catMaybes spans)))
170+
171+
forM_ mWrite \(projectCaches, spanVec) ->
172+
Metrics.timed Metrics.ingestWriteHist []
173+
$ Telemetry.insertAndHandOff appCtx.env.enableTimefusionWrites appCtx.extractionWorker projectCaches spanVec
174+
175+
pure $ map fst3 rMsgs
180176

181177

182178
-- | Process a single span to extract entities for hash-stamping.
@@ -631,6 +627,10 @@ instance {-# OVERLAPPING #-} AE.FromJSON (Either Text [Text]) where
631627

632628
-- | Walk the JSON once, redact any fields which are in the list of json paths to be redacted.
633629
--
630+
-- An empty path list short-circuits to structural sharing: subtrees with no live path
631+
-- are returned by pointer instead of rebuilt. Key matching requires a @.@
632+
-- boundary (tightening over a prior prefix-match implementation).
633+
--
634634
-- >>> redactJSON (V.fromList ["menu.id."]) [aesonQQ| {"menu":{"id":"file", "name":"John"}} |]
635635
-- Object (fromList [("menu",Object (fromList [("id",String "[REDACTED]"),("name",String "John")]))])
636636
--
@@ -642,19 +642,33 @@ instance {-# OVERLAPPING #-} AE.FromJSON (Either Text [Text]) where
642642
--
643643
-- >>> redactJSON (V.fromList ["menu.[].id", "menu.[].names.[]"]) [aesonQQ| {"menu":[{"id":"i1", "names":["John","okon"]}, {"id":"i2"}]} |]
644644
-- Object (fromList [("menu",Array [Object (fromList [("id",String "[REDACTED]"),("names",Array [String "[REDACTED]",String "[REDACTED]"])]),Object (fromList [("id",String "[REDACTED]")])])])
645+
--
646+
-- Prefix without a path-boundary does NOT match:
647+
--
648+
-- >>> redactJSON (V.fromList ["password"]) [aesonQQ| {"pass":{"word":"secret"}} |]
649+
-- Object (fromList [("pass",Object (fromList [("word",String "secret")]))])
645650
redactJSON :: V.Vector Text -> AE.Value -> AE.Value
646-
redactJSON paths' = redactJSON' (map stripPrefixDot $ V.toList paths')
651+
redactJSON ps0 = go [fromMaybe p (T.stripPrefix "." p) | p <- V.toList ps0]
647652
where
648-
redactJSON' !paths value = case value of
649-
AET.String v -> if any T.null paths then AET.String "[REDACTED]" else AET.String v
650-
AET.Number v -> if any T.null paths then AET.String "[REDACTED]" else AET.Number v
651-
AET.Null -> AET.Null
652-
AET.Bool v -> AET.Bool v
653-
AET.Object objMap -> AET.Object $ AEKM.mapWithKey (\k v -> redactJSON' (matchKey (AEK.toText k) paths) v) objMap
654-
AET.Array jsonList -> AET.Array $ V.map (redactJSON' (mapMaybe (\path -> T.stripPrefix "[]." path <|> T.stripPrefix "[]" path) paths)) jsonList
655-
656-
matchKey !k = mapMaybe (\path -> T.stripPrefix (k <> ".") path <|> T.stripPrefix k path)
657-
stripPrefixDot !p = fromMaybe p (T.stripPrefix "." p)
653+
go [] v = v
654+
go ps v = case v of
655+
AET.Object om -> AET.Object $ AEKM.mapWithKey (\k -> go (mapMaybe (matchKey (AEK.toText k)) ps)) om
656+
AET.Array xs
657+
| null cps -> v
658+
| otherwise -> AET.Array $ V.map (go cps) xs
659+
where
660+
cps = mapMaybe (\p -> T.stripPrefix "[]." p <|> T.stripPrefix "[]" p) ps
661+
AET.String{} | "" `elem` ps -> AET.String "[REDACTED]"
662+
AET.Number{} | "" `elem` ps -> AET.String "[REDACTED]"
663+
_ -> v
664+
665+
-- Match @k@ against a path on a @.@ boundary, returning the remainder.
666+
-- Avoids allocating @k <> "."@ per (key, path) pair.
667+
matchKey !k path =
668+
T.stripPrefix k path >>= \rest -> case T.uncons rest of
669+
Nothing -> Just ""
670+
Just ('.', rest') -> Just rest'
671+
_ -> Nothing
658672

659673

660674
-- valueToFields takes an aeson object and converts it into a vector of paths to

src/Start.hs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ where
55

66
import Configuration.Dotenv qualified as Dotenv
77
import Control.Exception.Safe qualified as Safe
8+
import OpenTelemetry.Metric (initializeGlobalMeterProvider, shutdownMeterProvider)
89
import OpenTelemetry.Trace (Tracer, TracerOptions (..), getGlobalTracerProvider, initializeGlobalTracerProvider, makeTracer, shutdownTracerProvider)
910
import Relude
1011
import System.Server qualified as Server
@@ -13,11 +14,23 @@ import System.Server qualified as Server
1314
startApp :: IO ()
1415
startApp = do
1516
_ <- Safe.try (Dotenv.loadFile Dotenv.defaultConfig) :: IO (Either SomeException ())
16-
withTracer \tracer -> getGlobalTracerProvider >>= Server.runMonoscope
17+
withTracer \_tracer ->
18+
withMeter $ getGlobalTracerProvider >>= Server.runMonoscope
1719
where
1820
withTracer :: ((TracerOptions -> Tracer) -> IO c) -> IO c
1921
withTracer f =
2022
Safe.bracket
2123
initializeGlobalTracerProvider
2224
(\tp -> void $ shutdownTracerProvider tp Nothing)
2325
(\tracerProvider -> f $ makeTracer tracerProvider "monoscope-server")
26+
27+
-- Metric instruments live in 'Pkg.Metrics' and pull from the global
28+
-- meter provider lazily; init here so the global is set before the
29+
-- first 'histogramRecord'. Reads OTEL_METRICS_EXPORTER / interval env
30+
-- vars (defaults: otlp, 60s).
31+
withMeter :: IO c -> IO c
32+
withMeter action =
33+
Safe.bracket
34+
initializeGlobalMeterProvider
35+
(\mp -> void $ shutdownMeterProvider mp Nothing)
36+
(\_ -> action)

src/System/Config.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ data EnvConfig = EnvConfig
5656
, kafkaUsername :: Text
5757
, kafkaPassword :: Text
5858
, enableKafkaService :: Bool
59+
, kafkaGroupConcurrency :: Int
5960
, smtpHost :: Text
6061
, smtpPort :: Int
6162
, smtpTls :: Bool
@@ -213,6 +214,7 @@ instance DefConfig EnvConfig where
213214
, postmarkFromEmail = "hello@monoscope.tech"
214215
, openaiModel = "gpt-5.4-mini"
215216
, openaiSmallModel = "gpt-5.4-nano"
217+
, kafkaGroupConcurrency = 4
216218
, extractionWorkerShards = 4
217219
, extractionQueueCapacity = 64
218220
, drainFlushBatchSize = 1000

0 commit comments

Comments
 (0)