@@ -130,6 +130,7 @@ async function handleBurnBlockMessage(
130
130
burnchainBlockHeight : burnBlockMsg . burn_block_height ,
131
131
slotHolders : slotHolders ,
132
132
} ) ;
133
+ await db . updateBurnChainBlockHeight ( { blockHeight : burnBlockMsg . burn_block_height } ) ;
133
134
}
134
135
135
136
async function handleMempoolTxsMessage ( rawTxs : string [ ] , db : PgWriteStore ) : Promise < void > {
@@ -631,18 +632,32 @@ interface EventMessageHandler {
631
632
handleNewAttachment ( msg : CoreNodeAttachmentMessage [ ] , db : PgWriteStore ) : Promise < void > | void ;
632
633
}
633
634
634
- function createMessageProcessorQueue ( ) : EventMessageHandler {
635
+ function createMessageProcessorQueue ( db : PgWriteStore ) : EventMessageHandler {
635
636
// Create a promise queue so that only one message is handled at a time.
636
637
const processorQueue = new PQueue ( { concurrency : 1 } ) ;
637
638
638
- let eventTimer : prom . Histogram < 'event' > | undefined ;
639
+ let metrics :
640
+ | {
641
+ eventTimer : prom . Histogram ;
642
+ blocksInPreviousBurnBlock : prom . Gauge ;
643
+ }
644
+ | undefined ;
639
645
if ( isProdEnv ) {
640
- eventTimer = new prom . Histogram ( {
641
- name : 'stacks_event_ingestion_timers' ,
642
- help : 'Event ingestion timers' ,
643
- labelNames : [ 'event' ] ,
644
- buckets : prom . exponentialBuckets ( 50 , 3 , 10 ) , // 10 buckets, from 50 ms to 15 minutes
645
- } ) ;
646
+ metrics = {
647
+ eventTimer : new prom . Histogram ( {
648
+ name : 'stacks_event_ingestion_timers' ,
649
+ help : 'Event ingestion timers' ,
650
+ labelNames : [ 'event' ] ,
651
+ buckets : prom . exponentialBuckets ( 50 , 3 , 10 ) , // 10 buckets, from 50 ms to 15 minutes
652
+ } ) ,
653
+ blocksInPreviousBurnBlock : new prom . Gauge ( {
654
+ name : 'stacks_blocks_in_previous_burn_block' ,
655
+ help : 'Number of Stacks blocks produced in the previous burn block' ,
656
+ async collect ( ) {
657
+ this . set ( await db . getStacksBlockCountAtPreviousBurnBlock ( ) ) ;
658
+ } ,
659
+ } ) ,
660
+ } ;
646
661
}
647
662
648
663
const observeEvent = async ( event : string , fn : ( ) => Promise < void > ) => {
@@ -651,7 +666,7 @@ function createMessageProcessorQueue(): EventMessageHandler {
651
666
await fn ( ) ;
652
667
} finally {
653
668
const elapsedMs = timer . getElapsed ( ) ;
654
- eventTimer ? .observe ( { event } , elapsedMs ) ;
669
+ metrics ?. eventTimer . observe ( { event } , elapsedMs ) ;
655
670
}
656
671
} ;
657
672
@@ -738,7 +753,7 @@ export async function startEventServer(opts: {
738
753
serverPort ?: number ;
739
754
} ) : Promise < EventStreamServer > {
740
755
const db = opts . datastore ;
741
- const messageHandler = opts . messageHandler ?? createMessageProcessorQueue ( ) ;
756
+ const messageHandler = opts . messageHandler ?? createMessageProcessorQueue ( db ) ;
742
757
743
758
let eventHost = opts . serverHost ?? process . env [ 'STACKS_CORE_EVENT_HOST' ] ;
744
759
const eventPort = opts . serverPort ?? parseInt ( process . env [ 'STACKS_CORE_EVENT_PORT' ] ?? '' , 10 ) ;
0 commit comments