Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions ponder.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1464,6 +1464,7 @@ export const PoolSnapshotRelations = relations(PoolSnapshot, ({ one }) => ({
}),
}));

/** Dynamic TokenSnapshot yield bigint columns derived from configured yield specs. */
function tokenYieldSnapshotColumns(t: PgColumnsBuilders) {
const cols: Record<string, ReturnType<PgColumnsBuilders["bigint"]>> = {};
for (const spec of TOKEN_YIELD_SPECS) {
Expand Down Expand Up @@ -1598,6 +1599,10 @@ const TokenInstancePositionColumns = (t: PgColumnsBuilders) => ({
centrifugeId: t.text().notNull(),
accountAddress: t.hex().notNull(),
balance: t.bigint().default(0n),
tokenPriceAtLastChange: t.bigint(),
cumulativeEarnings: t.bigint().notNull().default(0n),
costBasis: t.bigint().notNull().default(0n),
cumulativeRealizedPnl: t.bigint().notNull().default(0n),
isFrozen: t.boolean().notNull().default(false), //TODO: Deprecate this column
...defaultColumns(t),
});
Expand All @@ -1624,6 +1629,55 @@ export const TokenInstancePositionRelations = relations(TokenInstancePosition, (
}),
}));

/** Immutable per-transfer investor accounting checkpoints on a token instance. */
const InvestorPositionCheckpointColumns = (t: PgColumnsBuilders) => ({
tokenId: t.hex().notNull(),
centrifugeId: t.text().notNull(),
accountAddress: t.hex().notNull(),
poolId: t.bigint().notNull(),
balanceBefore: t.bigint().notNull(),
balanceAfter: t.bigint().notNull(),
tokenPrice: t.bigint(),
periodEarnings: t.bigint(),
cumulativeEarnings: t.bigint().notNull().default(0n),
costBasisBefore: t.bigint().notNull().default(0n),
costBasisAfter: t.bigint().notNull().default(0n),
realizedPnl: t.bigint(),
cumulativeRealizedPnl: t.bigint().notNull().default(0n),
trigger: t.text().notNull(),
logIndex: t.integer().notNull(),
...defaultColumns(t, false),
});

export const InvestorPositionCheckpoint = onchainTable(
"investor_position_checkpoint",
InvestorPositionCheckpointColumns,
(t) => ({
id: primaryKey({
columns: [t.tokenId, t.centrifugeId, t.accountAddress, t.createdAtBlock, t.logIndex],
}),
accountTokenCreatedAtIdx: index().on(t.accountAddress, t.tokenId, t.centrifugeId, t.createdAt),
})
);

export const InvestorPositionCheckpointRelations = relations(
InvestorPositionCheckpoint,
({ one }) => ({
tokenInstance: one(TokenInstance, {
fields: [InvestorPositionCheckpoint.tokenId, InvestorPositionCheckpoint.centrifugeId],
references: [TokenInstance.tokenId, TokenInstance.centrifugeId],
}),
account: one(Account, {
fields: [InvestorPositionCheckpoint.accountAddress],
references: [Account.address],
}),
pool: one(Pool, {
fields: [InvestorPositionCheckpoint.poolId],
references: [Pool.id],
}),
})
);

