Skip to content

Commit a9ec283

Browse files
lucas-neynartybook
andcommitted
feat: username proof reconciliation [NEYN-1422] (#3)
* feat: add hub event reconciler * refactor: rename hubEventReconciliation to onChainEventReconciliation * fix: add event body types * fix: update logs and errors * fix: use built-in error method and fix build error with example-app * fix: restore example-app * fix: remove test code * fix: update processHubEvent logic * fix: type error * fix: remove empty file * feat: update onChainEventReconciliation to use maps * Update packages/shuttle/src/shuttle/onChainEventReconciliation.ts Co-authored-by: Ty Book <tbook11@gmail.com> * fix: rename functions and remove unnecessary logic * fix: move onchain_events table to HubTables * fix: Onchain to OnChain * fix: rename more things to onChain pattern, add support for SignerMigrated event, and fix example insert * fix: remove unused import * refactor: rename to onChain pattern * fix: update date comparison logic * fix: set fid to zero for signer migrated events * Revert "fix: set fid to zero for signer migrated events" This reverts commit d21fdbb. * fix: set fid to zero for signer migrated event lookups * fix: set fid to zero for signer migrated events before hubble queries * feat: add support for username proof reconciliation * fix: build error * feat: add wrapper for processing multiple username proof types * fix: rename name column to username * fix: convert farcaster time to date * feat: index username proof type * Remove redundant lines * fix: reconciliation logic * update yarn.lock * remove yarn.lock * PR feedback --------- Co-authored-by: Ty Book <tbook11@gmail.com> Co-authored-by: Ty Book <ty@neynar.com>
1 parent ef89c5c commit a9ec283

3 files changed

Lines changed: 253 additions & 0 deletions

File tree

packages/shuttle/src/shuttle/db.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,13 +159,29 @@ export type OnChainEventsTable = {
159159
body: IdRegisterEventBody | SignerEventBody | StorageRentEventBody | SignerMigratedEventBody;
160160
};
161161

162+
type UsernamesTable = {
163+
id: Generated<string>;
164+
createdAt: Generated<Date>;
165+
updatedAt: Generated<Date>;
166+
deletedAt: Date | null;
167+
fid: Fid;
168+
username: string;
169+
custodyAddress: Uint8Array | null;
170+
proofTimestamp: Date;
171+
type: UserNameType;
172+
};
173+
162174
export type OnChainEventRow = Selectable<OnChainEventsTable>;
163175
export type InsertableOnChainEventRow = Insertable<OnChainEventsTable>;
164176

177+
export type UsernameRow = Selectable<UsernamesTable>;
178+
export type InsertableUsernameRow = Insertable<UsernamesTable>;
179+
165180
// ALL TABLES -------------------------------------------------------------------------------------
166181
export interface HubTables {
167182
messages: MessagesTable;
168183
onchain_events: OnChainEventsTable;
184+
usernames: UsernamesTable;
169185
}
170186

171187
export const getDbClient = (connectionString?: string, schema = "public") => {

packages/shuttle/src/shuttle/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export * from "./hubEventProcessor";
99
export * from "./messageProcessor";
1010
export * from "./messageReconciliation";
1111
export * from "./onChainEventReconciliation";
12+
export * from "./usernameProofReconciliation";
1213
export * from "./eventStream";
1314

1415
export type StoreMessageOperation = "merge" | "delete" | "revoke" | "prune";
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
import {
2+
type HubRpcClient,
3+
type UserNameProof,
4+
UserNameType,
5+
bytesToHexString,
6+
fromFarcasterTime,
7+
} from "@farcaster/hub-nodejs";
8+
import { type DB } from "./db";
9+
import { Logger } from "pino";
10+
import { ok, err, Result } from "neverthrow";
11+
12+
export class UsernameProofReconciliation {
13+
private readonly hubClient: HubRpcClient;
14+
private readonly db: DB;
15+
private readonly log: Logger;
16+
17+
constructor(hubClient: HubRpcClient, db: DB, log: Logger) {
18+
this.hubClient = hubClient;
19+
this.db = db;
20+
this.log = log;
21+
}
22+
23+
async reconcileUsernameProofsForFid(
24+
fid: number,
25+
onHubProof: (proof: UserNameProof, missingInDb: boolean) => Promise<void>,
26+
onDbProof?: (proof: UserNameProof, missingInHub: boolean) => Promise<void>,
27+
startTimestamp?: number,
28+
stopTimestamp?: number,
29+
types?: UserNameType[],
30+
) {
31+
let startDate: Date | undefined;
32+
if (startTimestamp) {
33+
const startUnixTimestampResult = fromFarcasterTime(startTimestamp);
34+
if (startUnixTimestampResult.isErr()) {
35+
this.log.error({ fid, types, startTimestamp, stopTimestamp }, "Invalid time range provided to reconciliation");
36+
return;
37+
}
38+
39+
startDate = new Date(startUnixTimestampResult.value);
40+
}
41+
42+
let stopDate: Date | undefined;
43+
if (stopTimestamp) {
44+
const stopUnixTimestampResult = fromFarcasterTime(stopTimestamp);
45+
if (stopUnixTimestampResult.isErr()) {
46+
this.log.error({ fid, types, startTimestamp, stopTimestamp }, "Invalid time range provided to reconciliation");
47+
return;
48+
}
49+
50+
stopDate = new Date(stopUnixTimestampResult.value);
51+
}
52+
53+
for (const proofType of types ?? [
54+
UserNameType.USERNAME_TYPE_FNAME,
55+
UserNameType.USERNAME_TYPE_ENS_L1,
56+
UserNameType.USERNAME_TYPE_BASENAME,
57+
]) {
58+
this.log.debug({ fid, proofType, startTimestamp, stopTimestamp }, "Reconciling username proofs for FID");
59+
await this.reconcileUsernameProofsOfTypeForFid(fid, proofType, onHubProof, onDbProof, startDate, stopDate);
60+
}
61+
}
62+
63+
async reconcileUsernameProofsOfTypeForFid(
64+
fid: number,
65+
proofType: UserNameType,
66+
onHubProof: (proof: UserNameProof, missingInDb: boolean) => Promise<void>,
67+
onDbProof?: (proof: UserNameProof, missingInHub: boolean) => Promise<void>,
68+
startDate?: Date,
69+
stopDate?: Date,
70+
): Promise<void> {
71+
const hubProofsByKey = new Map<string, UserNameProof>();
72+
73+
// First, reconcile proofs that are in the hub but not in the database
74+
for await (const proofs of this.allHubUsernameProofsOfTypeForFid(fid, proofType, startDate, stopDate)) {
75+
const proofKeys = proofs.map((proof: UserNameProof) => this.getProofKey(proof));
76+
77+
if (proofKeys.length === 0) {
78+
this.log.debug(
79+
{ fid, proofType, startDate: startDate?.toISOString(), stopDate: stopDate?.toISOString() },
80+
"No username proofs found in hub",
81+
);
82+
continue;
83+
}
84+
85+
const dbProofs = await this.dbProofsMatchingHubProofs(fid, proofType, startDate, stopDate, proofs);
86+
if (dbProofs.isErr()) {
87+
throw dbProofs.error;
88+
}
89+
90+
const dbProofsByKey = new Map<string, UserNameProof>();
91+
for (const proof of dbProofs.value) {
92+
const key = this.getProofKey(proof);
93+
dbProofsByKey.set(key, proof);
94+
}
95+
96+
for (const proof of proofs) {
97+
const proofKey = this.getProofKey(proof);
98+
hubProofsByKey.set(proofKey, proof);
99+
100+
const dbProof = dbProofsByKey.get(proofKey);
101+
await onHubProof(proof, dbProof === undefined);
102+
}
103+
}
104+
105+
// Next, reconcile proofs that are in the database but not in the hub
106+
if (onDbProof) {
107+
const dbProofs = await this.allActiveDbProofsOfTypeForFid(fid, proofType, startDate, stopDate);
108+
if (dbProofs.isErr()) {
109+
this.log.error(
110+
{ fid, proofType, startDate: startDate?.toISOString(), stopDate: stopDate?.toISOString() },
111+
"Invalid time range provided to reconciliation",
112+
);
113+
return;
114+
}
115+
116+
for (const dbProof of dbProofs.value) {
117+
const key = this.getProofKey(dbProof);
118+
await onDbProof(dbProof, !hubProofsByKey.has(key));
119+
}
120+
}
121+
}
122+
123+
private async getProofsFromHub(
124+
fid: number,
125+
proofType: UserNameType,
126+
startDate?: Date,
127+
stopDate?: Date,
128+
): Promise<Result<UserNameProof[], Error>> {
129+
const result = await this.hubClient.getUserNameProofsByFid({ fid });
130+
if (result.isErr()) {
131+
return err(new Error(`Unable to get username proofs for FID ${fid}`, { cause: result.error }));
132+
}
133+
134+
let proofs = result.value.proofs.filter((proof) => proof.type === proofType);
135+
136+
if (startDate || stopDate) {
137+
proofs = proofs.filter((proof) => {
138+
if (startDate !== undefined && proof.timestamp * 1000 < startDate.getTime()) return false;
139+
if (stopDate !== undefined && proof.timestamp * 1000 > stopDate.getTime()) return false;
140+
return true;
141+
});
142+
}
143+
144+
return ok(proofs);
145+
}
146+
147+
private async getProofsFromDb(
148+
fid: number,
149+
proofType: UserNameType,
150+
startDate?: Date,
151+
stopDate?: Date,
152+
hubProofs?: UserNameProof[],
153+
): Promise<Result<UserNameProof[], Error>> {
154+
try {
155+
let query = this.db
156+
.selectFrom("usernames")
157+
.select(["username", "fid", "proofTimestamp", "custodyAddress", "type"])
158+
.where("fid", "=", fid)
159+
.where("type", "=", proofType)
160+
.where("deletedAt", "is", null);
161+
162+
if (startDate) {
163+
query = query.where("proofTimestamp", ">", startDate);
164+
}
165+
if (stopDate) {
166+
query = query.where("proofTimestamp", "<", stopDate);
167+
}
168+
169+
if (hubProofs) {
170+
query = query.where(
171+
"username",
172+
"in",
173+
hubProofs.map((p) => bytesToHexString(p.name)._unsafeUnwrap()),
174+
);
175+
}
176+
177+
const results = await query.execute();
178+
179+
const proofs = results.map((row) => ({
180+
name: Buffer.from(row.username),
181+
type: row.type,
182+
fid: row.fid,
183+
timestamp: Math.floor(row.proofTimestamp.getTime() / 1000),
184+
signature: Buffer.from([]),
185+
owner: row.custodyAddress ? Buffer.from(row.custodyAddress) : Buffer.from([]),
186+
}));
187+
188+
const filteredProofs = proofType !== undefined ? proofs.filter((p) => p.type === proofType) : proofs;
189+
return ok(filteredProofs);
190+
} catch (e) {
191+
return err(new Error(`Failed to get username proofs from DB for FID ${fid}`, { cause: e }));
192+
}
193+
}
194+
195+
private async *allHubUsernameProofsOfTypeForFid(
196+
fid: number,
197+
proofType: UserNameType,
198+
startDate?: Date,
199+
stopDate?: Date,
200+
): AsyncGenerator<UserNameProof[]> {
201+
const hubProofsResult = await this.getProofsFromHub(fid, proofType, startDate, stopDate);
202+
if (hubProofsResult.isErr()) {
203+
throw hubProofsResult.error;
204+
}
205+
206+
// Filter proofs by timestamp if provided
207+
const filteredProofs = hubProofsResult.value.filter((proof: UserNameProof) => {
208+
if (startDate && proof.timestamp * 1000 < startDate.getTime()) return false;
209+
if (stopDate && proof.timestamp * 1000 > stopDate.getTime()) return false;
210+
return true;
211+
});
212+
213+
yield filteredProofs;
214+
}
215+
216+
private async allActiveDbProofsOfTypeForFid(fid: number, proofType: UserNameType, startDate?: Date, stopDate?: Date) {
217+
return this.getProofsFromDb(fid, proofType, startDate, stopDate);
218+
}
219+
220+
private async dbProofsMatchingHubProofs(
221+
fid: number,
222+
proofType: UserNameType,
223+
startDate?: Date,
224+
stopDate?: Date,
225+
hubProofs?: UserNameProof[],
226+
) {
227+
return this.getProofsFromDb(fid, proofType, startDate, stopDate, hubProofs);
228+
}
229+
230+
private getProofKey(proof: UserNameProof): string {
231+
// Only include identifying fields (name, owner, fid, type) - not timestamp
232+
const nameHex = bytesToHexString(proof.name)._unsafeUnwrap();
233+
const ownerHex = bytesToHexString(proof.owner)._unsafeUnwrap();
234+
return `${nameHex}-${proof.fid}-${ownerHex}-${proof.type}`;
235+
}
236+
}

0 commit comments

Comments
 (0)