Skip to content

Commit 372da68

Browse files
lucas-neynartybook
andcommitted
feat: username proof reconciliation [NEYN-1422] (#3)
* feat: add hub event reconciler * refactor: rename hubEventReconciliation to onChainEventReconciliation * fix: add event body types * fix: update logs and errors * fix: use built-in error method and fix build error with example-app * fix: restore example-app * fix: remove test code * fix: update processHubEvent logic * fix: type error * fix: remove empty file * feat: update onChainEventReconciliation to use maps * Update packages/shuttle/src/shuttle/onChainEventReconciliation.ts Co-authored-by: Ty Book <tbook11@gmail.com> * fix: rename functions and remove unnecessary logic * fix: move onchain_events table to HubTables * fix: Onchain to OnChain * fix: rename more things to onChain pattern, add support for SignerMigrated event, and fix example insert * fix: remove unused import * refactor: rename to onChain pattern * fix: update date comparison logic * fix: set fid to zero for signer migrated events * Revert "fix: set fid to zero for signer migrated events" This reverts commit d21fdbb. * fix: set fid to zero for signer migrated event lookups * fix: set fid to zero for signer migrated events before hubble queries * feat: add support for username proof reconciliation * fix: build error * feat: add wrapper for processing multiple username proof types * fix: rename name column to username * fix: convert farcaster time to date * feat: index username proof type * Remove redundant lines * fix: reconciliation logic * update yarn.lock * remove yarn.lock * PR feedback --------- Co-authored-by: Ty Book <tbook11@gmail.com> Co-authored-by: Ty Book <ty@neynar.com>
1 parent c8cf5ea commit 372da68

3 files changed

Lines changed: 249 additions & 0 deletions

File tree

packages/shuttle/src/shuttle/db.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,13 +159,29 @@ export type OnChainEventsTable = {
159159
body: IdRegisterEventBody | SignerEventBody | StorageRentEventBody | SignerMigratedEventBody;
160160
};
161161

162+
type UsernamesTable = {
163+
id: Generated<string>;
164+
createdAt: Generated<Date>;
165+
updatedAt: Generated<Date>;
166+
deletedAt: Date | null;
167+
fid: Fid;
168+
username: string;
169+
custodyAddress: Uint8Array | null;
170+
proofTimestamp: Date;
171+
type: UserNameType;
172+
};
173+
162174
export type OnChainEventRow = Selectable<OnChainEventsTable>;
163175
export type InsertableOnChainEventRow = Insertable<OnChainEventsTable>;
164176

177+
export type UsernameRow = Selectable<UsernamesTable>;
178+
export type InsertableUsernameRow = Insertable<UsernamesTable>;
179+
165180
// ALL TABLES -------------------------------------------------------------------------------------
166181
export interface HubTables {
167182
messages: MessagesTable;
168183
onchain_events: OnChainEventsTable;
184+
usernames: UsernamesTable;
169185
}
170186

171187
export const getDbClient = (connectionString?: string, schema = "public") => {

packages/shuttle/src/shuttle/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export * from "./hubEventProcessor";
99
export * from "./messageProcessor";
1010
export * from "./messageReconciliation";
1111
export * from "./onChainEventReconciliation";
12+
export * from "./usernameProofReconciliation";
1213
export * from "./eventStream";
1314

1415
export type StoreMessageOperation = "merge" | "delete" | "revoke" | "prune";
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
import {
2+
type HubRpcClient,
3+
type UserNameProof,
4+
UserNameType,
5+
bytesToHexString,
6+
fromFarcasterTime,
7+
} from "@farcaster/hub-nodejs";
8+
import { type DB } from "./db";
9+
import { Logger } from "pino";
10+
import { ok, err, Result } from "neverthrow";
11+
12+
export class UsernameProofReconciliation {
13+
private readonly hubClient: HubRpcClient;
14+
private readonly db: DB;
15+
private readonly log: Logger;
16+
17+
constructor(hubClient: HubRpcClient, db: DB, log: Logger) {
18+
this.hubClient = hubClient;
19+
this.db = db;
20+
this.log = log;
21+
}
22+
23+
async reconcileUsernameProofsForFid(
24+
fid: number,
25+
onHubProof: (proof: UserNameProof, missingInDb: boolean) => Promise<void>,
26+
onDbProof?: (proof: UserNameProof, missingInHub: boolean) => Promise<void>,
27+
startTimestamp?: number,
28+
stopTimestamp?: number,
29+
types?: UserNameType[],
30+
) {
31+
let startDate: Date | undefined;
32+
if (startTimestamp) {
33+
const startUnixTimestampResult = fromFarcasterTime(startTimestamp);
34+
if (startUnixTimestampResult.isErr()) {
35+
this.log.error({ fid, types, startTimestamp, stopTimestamp }, "Invalid time range provided to reconciliation");
36+
return;
37+
}
38+
39+
startDate = new Date(startUnixTimestampResult.value);
40+
}
41+
42+
let stopDate: Date | undefined;
43+
if (stopTimestamp) {
44+
const stopUnixTimestampResult = fromFarcasterTime(stopTimestamp);
45+
if (stopUnixTimestampResult.isErr()) {
46+
this.log.error({ fid, types, startTimestamp, stopTimestamp }, "Invalid time range provided to reconciliation");
47+
return;
48+
}
49+
50+
stopDate = new Date(stopUnixTimestampResult.value);
51+
}
52+
53+
for (const proofType of types ?? [UserNameType.USERNAME_TYPE_FNAME, UserNameType.USERNAME_TYPE_ENS_L1]) {
54+
this.log.debug({ fid, proofType, startTimestamp, stopTimestamp }, "Reconciling username proofs for FID");
55+
await this.reconcileUsernameProofsOfTypeForFid(fid, proofType, onHubProof, onDbProof, startDate, stopDate);
56+
}
57+
}
58+
59+
async reconcileUsernameProofsOfTypeForFid(
60+
fid: number,
61+
proofType: UserNameType,
62+
onHubProof: (proof: UserNameProof, missingInDb: boolean) => Promise<void>,
63+
onDbProof?: (proof: UserNameProof, missingInHub: boolean) => Promise<void>,
64+
startDate?: Date,
65+
stopDate?: Date,
66+
): Promise<void> {
67+
const hubProofsByKey = new Map<string, UserNameProof>();
68+
69+
// First, reconcile proofs that are in the hub but not in the database
70+
for await (const proofs of this.allHubUsernameProofsOfTypeForFid(fid, proofType, startDate, stopDate)) {
71+
const proofKeys = proofs.map((proof: UserNameProof) => this.getProofKey(proof));
72+
73+
if (proofKeys.length === 0) {
74+
this.log.debug(
75+
{ fid, proofType, startDate: startDate?.toISOString(), stopDate: stopDate?.toISOString() },
76+
"No username proofs found in hub",
77+
);
78+
continue;
79+
}
80+
81+
const dbProofs = await this.dbProofsMatchingHubProofs(fid, proofType, startDate, stopDate, proofs);
82+
if (dbProofs.isErr()) {
83+
throw dbProofs.error;
84+
}
85+
86+
const dbProofsByKey = new Map<string, UserNameProof>();
87+
for (const proof of dbProofs.value) {
88+
const key = this.getProofKey(proof);
89+
dbProofsByKey.set(key, proof);
90+
}
91+
92+
for (const proof of proofs) {
93+
const proofKey = this.getProofKey(proof);
94+
hubProofsByKey.set(proofKey, proof);
95+
96+
const dbProof = dbProofsByKey.get(proofKey);
97+
await onHubProof(proof, dbProof === undefined);
98+
}
99+
}
100+
101+
// Next, reconcile proofs that are in the database but not in the hub
102+
if (onDbProof) {
103+
const dbProofs = await this.allActiveDbProofsOfTypeForFid(fid, proofType, startDate, stopDate);
104+
if (dbProofs.isErr()) {
105+
this.log.error(
106+
{ fid, proofType, startDate: startDate?.toISOString(), stopDate: stopDate?.toISOString() },
107+
"Invalid time range provided to reconciliation",
108+
);
109+
return;
110+
}
111+
112+
for (const dbProof of dbProofs.value) {
113+
const key = this.getProofKey(dbProof);
114+
await onDbProof(dbProof, !hubProofsByKey.has(key));
115+
}
116+
}
117+
}
118+
119+
private async getProofsFromHub(
120+
fid: number,
121+
proofType: UserNameType,
122+
startDate?: Date,
123+
stopDate?: Date,
124+
): Promise<Result<UserNameProof[], Error>> {
125+
const result = await this.hubClient.getUserNameProofsByFid({ fid });
126+
if (result.isErr()) {
127+
return err(new Error(`Unable to get username proofs for FID ${fid}`, { cause: result.error }));
128+
}
129+
130+
let proofs = result.value.proofs.filter((proof) => proof.type === proofType);
131+
132+
if (startDate || stopDate) {
133+
proofs = proofs.filter((proof) => {
134+
if (startDate !== undefined && proof.timestamp * 1000 < startDate.getTime()) return false;
135+
if (stopDate !== undefined && proof.timestamp * 1000 > stopDate.getTime()) return false;
136+
return true;
137+
});
138+
}
139+
140+
return ok(proofs);
141+
}
142+
143+
private async getProofsFromDb(
144+
fid: number,
145+
proofType: UserNameType,
146+
startDate?: Date,
147+
stopDate?: Date,
148+
hubProofs?: UserNameProof[],
149+
): Promise<Result<UserNameProof[], Error>> {
150+
try {
151+
let query = this.db
152+
.selectFrom("usernames")
153+
.select(["username", "fid", "proofTimestamp", "custodyAddress", "type"])
154+
.where("fid", "=", fid)
155+
.where("type", "=", proofType)
156+
.where("deletedAt", "is", null);
157+
158+
if (startDate) {
159+
query = query.where("proofTimestamp", ">", startDate);
160+
}
161+
if (stopDate) {
162+
query = query.where("proofTimestamp", "<", stopDate);
163+
}
164+
165+
if (hubProofs) {
166+
query = query.where(
167+
"username",
168+
"in",
169+
hubProofs.map((p) => bytesToHexString(p.name)._unsafeUnwrap()),
170+
);
171+
}
172+
173+
const results = await query.execute();
174+
175+
const proofs = results.map((row) => ({
176+
name: Buffer.from(row.username),
177+
type: row.type,
178+
fid: row.fid,
179+
timestamp: Math.floor(row.proofTimestamp.getTime() / 1000),
180+
signature: Buffer.from([]),
181+
owner: row.custodyAddress ? Buffer.from(row.custodyAddress) : Buffer.from([]),
182+
}));
183+
184+
const filteredProofs = proofType !== undefined ? proofs.filter((p) => p.type === proofType) : proofs;
185+
return ok(filteredProofs);
186+
} catch (e) {
187+
return err(new Error(`Failed to get username proofs from DB for FID ${fid}`, { cause: e }));
188+
}
189+
}
190+
191+
private async *allHubUsernameProofsOfTypeForFid(
192+
fid: number,
193+
proofType: UserNameType,
194+
startDate?: Date,
195+
stopDate?: Date,
196+
): AsyncGenerator<UserNameProof[]> {
197+
const hubProofsResult = await this.getProofsFromHub(fid, proofType, startDate, stopDate);
198+
if (hubProofsResult.isErr()) {
199+
throw hubProofsResult.error;
200+
}
201+
202+
// Filter proofs by timestamp if provided
203+
const filteredProofs = hubProofsResult.value.filter((proof: UserNameProof) => {
204+
if (startDate && proof.timestamp * 1000 < startDate.getTime()) return false;
205+
if (stopDate && proof.timestamp * 1000 > stopDate.getTime()) return false;
206+
return true;
207+
});
208+
209+
yield filteredProofs;
210+
}
211+
212+
private async allActiveDbProofsOfTypeForFid(fid: number, proofType: UserNameType, startDate?: Date, stopDate?: Date) {
213+
return this.getProofsFromDb(fid, proofType, startDate, stopDate);
214+
}
215+
216+
private async dbProofsMatchingHubProofs(
217+
fid: number,
218+
proofType: UserNameType,
219+
startDate?: Date,
220+
stopDate?: Date,
221+
hubProofs?: UserNameProof[],
222+
) {
223+
return this.getProofsFromDb(fid, proofType, startDate, stopDate, hubProofs);
224+
}
225+
226+
private getProofKey(proof: UserNameProof): string {
227+
// Only include identifying fields (name, owner, fid, type) - not timestamp
228+
const nameHex = bytesToHexString(proof.name)._unsafeUnwrap();
229+
const ownerHex = bytesToHexString(proof.owner)._unsafeUnwrap();
230+
return `${nameHex}-${proof.fid}-${ownerHex}-${proof.type}`;
231+
}
232+
}

0 commit comments

Comments
 (0)