const MerkleProofManagerColumns = (t: PgColumnsBuilders) => ({
address: t.hex().notNull(),
centrifugeId: t.text().notNull(),
Expand Down
172 changes: 140 additions & 32 deletions src/handlers/tokenInstanceHandlers.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { multiMapper } from "../helpers/multiMapper";
import { logEvent, serviceError, serviceLog } from "../helpers/logger";
import { logEvent, serviceError, serviceLog, serviceWarn } from "../helpers/logger";
import { computeInvestorPositionCheckpoint } from "../helpers/investorPositionCheckpoint";
import {
BlockchainService,
DeploymentService,
TokenInstanceService,
TokenInstancePositionService,
AccountService,
InvestorPositionCheckpointService,
TokenService,
InvestorTransactionService,
EscrowService,
Expand Down Expand Up @@ -34,6 +36,7 @@ multiMapper("tokenInstance:Transfer", async ({ event, context }) => {
const token = (await TokenService.get(context, { id: tokenId })) as TokenService | null;
if (!token) return serviceError(`Token not found. Cannot retrieve poolId`);
const { poolId } = token.read();
const { tokenPrice } = tokenInstance.read();

const [isFromNull, isToNull] = [BigInt(from) === 0n, BigInt(to) === 0n];

Expand Down Expand Up @@ -67,48 +70,153 @@ multiMapper("tokenInstance:Transfer", async ({ event, context }) => {
!isToNull && !isToGlobalEscrow && !isToPoolEscrow,
];

if (isFromUserAccount) {
const _fromAccount = await AccountService.getOrInit(context, { address: from }, event);
const fromPosition = (await TokenInstancePositionService.getOrInit(
const isSelfTransfer = isFromUserAccount && isToUserAccount && from === to;
const trigger = "tokenInstance:Transfer" as const;
const logIndex = event.log.logIndex;

const handleUserPositionChange = async ({
accountAddress,
isIncrease,
}: {
accountAddress: `0x${string}`;
isIncrease: boolean;
}) => {
await AccountService.getOrInit(context, { address: accountAddress }, event);

const positionQuery = {
tokenId,
centrifugeId,
accountAddress,
} as const;

let positionWasInitialized = false;
const position = (await TokenInstancePositionService.getOrInit(
context,
{
tokenId: tokenId,
centrifugeId,
accountAddress: from,
},
positionQuery,
event,
async (tokenInstancePosition) =>
await initialisePosition(context, event, tokenAddress, tokenInstancePosition)
async (tokenInstancePosition) => {
positionWasInitialized = true;
await initialisePosition(context, event, tokenAddress, tokenInstancePosition);
}
)) as TokenInstancePositionService;
const { createdAtBlock } = fromPosition.read();
if (!createdAtBlock) {
serviceError(`TokenInstancePosition not found. Cannot update balance`);

const positionData = position.read();
const positionAlreadyExisted = !positionWasInitialized;
const currentBalance = positionData.balance ?? 0n;

if (!positionAlreadyExisted && !isIncrease) {
serviceWarn(
"InvestorPositionCheckpoint invariant violated: first-seen sender position on Transfer",
`tokenId=${tokenId}`,
`centrifugeId=${centrifugeId}`,
`accountAddress=${accountAddress}`,
`txHash=${event.transaction.hash}`,
`block=${event.block.number}`,
`amount=${amount}`
);
return;
}

const balanceBefore = positionAlreadyExisted ? currentBalance : 0n;
const balanceAfter = positionAlreadyExisted
? isIncrease
? currentBalance + amount
: currentBalance - amount
: amount;

if (!positionAlreadyExisted && isIncrease && currentBalance !== amount) {
serviceWarn(
"InvestorPositionCheckpoint invariant violated: first-seen recipient balance mismatch",
`tokenId=${tokenId}`,
`centrifugeId=${centrifugeId}`,
`accountAddress=${accountAddress}`,
`txHash=${event.transaction.hash}`,
`block=${event.block.number}`,
`amount=${amount}`,
`initializedBalance=${currentBalance}`
);
return;
}
if (createdAtBlock < Number(event.block.number)) fromPosition.subBalance(amount);
await fromPosition.save(event);
}

if (isToUserAccount) {
const _toAccount = await AccountService.getOrInit(context, { address: to }, event);
const toPosition = (await TokenInstancePositionService.getOrInit(
if (tokenPrice === null || tokenPrice <= 0n) {
serviceWarn(
"InvestorPositionCheckpoint skipped due to unknown token price",
`tokenId=${tokenId}`,
`centrifugeId=${centrifugeId}`,
`accountAddress=${accountAddress}`,
`txHash=${event.transaction.hash}`,
`block=${event.block.number}`,
`amount=${amount}`
);
if (positionAlreadyExisted) {
await position.setBalance(balanceAfter).save(event);
}
return;
}

const accounting = computeInvestorPositionCheckpoint({
amount,
balanceBefore,
balanceAfter,
tokenPrice,
tokenPriceAtLastChange: positionData.tokenPriceAtLastChange ?? null,
cumulativeEarningsBefore: positionData.cumulativeEarnings ?? 0n,
costBasisBefore: positionData.costBasis ?? 0n,
cumulativeRealizedPnlBefore: positionData.cumulativeRealizedPnl ?? 0n,
isIncrease,
});

await InvestorPositionCheckpointService.createCheckpoint(
context,
{
tokenId: tokenId,
tokenId,
centrifugeId,
accountAddress: to,
accountAddress,
poolId,
balanceBefore,
balanceAfter,
tokenPrice,
periodEarnings: accounting.periodEarnings,
cumulativeEarnings: accounting.cumulativeEarningsAfter,
costBasisBefore: positionData.costBasis ?? 0n,
costBasisAfter: accounting.costBasisAfter,
realizedPnl: accounting.realizedPnl,
cumulativeRealizedPnl: accounting.cumulativeRealizedPnlAfter,
trigger,
logIndex,
},
event,
async (tokenInstancePosition) =>
await initialisePosition(context, event, tokenAddress, tokenInstancePosition)
)) as TokenInstancePositionService;
const { createdAtBlock } = toPosition.read();
if (!createdAtBlock) {
serviceError(`TokenInstancePosition not found. Cannot update balance`);
return;
event
);

await position
.applyCheckpointAccounting({
balanceAfter,
tokenPrice,
cumulativeEarnings: accounting.cumulativeEarningsAfter,
costBasisAfter: accounting.costBasisAfter,
cumulativeRealizedPnl: accounting.cumulativeRealizedPnlAfter,
})
.save(event);
};

if (isSelfTransfer) {
serviceWarn(
"InvestorPositionCheckpoint skipped for self-transfer",
`tokenId=${tokenId}`,
`centrifugeId=${centrifugeId}`,
`accountAddress=${from}`,
`txHash=${event.transaction.hash}`,
`block=${event.block.number}`,
`amount=${amount}`
);
} else {
if (isFromUserAccount) {
await handleUserPositionChange({ accountAddress: from, isIncrease: false });
}

if (isToUserAccount) {
await handleUserPositionChange({ accountAddress: to, isIncrease: true });
}
if (createdAtBlock < Number(event.block.number)) toPosition.addBalance(amount);
await toPosition.save(event);
}

// Handle tokenInstance and token total issuance change
Expand Down
90 changes: 90 additions & 0 deletions src/helpers/investorPositionCheckpoint.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { bigintMax } from "./bigintMath";

export type InvestorPositionCheckpointComputationInput = {
amount: bigint;
balanceBefore: bigint;
balanceAfter: bigint;
tokenPrice: bigint;
tokenPriceAtLastChange: bigint | null;
cumulativeEarningsBefore: bigint;
costBasisBefore: bigint;
cumulativeRealizedPnlBefore: bigint;
isIncrease: boolean;
};

export type InvestorPositionCheckpointComputation = {
periodEarnings: bigint | null;
cumulativeEarningsAfter: bigint;
costBasisAfter: bigint;
realizedPnl: bigint;
cumulativeRealizedPnlAfter: bigint;
};

/**
* Price appreciation on the held balance between position changes.
*/
export function computePeriodEarnings(
balanceBefore: bigint,
currentTokenPrice: bigint,
previousTokenPrice: bigint | null
) {
if (balanceBefore === 0n || previousTokenPrice === null || previousTokenPrice <= 0n) return null;
return balanceBefore * (currentTokenPrice - previousTokenPrice);
}

/**
* Realized cost basis removed on a decrease, using direct multiplication/division to avoid
* an intermediate average-cost bigint rounding step.
*/
export function computeRemovedCostBasis(
amount: bigint,
costBasisBefore: bigint,
balanceBefore: bigint
) {
if (amount === 0n || costBasisBefore === 0n || balanceBefore === 0n) return 0n;
return (amount * costBasisBefore) / balanceBefore;
}

/**
* Computes checkpoint accounting fields for a balance change with a known positive token price.
*/
export function computeInvestorPositionCheckpoint(
input: InvestorPositionCheckpointComputationInput
): InvestorPositionCheckpointComputation {
const {
amount,
balanceBefore,
tokenPrice,
tokenPriceAtLastChange,
cumulativeEarningsBefore,
costBasisBefore,
cumulativeRealizedPnlBefore,
isIncrease,
} = input;

const periodEarnings = computePeriodEarnings(balanceBefore, tokenPrice, tokenPriceAtLastChange);
const cumulativeEarningsAfter = cumulativeEarningsBefore + (periodEarnings ?? 0n);

if (isIncrease) {
const costBasisAfter = costBasisBefore + amount * tokenPrice;
return {
periodEarnings,
cumulativeEarningsAfter,
costBasisAfter,
realizedPnl: 0n,
cumulativeRealizedPnlAfter: cumulativeRealizedPnlBefore,
};
}

const removedCostBasis = computeRemovedCostBasis(amount, costBasisBefore, balanceBefore);
const realizedPnl = amount * tokenPrice - removedCostBasis;
const costBasisAfter = bigintMax(costBasisBefore - removedCostBasis, 0n);

return {
periodEarnings,
cumulativeEarningsAfter,
costBasisAfter,
realizedPnl,
cumulativeRealizedPnlAfter: cumulativeRealizedPnlBefore + realizedPnl,
};
}
9 changes: 9 additions & 0 deletions src/helpers/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,12 @@ export function serviceLog(...args: any[]) {
export function serviceError(...args: any[]) {
process.stderr.write("> [ERROR] " + args.join(" ") + "\n");
}

/**
* Logs a warning message to the console with a prefix.
*
* @param args - The arguments to log
*/
export function serviceWarn(...args: any[]) {
process.stderr.write("> [WARN] " + args.join(" ") + "\n");
}
Loading
Loading