Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions packages/generalized-indexer/.mocharc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"extension": [
"ts"
],
"spec": "**/*.test.ts",
"require": [
"ts-node/register"
],
"recursive": true
}
29 changes: 29 additions & 0 deletions packages/generalized-indexer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Package Name
This is meant to be a template for quickly adding new package libraries. Replace this readme with relevant information for your package.

## Adding a new package
1. go into the package folder in this repo.
2. cp -r template your_package_name
3. Edit the package.json file renaming the "name" property.
4. add dependencies as needed.

## Template features
This template will set you up with typescript, eslint, prettier, some basic scripts, and a main file entry point.

### Scripts
- build - typescript build and output to dist
- watch - build watching for changes
- format - prettier code fixing
- lint - eslint code fixing
- fix - eslint and prettier code fixing
- lint:check - eslint code checking ( no changes )
- format:check - prettier code checking ( no changes )
- build:check - run type check without emitting files
- check - eslint and prettier and typescript code checking ( no changes )
- test - run mocha testing
- test:watch - run mocha testing
- coverage - see testing coverage

### Adding tests
Add a `example.test.ts` file to any folder and mocha will find it.
** note: chai v5 breaks typescript support, so we explicitly use chai 4 **
6 changes: 6 additions & 0 deletions packages/generalized-indexer/eslint.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// .eslintrc.js in the new package
module.exports = {
root:true,
extends: ['@repo/eslint-config/library.js'],
};

47 changes: 47 additions & 0 deletions packages/generalized-indexer/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
{
"name": "@repo/generalized-indexer",
"version": "0.0.1",
"description": "",
"main": "index.js",
"scripts": {
"build": "tsc -b",
"build:check": "tsc --noEmit",
"watch": "tsc -b --watch",
"fix": "pnpm format && pnpm lint",
"format": "prettier --write src",
"format:check": "prettier src --check",
"lint": "eslint --fix",
"lint:check": "eslint",
"check": "pnpm format:check && pnpm lint:check && pnpm build:check",
"test": "mocha",
"coverage": "nyc mocha",
"test:watch": "mocha --watch"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"@types/lodash": "^4.17.7",
"lodash": "^4.17.21",
"superstruct": "2.0.3-1"
},
"exports": {
".": "./dist/index.js"
},
"devDependencies": {
"@istanbuljs/nyc-config-typescript": "^1.0.2",
"@repo/eslint-config": "workspace:*",
"@repo/typescript-config": "workspace:*",
"@types/chai": "^4.3.17",
"@types/mocha": "^10.0.7",
"@types/node": "^16.11.10",
"chai": "^4.5.0",
"eslint": "^8.57.0",
"mocha": "^10.7.0",
"nyc": "^17.0.0",
"prettier": "^3.3.3",
"source-map-support": "^0.5.21",
"ts-node": "^10.9.2",
"typescript": "^5.5.4"
}
}
65 changes: 65 additions & 0 deletions packages/generalized-indexer/src/cursor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { expect } from "chai";
import { AsyncSortedKVStore } from "./sorted-kv";
import { Cursor } from "./cursor";

describe("Cursor", function () {
let store: AsyncSortedKVStore<string>;
let cursor: Cursor;

beforeEach(async function () {
store = new AsyncSortedKVStore<string>();
cursor = new Cursor(store);
});

it("should initialize with the first key", async function () {
await store.set("key1", "value1");
await store.set("key2", "value2");
const firstKey = await cursor.get();
expect(firstKey).to.equal("key1");
});

it("should move to the next key", async function () {
await store.set("key1", "value1");
await store.set("key2", "value2");
expect(await cursor.get()).to.equal("key1");
expect(await cursor.increment()).to.equal("key2");
});

it("should return undefined if there is no next key", async function () {
await store.set("key1", "value1");
const nextKey = await cursor.get();
expect(nextKey).to.equal("key1");
expect(await cursor.increment()).to.equal(undefined);
});

it("should emit change event when a lower key is set", async function () {
let rewindEventTriggered = false;
cursor.on("change", (key) => {
rewindEventTriggered = true;
expect(key).to.equal("key1");
});

await store.set("key2", "value2");
const n1 = await cursor.get();
expect(n1).to.equal("key2");
const n2 = await cursor.increment();
expect(n2).to.equal(undefined);
await store.set("key1", "value1"); // Setting a lower key
expect(rewindEventTriggered).to.be.true;
const peek = await cursor.get();
expect(peek).to.equal("key1");
});

it("should not emit rewind event if cursor does not change", async function () {
let rewindEventTriggered = false;
cursor.on("change", () => {
rewindEventTriggered = true;
});

await store.set("key1", "value1");
await cursor.increment();
await store.set("key1", "value1"); // Setting the same key again

expect(rewindEventTriggered).to.be.false;
});
});
57 changes: 57 additions & 0 deletions packages/generalized-indexer/src/cursor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import Events from "events";
import assert from "assert";
import { IKeyManager } from "./sorted-kv";

