@@ -400,3 +400,186 @@ func (d *DataAccessService) calculateValidatorDashboardBalance(ctx context.Conte
400400 }
401401 return balances , nil
402402}
403+
404+ func (d * DataAccessService ) GetLatestExportedChartTs (ctx context.Context , aggregation enums.ChartAggregation ) (uint64 , error ) {
405+ view , dateColumn , err := d .getViewAndDateColumn (aggregation )
406+ if err != nil {
407+ return 0 , err
408+ }
409+
410+ query := fmt .Sprintf (`SELECT max(%s) FROM %s` , dateColumn , view )
411+ var ts time.Time
412+ err = d .clickhouseReader .GetContext (ctx , & ts , query )
413+ if err != nil {
414+ return 0 , fmt .Errorf ("error retrieving latest exported chart timestamp: %w" , err )
415+ }
416+
417+ return uint64 (ts .Unix ()), nil
418+ }
419+
420+ func (d * DataAccessService ) getCurrentAndUpcomingSyncCommittees (ctx context.Context , latestEpoch uint64 ) (map [uint64 ]bool , map [uint64 ]bool , error ) {
421+ currentSyncCommitteeValidators := make (map [uint64 ]bool )
422+ upcomingSyncCommitteeValidators := make (map [uint64 ]bool )
423+
424+ currentSyncPeriod := utils .SyncPeriodOfEpoch (latestEpoch )
425+ ds := goqu .Dialect ("postgres" ).
426+ Select (
427+ goqu .L ("validatorindex" ),
428+ goqu .L ("period" )).
429+ From ("sync_committees" ).
430+ Where (goqu .L ("period IN (?, ?)" , currentSyncPeriod , currentSyncPeriod + 1 ))
431+
432+ var queryResult []struct {
433+ ValidatorIndex uint64 `db:"validatorindex"`
434+ Period uint64 `db:"period"`
435+ }
436+
437+ query , args , err := ds .Prepared (true ).ToSQL ()
438+ if err != nil {
439+ return nil , nil , fmt .Errorf ("error preparing query: %w" , err )
440+ }
441+
442+ err = d .readerDb .SelectContext (ctx , & queryResult , query , args ... )
443+ if err != nil {
444+ return nil , nil , fmt .Errorf ("error retrieving sync committee current and next period data: %w" , err )
445+ }
446+
447+ for _ , queryEntry := range queryResult {
448+ if queryEntry .Period == currentSyncPeriod {
449+ currentSyncCommitteeValidators [queryEntry .ValidatorIndex ] = true
450+ } else {
451+ upcomingSyncCommitteeValidators [queryEntry .ValidatorIndex ] = true
452+ }
453+ }
454+
455+ return currentSyncCommitteeValidators , upcomingSyncCommitteeValidators , nil
456+ }
457+
458+ func (d * DataAccessService ) getEpochStart (ctx context.Context , period enums.TimePeriod ) (uint64 , error ) {
459+ clickhouseTable , _ , err := d .getTablesForPeriod (period )
460+ if err != nil {
461+ return 0 , err
462+ }
463+
464+ ds := goqu .Dialect ("postgres" ).
465+ Select (goqu .L ("epoch_start" )).
466+ From (goqu .L (fmt .Sprintf ("%s FINAL" , clickhouseTable ))).
467+ Order (goqu .L ("epoch_start" ).Asc ()).
468+ Limit (1 )
469+
470+ query , args , err := ds .Prepared (true ).ToSQL ()
471+ if err != nil {
472+ return 0 , fmt .Errorf ("error preparing query: %w" , err )
473+ }
474+
475+ var epochStart uint64
476+ err = d .clickhouseReader .GetContext (ctx , & epochStart , query , args ... )
477+ if err != nil {
478+ return 0 , fmt .Errorf ("error retrieving cutoff epoch for past sync committees: %w" , err )
479+ }
480+
481+ return epochStart , nil
482+ }
483+
484+ func (d * DataAccessService ) getPastSyncCommittees (ctx context.Context , indicies []uint64 , epochStart uint64 , latestEpoch uint64 ) (map [uint64 ]uint64 , error ) {
485+ pastSyncPeriodCutoff := utils .SyncPeriodOfEpoch (epochStart )
486+ currentSyncPeriod := utils .SyncPeriodOfEpoch (latestEpoch )
487+
488+ // Get the past sync committee validators
489+ ds := goqu .Dialect ("postgres" ).
490+ Select (
491+ goqu .L ("sc.validatorindex" )).
492+ From (goqu .L ("sync_committees sc" )).
493+ Where (goqu .L ("period >= ? AND period < ? AND validatorindex = ANY(?)" , pastSyncPeriodCutoff , currentSyncPeriod , pq .Array (indicies )))
494+
495+ query , args , err := ds .Prepared (true ).ToSQL ()
496+ if err != nil {
497+ return nil , fmt .Errorf ("error preparing query: %w" , err )
498+ }
499+
500+ var validatorIndices []uint64
501+ err = d .alloyReader .SelectContext (ctx , & validatorIndices , query , args ... )
502+ if err != nil {
503+ return nil , fmt .Errorf ("error retrieving data for past sync committees: %w" , err )
504+ }
505+
506+ validatorCountMap := make (map [uint64 ]uint64 )
507+ for _ , validatorIndex := range validatorIndices {
508+ validatorCountMap [validatorIndex ]++
509+ }
510+
511+ return validatorCountMap , nil
512+ }
513+
514+ func (d * DataAccessService ) getTablesForPeriod (period enums.TimePeriod ) (string , int , error ) {
515+ table := ""
516+ hours := 0
517+
518+ switch period {
519+ case enums .TimePeriods .Last1h :
520+ table = "validator_dashboard_data_rolling_1h"
521+ hours = 1
522+ case enums .TimePeriods .Last24h :
523+ table = "validator_dashboard_data_rolling_24h"
524+ hours = 24
525+ case enums .TimePeriods .Last7d :
526+ table = "validator_dashboard_data_rolling_7d"
527+ hours = 7 * 24
528+ case enums .TimePeriods .Last30d :
529+ table = "validator_dashboard_data_rolling_30d"
530+ hours = 30 * 24
531+ case enums .TimePeriods .AllTime :
532+ table = "validator_dashboard_data_rolling_total"
533+ hours = - 1
534+ default :
535+ return "" , 0 , fmt .Errorf ("not-implemented time period: %v" , period )
536+ }
537+
538+ return table , hours , nil
539+ }
540+
541+ func (d * DataAccessService ) getTableAndDateColumn (aggregation enums.ChartAggregation ) (string , string , error ) {
542+ var table , dateColumn string
543+
544+ switch aggregation {
545+ case enums .IntervalEpoch :
546+ table = "validator_dashboard_data_epoch"
547+ dateColumn = "epoch_timestamp"
548+ case enums .IntervalHourly :
549+ table = "validator_dashboard_data_hourly"
550+ dateColumn = "t"
551+ case enums .IntervalDaily :
552+ table = "validator_dashboard_data_daily"
553+ dateColumn = "t"
554+ case enums .IntervalWeekly :
555+ table = "validator_dashboard_data_weekly"
556+ dateColumn = "t"
557+ default :
558+ return "" , "" , fmt .Errorf ("unexpected aggregation type: %v" , aggregation )
559+ }
560+
561+ return table , dateColumn , nil
562+ }
563+
564+ func (d * DataAccessService ) getViewAndDateColumn (aggregation enums.ChartAggregation ) (string , string , error ) {
565+ var view , dateColumn string
566+
567+ switch aggregation {
568+ case enums .IntervalEpoch :
569+ view = "view_validator_dashboard_data_epoch_max_ts"
570+ dateColumn = "t"
571+ case enums .IntervalHourly :
572+ view = "view_validator_dashboard_data_hourly_max_ts"
573+ dateColumn = "t"
574+ case enums .IntervalDaily :
575+ view = "view_validator_dashboard_data_daily_max_ts"
576+ dateColumn = "t"
577+ case enums .IntervalWeekly :
578+ view = "view_validator_dashboard_data_weekly_max_ts"
579+ dateColumn = "t"
580+ default :
581+ return "" , "" , fmt .Errorf ("unexpected aggregation type: %v" , aggregation )
582+ }
583+
584+ return view , dateColumn , nil
585+ }
0 commit comments