Skip to content

Commit d43c4ac

Browse files
Fetch client-provider-distribution-acc week-by-week
1 parent e0ba8cd commit d43c4ac

File tree

2 files changed

+85
-102
lines changed

2 files changed

+85
-102
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
with miner_pieces as (
2+
select
3+
"pieceCid" as piece_cid,
4+
'f0' || "clientId" as client,
5+
'f0' || "providerId" as provider,
6+
sum("pieceSize") as total_deal_size,
7+
min("pieceSize") as piece_size
8+
from unified_verified_deal
9+
where
10+
"termStart" >= 3847920 -- nv22 start
11+
and date_trunc('week', to_timestamp("termStart" * 30 + 1598306400)) <= $1 -- deals up to provided week
12+
group by
13+
"pieceCid",
14+
"clientId",
15+
"providerId"
16+
)
17+
18+
select
19+
client,
20+
provider,
21+
sum(total_deal_size)::bigint as total_deal_size,
22+
sum(piece_size)::bigint as unique_data_size
23+
from miner_pieces
24+
group by client, provider;
25+
26+

src/aggregation/runners/client-provider-distribution-acc.runner.ts

Lines changed: 59 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -1,122 +1,79 @@
1-
import { QueryIterablePool } from 'pg-iterator';
1+
import { Logger } from '@nestjs/common';
2+
import { DateTime } from 'luxon';
3+
import { getClientProviderDistributionAccSingleWeek } from 'prismaDmob/generated/client/sql';
24
import {
35
AggregationRunner,
46
AggregationRunnerRunServices,
57
} from '../aggregation-runner';
68
import { AggregationTable } from '../aggregation-table';
79