// cursor tracks what key in the table we need to process next.
// internally the cursor tracks the key before the current key, and if its set to undefined, that implies its set to the start
export class Cursor extends Events {
private table: IKeyManager;
// cursor represents our currently processed point
// if undefined, we have not started it ie at beginning
private cursor?: string;

constructor(table: IKeyManager, cursor?: string) {
super();
this.table = table;
this.cursor = cursor;

// Listen to 'set' and 'delete' events from the table
this.table.on("change", (key?: string) => {
this.handleTableChange(key);
});
}

private async handleTableChange(key?: string): Promise<void> {
// cursor is undefined, which means we are already at beginning
if (this.cursor === undefined) return;
// if key undefined, that means table is empty
if (key === undefined) {
this.cursor = undefined;
} else if (key <= this.cursor) {
// if key is less than or equal to cursor, this means we need to go back
// since we assume we have processed up to cursor
// set cursor to the key before this one, because actually cursor points to the spot before active key
this.cursor = await this.table.prevKey(key);
} else {
return;
}
this.emit("change", this.cursor);
}

public async increment(): Promise<string | undefined> {
this.cursor = await this.get();
return this.get();
}
public async get(): Promise<string | undefined> {
// cursor is not set, return first element in table
if (this.cursor === undefined) return this.table.firstKey();
// otherwise return the element after the cursor
return this.table.nextKey(this.cursor);
}
public async getPrev(): Promise<string | undefined> {
return this.cursor;
}
public set(newCursor: string | undefined): void {
this.cursor = newCursor;
}
}
65 changes: 65 additions & 0 deletions packages/generalized-indexer/src/examples/balances.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { IndexerFactory } from "../factory";
import { AsyncSortedKVStore } from "../sorted-kv";
import { intToKey, sleep } from "../utils";
type BalanceChangeEvent = {
account: string;
time: number;
diff: number;
};

const balanceChangeEvents: BalanceChangeEvent[] = [
{ account: "account1", time: 1, diff: 100 },
{ account: "account2", time: 2, diff: 200 },
{ account: "account1", time: 3, diff: -50 },
{ account: "account3", time: 4, diff: 300 },
{ account: "account2", time: 5, diff: -100 },
{ account: "account1", time: 6, diff: 150 },
{ account: "account3", time: 7, diff: -200 },
{ account: "account2", time: 8, diff: 50 },
{ account: "account1", time: 9, diff: 100 },
{ account: "account3", time: 10, diff: 250 },
];

const tables = [new AsyncSortedKVStore<BalanceChangeEvent>()];
async function sequencerCb(keys: string[]): Promise<number> {
// since we are dealing with a single event stream, you will always return the first stream
return 0;
}
async function reducerCb(
locator: [number, string],
get: (prop: string) => Promise<string | undefined>,
index: number | undefined,
): Promise<Record<string, string> | undefined> {
const [tableIndex, key] = locator;
const event = await tables[tableIndex]?.get(key);
// no event found do nothing
if (event === undefined) return;
// get previously stored balance from reducer for this account, could be undefined if never set, so assume 0
const prevBalance = Number((await get(event.account)) ?? "0");
// get the diff from the event
const nextBalance = prevBalance + event.diff;
// store account updates to teh balance
return {
[event.account]: nextBalance.toString(),
};
}

// example runs through the events in order (no re-org), pushing events from the start and getting to final balances
async function run() {
const indexer = IndexerFactory(tables, sequencerCb, reducerCb);

for (const event of balanceChangeEvents) {
await tables[0]?.set(intToKey(event.time), event);
await sleep();
}

const balances = {
account1: await indexer.reducer.get("account1"),
account2: await indexer.reducer.get("account2"),
account3: await indexer.reducer.get("account3"),
};

return balances;
}

run().then(console.log).catch(console.error);
27 changes: 27 additions & 0 deletions packages/generalized-indexer/src/factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { AsyncSortedKVStore, IKeyManager } from "./sorted-kv";
import { Sequencer } from "./sequencer";
import { Cursor } from "./cursor";
import * as Reducer from "./reducer";
type SequencerCb = (keys: Array<string>) => Promise<number>;

export function IndexerFactory(
tables: IKeyManager[],
sequencerCb: SequencerCb,
reducerCb: Reducer.ReducerCb,
) {
const cursors = tables.map((table) => new Cursor(table));
const sequencer = new Sequencer(cursors, sequencerCb);
const reducer = new Reducer.Reducer(sequencer, reducerCb);

tables.forEach((table) => {
table.on("change", () => {
sequencer.tick();
});
});

sequencer.on("change", () => {
reducer.tick();
});

return { cursors, sequencer, reducer };
}
1 change: 1 addition & 0 deletions packages/generalized-indexer/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./main";
10 changes: 10 additions & 0 deletions packages/generalized-indexer/src/main.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { expect } from "chai";

import * as main from "./main";

describe("main", () => {
it("should return true", async () => {
const result = await main.Main({});
expect(result).to.be.true;
});
});
4 changes: 4 additions & 0 deletions packages/generalized-indexer/src/main.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export async function Main(env: Record<string, string | undefined>) {
console.log("Main running");
return true;
}
Loading
Loading