Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90,483 changes: 66,521 additions & 23,962 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions packages/cli/src/ceramic-daemon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ export function makeCeramicConfig(opts: DaemonConfig): CeramicConfig {
syncOverride: SYNC_OPTIONS_MAP[opts.node.syncOverride],
streamCacheLimit: opts.node.streamCacheLimit,
indexing: opts.indexing,
reconUrl: opts.node.reconUrl,
}
if (opts.stateStore?.mode == StateStoreMode.FS) {
ceramicConfig.stateStoreDirectory = opts.stateStore.localDirectory
Expand Down
6 changes: 6 additions & 0 deletions packages/cli/src/daemon-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,12 @@ export class DaemonCeramicNodeConfig {
*/
@jsonMember(Number, { name: 'stream-cache-limit' })
streamCacheLimit?: number

/**
* If set, experimental recon is enabled and uses another node to run recon.
*/
@jsonMember(String, { name: 'recon-url' })
reconUrl?: string
}

/**
Expand Down
3 changes: 2 additions & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@
"pg-boss": "^8.2.0",
"rxjs": "^7.5.2",
"sqlite3": "^5.0.8",
"uint8arrays": "^4.0.3"
"uint8arrays": "^4.0.3",
"zcgen-client": "^0.0.5"
},
"devDependencies": {
"@ceramicnetwork/3id-did-resolver": "^2.23.0-rc.0",
Expand Down
21 changes: 20 additions & 1 deletion packages/core/src/ceramic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import { SyncApi } from './sync/sync-api.js'
import { ProvidersCache } from './providers-cache.js'
import crypto from 'crypto'
import { SyncJobData } from './sync/interfaces.js'
import { ReconApi, ReconApiHTTP } from './recon.js'

const DEFAULT_CACHE_LIMIT = 500 // number of streams stored in the cache
const DEFAULT_QPS_LIMIT = 10 // Max number of pubsub query messages that can be published per second without rate limiting
Expand Down Expand Up @@ -141,6 +142,8 @@ export interface CeramicConfig {
useCentralizedPeerDiscovery?: boolean
syncOverride?: SyncOptions

reconUrl?: string

[index: string]: any // allow arbitrary properties
}

Expand All @@ -160,6 +163,7 @@ export interface CeramicModules {
repository: Repository
shutdownSignal: ShutdownSignal
providersCache: ProvidersCache
recon: ReconApi | null
}

/**
Expand Down Expand Up @@ -218,6 +222,7 @@ export class Ceramic implements CeramicApi {
private readonly anchorResumingService: AnchorResumingService
private readonly providersCache: ProvidersCache
private readonly syncApi: SyncApi
readonly recon: ReconApi

readonly _streamHandlers: HandlersMap
private readonly _anchorValidator: AnchorValidator
Expand Down Expand Up @@ -292,6 +297,7 @@ export class Ceramic implements CeramicApi {
anchorService: modules.anchorService,
conflictResolution: conflictResolution,
indexing: localIndex,
recon: modules.recon,
})
this.syncApi = new SyncApi(
{
Expand All @@ -305,7 +311,15 @@ export class Ceramic implements CeramicApi {
)
const pinApi = this._buildPinApi()
this.repository.index.setSyncQueryApi(this.syncApi)
this.admin = new LocalAdminApi(localIndex, this.syncApi, this.nodeStatus.bind(this), pinApi)
this.recon = modules.recon

this.admin = new LocalAdminApi(
localIndex,
this.syncApi,
this.nodeStatus.bind(this),
pinApi,
this.recon
)
}

get index(): LocalIndexApi {
Expand Down Expand Up @@ -537,6 +551,10 @@ export class Ceramic implements CeramicApi {
maxQueriesPerSecond
)

const recon = config.reconUrl
? new ReconApiHTTP(config.reconUrl, config.networkName, repository, dispatcher, logger)
: null

const params: CeramicParameters = {
gateway: config.gateway,
stateStoreDirectory: config.stateStoreDirectory,
Expand All @@ -557,6 +575,7 @@ export class Ceramic implements CeramicApi {
repository,
shutdownSignal,
providersCache,
recon,
}

return [modules, params]
Expand Down
6 changes: 4 additions & 2 deletions packages/core/src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,14 @@ export class Dispatcher {
* @param cid - Commit CID
* @param streamId - StreamID of the stream the commit belongs to, used for logging.
*/
async retrieveCommit(cid: CID | string, streamId: StreamID): Promise<any> {
async retrieveCommit(cid: CID | string, streamId?: StreamID): Promise<any> {
try {
return await this._getFromIpfs(cid)
} catch (e) {
this._logger.err(
`Error while loading commit CID ${cid.toString()} from IPFS for stream ${streamId.toString()}: ${e}`
`Error while loading commit CID ${cid.toString()} from IPFS for stream ${
streamId ? streamId.toString() : ''
}: ${e}`
)
throw e
}
Expand Down
12 changes: 9 additions & 3 deletions packages/core/src/local-admin-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
import { StreamID } from '@ceramicnetwork/streamid'
import { LocalIndexApi } from './indexing/local-index-api.js'
import { SyncApi } from './sync/sync-api.js'
import { ReconApi } from './recon.js'

type NodeStatusFn = () => Promise<NodeStatusResponse>

Expand All @@ -19,7 +20,8 @@ export class LocalAdminApi implements AdminApi {
private readonly indexApi: LocalIndexApi,
private readonly syncApi: SyncApi,
private readonly nodeStatusFn: NodeStatusFn, // TODO(CDB-2293): circular dependency back into Ceramic
private readonly pinApi: PinApi
private readonly pinApi: PinApi,
private readonly recon: ReconApi | undefined
) {}

async nodeStatus(): Promise<NodeStatusResponse> {
Expand All @@ -32,7 +34,9 @@ export class LocalAdminApi implements AdminApi {

async startIndexingModelData(modelData: Array<ModelData>): Promise<void> {
await this.indexApi.indexModels(modelData)
await this.syncApi.startModelSync(modelData.map((idx) => idx.streamID.toString()))
const ids = modelData.map((idx) => idx.streamID.toString())
await this.syncApi.startModelSync(ids)
if (this.recon) ids.forEach(this.recon.subscribe.bind(this.recon))
}

async getIndexedModels(): Promise<Array<StreamID>> {
Expand All @@ -49,10 +53,12 @@ export class LocalAdminApi implements AdminApi {
}

async stopIndexingModelData(modelData: Array<ModelData>): Promise<void> {
const ids = modelData.map((idx) => idx.streamID.toString())
await Promise.all([
this.indexApi.stopIndexingModels(modelData),
this.syncApi.stopModelSync(modelData.map((data) => data.streamID.toString())),
this.syncApi.stopModelSync(ids),
])
if (this.recon) ids.forEach(this.recon.unsubscribe.bind(this.recon))
}

get pin(): PinApi {
Expand Down
193 changes: 193 additions & 0 deletions packages/core/src/recon.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import { EventID, StreamID } from '@ceramicnetwork/streamid'
import { DiagnosticsLogger } from '@ceramicnetwork/common'
import { Repository } from './state-management/repository.js'
import * as ReconClient from 'zcgen-client'
import { Dispatcher } from './dispatcher.js'
import { ModelInstanceDocument } from '@ceramicnetwork/stream-model-instance'
import { from, repeat, timer, switchMap, tap, Subscription } from 'rxjs'
import { retry } from 'rxjs/operators'

/**
* Recon Event
*/
export interface Event {
eventId: string
}

/**
* Describes Recon Service API
*/
export interface ReconApi {
readonly networkName: string
/**
* Recon subscription, subscribe by model
*/
subscribe(model: string): Subscription
/**
* Add event to recon
*/
addEvent(event: Event): Promise<void>
/**
* Unsubscribe to subscription by model
*/
unsubscribe(model: string): void
/**
* Close and unsubscribe to all
*/
close(): void
}

/**
* Recon subscription manager, manages simple map of models to subscriptions
*/
export interface SubManager {
/**
* Add active subscription
*/
add(model: string, sub: Subscription): void
/**
* Get subscription by model
*/
get(model: string): Subscription | undefined
/**
* Unsubscribe
*/
unsubscribe(model: string): void
/**
* Unsubscribe to all known subscriptions
*/
close(): void
}

export class ReconSubManager implements SubManager {
private readonly subscriptions: Record<string, Subscription>

constructor(private readonly logger: DiagnosticsLogger) {
this.subscriptions = {}
}

add(model: string, sub: Subscription): void {
this.subscriptions[model] = sub
this.logger.verbose(`Recon: subscription for model ${model} added`)
}

get(model: string): Subscription | undefined {
return this.subscriptions[model]
}

unsubscribe(model: string): void {
const sub = this.get(model)
if (!sub) return
sub.unsubscribe()
delete this.subscriptions[model]
this.logger.verbose(`Recon: unsubscribed for model ${model}`)
}

close(): void {
Object.keys(this.subscriptions).forEach((model) => {
this.unsubscribe(model)
})
this.logger.verbose(`Recon: closing, unsubscribed to all`)
}
}

/**
* Recon API
*/
export class ReconApiHTTP implements ReconApi {
private readonly api: ReconClient.DefaultApi
private readonly subscriptions: ReconSubManager

constructor(
url: string,
readonly networkName: string,
private readonly repository: Repository,
private readonly dispatcher: Dispatcher,
private readonly logger: DiagnosticsLogger
) {
const baseServer = new ReconClient.ServerConfiguration(url, {})
const config = ReconClient.createConfiguration({ baseServer })
this.api = new ReconClient.DefaultApi(config)
this.subscriptions = new ReconSubManager(logger)
}

subscribe(model: string): Subscription {
if (this.subscriptions.get(model)) return this.subscriptions.get(model)

let offset = 0
const increaseOffset = (val: number): void => {
offset += val
}

const obv$ = from(
this.api.ceramicSubscribeSortKeySortValueGet(
'model',
model,
undefined,
undefined,
offset,
1000
)
).pipe(
tap((arr) => increaseOffset(arr.length)),
switchMap(from),
repeat({ delay: 200 }),
retry({
delay: (error, count) => {
this.logger.warn(`Recon: subscription failed for model ${model}, attempting to retry`)
// exp backoff, max 3 minutes
return timer(count > 11 ? 3 * 60 * 1000 : 2 ^ (count * 100))
},
resetOnSuccess: true,
})
)

// in future could return observable, handler added here to keep recon code together for now
const sub = obv$.subscribe(this._eventHandler)
this.subscriptions.add(model, sub)
return sub
}

unsubscribe(model: string): void {
this.subscriptions.unsubscribe(model)
}

close(): void {
this.subscriptions.close()
}

// messy here, so that recon changes are minimized for now and uses existing apis,
// model/streamids used for lots of caching, but could later be implemented w/o or recon based
async _eventHandler(event: string): Promise<void> {
const eventId = EventID.fromString(event)
const commit = await this.dispatcher.retrieveCommit(eventId.event)

let header, gcid
if (commit.proof) {
gcid = commit.id
} else if (commit.id) {
const genesis = await this.dispatcher.retrieveCommit(commit.id)
header = genesis.header
gcid = commit.id
} else {
header = commit.header
gcid = eventId.event
}

const model = header ? StreamID.fromBytes(header.model) : undefined
// assumes model instance
const streamid = new StreamID(ModelInstanceDocument.STREAM_TYPE_ID, gcid)

this.logger.verbose(`Recon: received eventID ${eventId.toString()} for streamId ${streamid}`)
await this.repository.stateManager.handleUpdate(streamid, eventId.event, model)
}

async addEvent(event: Event): Promise<void> {
try {
await this.api.ceramicEventsPost(event)
this.logger.verbose(`Recon: added event ${event.eventId}`)
} catch (err) {
this.logger.err(`Recon: failed to add event ${event.eventId}`)
}
}
}
Loading