ScanTotalSupplyBigQueryIntegrationTest.scala
@@ -10,7 +10,7 @@ import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.topology.PartyId
 import com.digitalasset.daml.lf.data.Time.Timestamp as LfTimestamp
 import com.google.cloud.bigquery as bq
-import bq.{Field, JobInfo, Schema, TableId}
+import bq.{Field, FieldValueList, JobInfo, Schema, TableId, TableResult}
 import bq.storage.v1.{JsonStreamWriter, TableSchema}
 import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.*
 import slick.jdbc.GetResult
@@ -83,6 +83,7 @@ class ScanTotalSupplyBigQueryIntegrationTest
   // The peak is 17 transactions in a (simulated) minute, or 0.28333 tps over a minute,
   // so we assert 15-20 transactions, or 0.25-0.34 tps
   private val peakTps = (0.25, 0.34)
+  private val totalRounds = 4

   override def beforeAll() = {
     super.beforeAll()
@@ -152,12 +153,14 @@ class ScanTotalSupplyBigQueryIntegrationTest
       createBigQueryFunctions()
     }

-    val results = withClue("running total supply queries in BigQuery") {
-      runTotalSupplyQueries()
+    withClue("testing total supply queries in BigQuery") {
+      val results = runDashboardQueries()
+      verifyDashboardResults(results)
     }

-    withClue(s"verify total supply results") {
-      verifyResults(results)
+    withClue("testing finance queries") {
+      val results = runFinanceQueries()
+      verifyFinanceResults(results)
     }
   }

@@ -523,16 +526,31 @@ class ScanTotalSupplyBigQueryIntegrationTest
     job.waitFor()
   }

-  /** Runs the total supply queries from the SQL file
+  private def runFinanceQueries()(implicit env: FixtureParam): FinanceMetrics = {
+    val project = bigquery.getOptions.getProjectId
+    // The TPS query assumes staleness of up to 4 hours, so we query for stats 5 hours after the current ledger time.
+    val timestamp = getLedgerTime.toInstant.plus(5, ChronoUnit.HOURS).toString
+    logger.info(s"Querying all finance stats as of $timestamp")
+    val sql =
+      s"SELECT * FROM `$project.$functionsDatasetName.all_finance_stats`('$timestamp', 0);"
+
+    parseFinanceResults(runTableSqlQuery(sql))
+  }
+
+  /** Runs the dashboard queries from the SQL file
    */
-  private def runTotalSupplyQueries()(implicit env: FixtureParam): ExpectedMetrics = {
+  private def runDashboardQueries()(implicit env: FixtureParam): DashboardMetrics = {
     val project = bigquery.getOptions.getProjectId
     // The TPS query assumes staleness of up to 4 hours, so we query for stats 5 hours after the current ledger time.
     val timestamp = getLedgerTime.toInstant.plus(5, ChronoUnit.HOURS).toString
+    logger.info(s"Querying all dashboard stats as of $timestamp")
     val sql =
       s"SELECT * FROM `$project.$functionsDatasetName.all_dashboard_stats`('$timestamp', 0);"

-    logger.info(s"Querying all stats as of $timestamp")
+    parseDashboardResults(runTableSqlQuery(sql))
+  }
+
+  private def runTableSqlQuery(sql: String): TableResult = {

     // Execute the query
     val queryConfig = bq.QueryJobConfiguration
@@ -549,11 +567,15 @@
     job.waitFor()

     // results should be available now
-    val result = job.getQueryResults()
-    parseQueryResults(result)
+    job.getQueryResults()
   }

-  private case class ExpectedMetrics(
+  private case class FinanceMetrics(
+      // Most metrics reuse the same code as the dashboard computation, so we don't bother validating them again
+      latestRound: Long
+  )
+
+  private case class DashboardMetrics(
       locked: BigDecimal,
       unlocked: BigDecimal,
       currentSupplyTotal: BigDecimal,
@@ -572,51 +594,64 @@
       avgCoinPrice: BigDecimal,
   )

-  private def parseQueryResults(result: bq.TableResult) = {
-    // We expect the final query to return a single row with all metrics
-    val row = result.iterateAll().iterator().next()
-    logger.debug(s"Query row: $row; schema ${result.getSchema}")
-
-    def required(column: String) = {
-      val field = row get column
-      if (field.isNull)
-        fail(s"Column '$column' in all-stats results is null")
-      field
-    }
-
-    def bd(column: String) = {
-      BigDecimal(required(column).getStringValue)
-    }
-
-    def int(column: String) = {
-      required(column).getLongValue
-    }
-
-    def float(column: String) = {
-      required(column).getDoubleValue
-    }
+  private def required(row: FieldValueList, column: String) = {
+    val field = row get column
+    if (field.isNull)
+      fail(s"Column '$column' in all-stats results is null")
+    field
+  }
+
+  def bd(row: FieldValueList, column: String) = {
+    BigDecimal(required(row, column).getStringValue)
+  }
+
+  def int(row: FieldValueList, column: String) = {
+    required(row, column).getLongValue
+  }
+
+  def float(row: FieldValueList, column: String) = {
+    required(row, column).getDoubleValue
+  }
+
+  private def parseFinanceResults(result: bq.TableResult) = {
+    val row = result.iterateAll().iterator().next()
+    logger.debug(s"Query row: $row; schema ${result.getSchema}")
+
+    FinanceMetrics(
+      latestRound = int(row, "latest_round")
+    )
+  }
+
+  private def parseDashboardResults(result: bq.TableResult) = {
+    // We expect the final query to return a single row with all metrics
+    val row = result.iterateAll().iterator().next()
+    logger.debug(s"Query row: $row; schema ${result.getSchema}")

-    ExpectedMetrics(
-      locked = bd("locked"),
-      unlocked = bd("unlocked"),
-      currentSupplyTotal = bd("current_supply_total"),
-      unminted = bd("unminted"),
-      mintedAppRewards = bd("daily_mint_app_rewards"),
-      mintedValidatorRewards = bd("daily_mint_validator_rewards"),
-      mintedSvRewards = bd("daily_mint_sv_rewards"),
-      mintedUnclaimed = bd("daily_mint_unclaimed_activity_records"),
-      burned = bd("daily_burn"),
-      numAmuletHolders = int("num_amulet_holders"),
-      numActiveValidators = int("num_active_validators"),
-      avgTps = float("average_tps"),
-      peakTps = float("peak_tps"),
-      minCoinPrice = bd("daily_min_coin_price"),
-      maxCoinPrice = bd("daily_max_coin_price"),
-      avgCoinPrice = bd("daily_avg_coin_price"),
+    DashboardMetrics(
+      locked = bd(row, "locked"),
+      unlocked = bd(row, "unlocked"),
+      currentSupplyTotal = bd(row, "current_supply_total"),
+      unminted = bd(row, "unminted"),
+      mintedAppRewards = bd(row, "daily_mint_app_rewards"),
+      mintedValidatorRewards = bd(row, "daily_mint_validator_rewards"),
+      mintedSvRewards = bd(row, "daily_mint_sv_rewards"),
+      mintedUnclaimed = bd(row, "daily_mint_unclaimed_activity_records"),
+      burned = bd(row, "daily_burn"),
+      numAmuletHolders = int(row, "num_amulet_holders"),
+      numActiveValidators = int(row, "num_active_validators"),
+      avgTps = float(row, "average_tps"),
+      peakTps = float(row, "peak_tps"),
+      minCoinPrice = bd(row, "daily_min_coin_price"),
+      maxCoinPrice = bd(row, "daily_max_coin_price"),
+      avgCoinPrice = bd(row, "daily_avg_coin_price"),
     )
   }

-  private def verifyResults(results: ExpectedMetrics): Unit = {
+  private def verifyFinanceResults(results: FinanceMetrics): Unit = {
+    results.latestRound shouldBe totalRounds withClue "total_rounds"
+  }
+
+  private def verifyDashboardResults(results: DashboardMetrics): Unit = {
     // Verify individual metrics
     forEvery(
       Seq(
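
Reviewer note: the diff elides the middle of the new runTableSqlQuery helper (the job construction between the last two hunks). As a minimal sketch only, assuming the standard BigQuery Java client calls already visible in the surrounding context (bq.QueryJobConfiguration, JobInfo, waitFor, getQueryResults), the complete helper plausibly reads like this; the elided lines may differ in detail:

    // Sketch, not the exact elided code: build, submit, and await a query job.
    private def runTableSqlQuery(sql: String): TableResult = {
      // Execute the query
      val queryConfig = bq.QueryJobConfiguration.newBuilder(sql).build()
      val job = bigquery.create(JobInfo.of(queryConfig))
      // Blocks until the job finishes; throws if the job errors out
      job.waitFor()
      // results should be available now
      job.getQueryResults()
    }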
cluster/pulumi/canton-network/src/bigQuery_functions.ts (25 changes: 24 additions & 1 deletion)
@@ -69,6 +69,7 @@ const daml_prim_path = new BQScalarFunction(
       WHEN 'contractId' THEN '.contractId'
       WHEN 'list' THEN '.list.elements'
       WHEN 'party' THEN '.party'
+      WHEN 'int64' THEN '.int64'
       -- we treat records just like outer layer;
       -- see how paths start with '$.record'
       WHEN 'record' THEN ''
@@ -586,6 +587,24 @@ const coin_price = new BQScalarFunction(
   `
 );

+const latest_round = new BQScalarFunction(
+  'latest_round',
+  as_of_args,
+  INT64,
+  `
+  (SELECT
+    CAST(JSON_VALUE(c.create_arguments, \`$$FUNCTIONS_DATASET$$.daml_record_path\`([1,0], 'int64')) AS INT64)
+  FROM \`$$SCAN_DATASET$$.scan_sv_1_update_history_creates\` c
+  WHERE template_id_entity_name = 'SummarizingMiningRound'
+    AND c.template_id_module_name = 'Splice.Round'
+    AND package_name = 'splice-amulet'
+    AND \`$$FUNCTIONS_DATASET$$.up_to_time\`(
+      as_of_record_time, migration_id,
+      c.record_time, c.migration_id)
+  ORDER BY c.record_time DESC LIMIT 1)
+  `
+);
+
 const all_dashboard_stats = new BQTableFunction(
   'all_dashboard_stats',
   as_of_args,
@@ -689,6 +708,7 @@ const all_finance_stats = new BQTableFunction(
     new BQColumn('total_burn', BIGNUMERIC),
     new BQColumn('num_amulet_holders', INT64),
     new BQColumn('num_active_validators', INT64),
+    new BQColumn('latest_round', INT64),
   ],
   `
   SELECT
@@ -729,7 +749,9 @@
         migration_id),
       0) AS total_burn,
     \`$$FUNCTIONS_DATASET$$.num_amulet_holders\`(as_of_record_time, migration_id) as num_amulet_holders,
-    \`$$FUNCTIONS_DATASET$$.num_active_validators\`(as_of_record_time, migration_id) as num_active_validators
+    \`$$FUNCTIONS_DATASET$$.num_active_validators\`(as_of_record_time, migration_id) as num_active_validators,
+    \`$$FUNCTIONS_DATASET$$.latest_round\`(as_of_record_time, migration_id) as latest_round
+
   `
 );

@@ -878,6 +900,7 @@ export const allScanFunctions = [
   average_tps,
   peak_tps,
   coin_price,
+  latest_round,
   all_dashboard_stats,
   all_finance_stats,
 ];