810
export class ClientProviderDistributionAccRunner implements AggregationRunner {
11+
private readonly logger = new Logger(
12+
ClientProviderDistributionAccRunner.name,
13+
);
14+
915
public async run({
1016
prismaService,
11-
postgresDmobService,
17+
prismaDmobService,
1218
prometheusMetricService,
1319
}: AggregationRunnerRunServices): Promise<void> {
1420
const runnerName = this.getName();
15-
1621
const {
1722
startGetDataTimerByRunnerNameMetric,
1823
startStoreDataTimerByRunnerNameMetric,
1924
} = prometheusMetricService.aggregateMetrics;
2025

21-
await prismaService.$transaction(
22-
async (tx) => {
23-
const getDataEndTimerMetric =
24-
startGetDataTimerByRunnerNameMetric(runnerName);
25-
26-
const queryIterablePool = new QueryIterablePool<{
27-
week: Date | null;
28-
client: string | null;
29-
provider: string | null;
30-
total_deal_size: bigint | null;
31-
unique_data_size: bigint | null;
32-
}>(postgresDmobService.pool);
33-
34-
const i = queryIterablePool.query(`with miner_pieces
35-
as (select date_trunc('week', to_timestamp("termStart" * 30 + 1598306400)) as week,
36-
'f0' || "clientId" as client,
37-
'f0' || "providerId" as provider,
38-
"pieceCid",
39-
sum("pieceSize") as total_deal_size,
40-
min("pieceSize") as piece_size
41-
from unified_verified_deal
42-
where "termStart" >= 3847920 -- nv22 start
43-
and to_timestamp("termStart" * 30 + 1598306400) <= current_timestamp -- deals that didn't start yet
44-
group by week,
45-
client,
46-
provider,
47-
"pieceCid"),
48-
weeks as (select date_trunc('week', dates) week
49-
from generate_series(
50-
to_timestamp(3847920 * 30 + 1598306400),
51-
current_timestamp,
52-
'1 week'::interval) dates)
53-
select weeks.week as week,
54-
client,
55-
provider,
56-
sum(total_deal_size)::bigint as total_deal_size,
57-
sum(piece_size)::bigint as unique_data_size
58-
from weeks
59-
inner join miner_pieces
60-
on weeks.week >= miner_pieces.week
61-
group by weeks.week,
62-
client,
63-
provider;`);
64-
65-
const data: {
66-
week: Date | null;
67-
client: string | null;
68-
provider: string | null;
69-
total_deal_size: bigint | null;
70-
unique_data_size: bigint | null;
71-
}[] = [];
72-
73-
let isFirstInsert = true;
74-
75-
let storeDataEndTimerMetric;
76-
77-
for await (const rowResult of i) {
78-
data.push({
79-
week: rowResult.week,
80-
client: rowResult.client,
81-
provider: rowResult.provider,
82-
total_deal_size: rowResult.total_deal_size,
83-
unique_data_size: rowResult.unique_data_size,
84-
});
85-
86-
if (data.length === 5000) {
87-
if (isFirstInsert) {
88-
getDataEndTimerMetric();
89-
storeDataEndTimerMetric =
90-
startStoreDataTimerByRunnerNameMetric(runnerName);
91-
await tx.$executeRaw`delete from client_provider_distribution_weekly_acc`;
92-
isFirstInsert = false;
93-
}
94-
95-
await tx.client_provider_distribution_weekly_acc.createMany({
96-
data,
97-
});
98-
99-
data.length = 0;
100-
}
101-
}
102-
103-
if (data.length > 0) {
104-
if (isFirstInsert) {
105-
getDataEndTimerMetric();
106-
storeDataEndTimerMetric =
107-
startStoreDataTimerByRunnerNameMetric(runnerName);
108-
await tx.$executeRaw`delete from client_provider_distribution_weekly_acc`;
109-
}
110-
await tx.client_provider_distribution_weekly_acc.createMany({
111-
data,
112-
});
113-
}
114-
storeDataEndTimerMetric();
115-
},
116-
{
117-
timeout: Number.MAX_SAFE_INTEGER,
118-
},
119-
);
26+
//const latestStored =
27+
// await prismaService.client_provider_distribution_weekly_acc.findFirst({
28+
// select: {
29+
// week: true,
30+
// },
31+
// orderBy: {
32+
// week: 'desc',
33+
// },
34+
// });
35+
//let nextWeek = latestStored
36+
// ? DateTime.fromJSDate(latestStored.date, { zone: 'UTC' }) // we want to reprocess the last stored week, as it might've been incomplete
37+
// : DateTime.fromSeconds(3847920 * 30 + 1598306400).startOf('week'); // nv22 start week - a.k.a. reprocess everything
38+
let nextWeek = DateTime.fromSeconds(3847920 * 30 + 1598306400).startOf(
39+
'week',
40+
); // nv22 start week
41+
42+
const now = DateTime.now().setZone('UTC');
43+
while (nextWeek <= now) {
44+
this.logger.debug(`Processing week ${nextWeek}`);
45+
const getDataEndTimerMetric =
46+
startGetDataTimerByRunnerNameMetric(runnerName);
47+
const result = await prismaDmobService.$queryRawTyped(
48+
getClientProviderDistributionAccSingleWeek(nextWeek.toJSDate()),
49+
);
50+
getDataEndTimerMetric();
51+
52+
const storeDataEndTimerMetric =
53+
startStoreDataTimerByRunnerNameMetric(runnerName);
54+
const data = result.map((dmobResult) => ({
55+
week: nextWeek.toJSDate(),
56+
client: dmobResult.client,
57+
provider: dmobResult.provider,
58+
total_deal_size: dmobResult.total_deal_size,
59+
unique_data_size: dmobResult.unique_data_size,
60+
}));
61+
await prismaService.$transaction(async (tx) => {
62+
await tx.client_provider_distribution_weekly_acc.deleteMany({
63+
where: {
64+
week: {
65+
equals: nextWeek.toJSDate(),
66+
},
67+
},
68+
});
69+
await tx.client_provider_distribution_weekly_acc.createMany({
70+
data,
71+
});
72+
});
73+
storeDataEndTimerMetric();
74+
75+
nextWeek = nextWeek.plus({ weeks: 1 });
76+
}
12077
}
12178

12279
getFilledTables(): AggregationTable[] {

0 commit comments

Comments
 (0)