Skip to content

feat(agentstore): put index in dynamo #470

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions filecoin/functions/handle-cron-tick.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Sentry.AWSLambda.init({
const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

export async function handleCronTick () {
const { did, pieceTableName, agentMessageBucketName, agentIndexBucketName, aggregatorDid, storefrontProof } = getEnv()
const { did, pieceTableName, agentMessageBucketName, agentIndexBucketName, agentIndexTableName, aggregatorDid, storefrontProof } = getEnv()
const { PRIVATE_KEY: privateKey } = Config

// create context
Expand All @@ -40,8 +40,8 @@ export async function handleCronTick () {
const context = {
id,
pieceStore: createPieceTable(AWS_REGION, pieceTableName),
taskStore: createTaskStore(AWS_REGION, agentIndexBucketName, agentMessageBucketName),
receiptStore: createReceiptStore(AWS_REGION, agentIndexBucketName, agentMessageBucketName),
taskStore: createTaskStore(AWS_REGION, agentIndexTableName, agentIndexBucketName, agentMessageBucketName),
receiptStore: createReceiptStore(AWS_REGION, agentIndexTableName, agentIndexBucketName, agentMessageBucketName),
aggregatorId: DID.parse(aggregatorDid),
}

Expand All @@ -66,6 +66,7 @@ function getEnv () {
pieceTableName: mustGetEnv('PIECE_TABLE_NAME'),
agentMessageBucketName: mustGetEnv('AGENT_MESSAGE_BUCKET_NAME'),
agentIndexBucketName: mustGetEnv('AGENT_INDEX_BUCKET_NAME'),
agentIndexTableName: mustGetEnv('AGENT_INDEX_TABLE_NAME'),
aggregatorDid: mustGetEnv('AGGREGATOR_DID'),
storefrontProof: process.env.PROOF,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
metricsTableName,
agentMessageBucketName,
agentIndexBucketName,
agentIndexTableName,

Check failure on line 28 in filecoin/functions/metrics-aggregate-offer-and-accept-total.js

View workflow job for this annotation

GitHub Actions / Test

Property 'agentIndexTableName' does not exist on type '{ metricsTableName: string; agentMessageBucketName: string; agentIndexBucketName: string; startEpochMs: number | undefined; }'.
startEpochMs
} = getLambdaEnv()

const filecoinMetricsStore = createFilecoinMetricsTable(AWS_REGION, metricsTableName)
const workflowStore = createWorkflowStore(AWS_REGION, agentMessageBucketName)
const invocationStore = createInvocationStore(AWS_REGION, agentIndexBucketName)
const invocationStore = createInvocationStore(AWS_REGION, agentIndexBucketName, agentIndexTableName)

await Promise.all([
updateAggregateOfferTotal(ucanInvocations, {
Expand Down
28 changes: 22 additions & 6 deletions filecoin/store/invocation.js
Original file line number Diff line number Diff line change
@@ -1,35 +1,51 @@
import { parseLink } from '@ucanto/core'
import * as Store from '../../upload-api/stores/agent/store.js'
import { getS3Client } from '../../lib/aws/s3.js'
import { getDynamoClient } from '../../lib/aws/dynamo.js'

/**
* Abstraction layer with Factory to perform operations on bucket storing
* invocation receipts and indexes.
*
* @param {string} region
* @param {string} tableName
* @param {string} bucketName
* @param {Partial<import('../../lib/aws/s3.js').Address>} [options]
* @param {{
* s3Address?: Partial<import('../../lib/aws/s3.js').Address>
* dynamoAddress?: Partial<import('../../lib/aws/dynamo.js').Address>
* }} [options]
*/
export function createInvocationStore(region, bucketName, options = {}) {
export function createInvocationStore(region, tableName, bucketName, options = {}) {
const dynamoDBClient = getDynamoClient({
region,
...options.dynamoAddress,
})
const s3client = getS3Client({
region,
...options,
...options.s3Address,
})
return useInvocationStore(s3client, bucketName)

return useInvocationStore(dynamoDBClient, s3client, tableName, bucketName)
}

/**
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoDBClient
* @param {import('@aws-sdk/client-s3').S3Client} s3client
* @param {string} tableName
* @param {string} bucketName
* @returns {import('../types.js').InvocationBucket}
*/
export const useInvocationStore = (s3client, bucketName) => {
export const useInvocationStore = (dynamoDBClient, s3client, tableName, bucketName) => {
const store = Store.open({
connection: { channel: s3client },
s3Connection: { channel: s3client },
dynamoDBConnection: { channel: dynamoDBClient },
region: typeof s3client.config.region === 'string' ? s3client.config.region : 'us-west-2',
buckets: {
index: { name: bucketName },
message: { name: bucketName },
},
tables: {
index: { name: tableName }
}
})

Expand Down
27 changes: 21 additions & 6 deletions filecoin/store/receipt.js
Original file line number Diff line number Diff line change
@@ -1,34 +1,49 @@
import { StoreOperationFailed } from '@storacha/filecoin-api/errors'
import * as Store from '../../upload-api/stores/agent/store.js'
import { getS3Client } from '../../lib/aws/s3.js'
import { getDynamoClient } from '../../lib/aws/dynamo.js'

/**
* Abstraction layer with Factory to perform operations on bucket storing
* handled receipts.
*
* @param {string} region
* @param {string} agentIndexTableName
* @param {string} agentIndexBucketName
* @param {string} agentMessageBucketName
* @param {import('@aws-sdk/client-s3').ServiceInputTypes} [options]
* @param {{
* s3Address?: Partial<import('../../lib/aws/s3.js').Address>
* dynamoAddress?: Partial<import('../../lib/aws/dynamo.js').Address>
* }} [options]
*/
export function createReceiptStore(region, agentIndexBucketName, agentMessageBucketName, options = {}) {
export function createReceiptStore(region, agentIndexTableName, agentIndexBucketName, agentMessageBucketName, options = {}) {
const dynamoDBClient = getDynamoClient({
region,
...options.dynamoAddress,
})
const s3client = getS3Client({
region,
...options,
...options.s3Address,
})
return useReceiptStore(s3client, agentIndexBucketName, agentMessageBucketName)
return useReceiptStore(dynamoDBClient, s3client, agentIndexTableName, agentIndexBucketName, agentMessageBucketName)
}

/**
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoDBClient
* @param {import('@aws-sdk/client-s3').S3Client} s3client
* @param {string} agentIndexTableName
* @param {string} agentIndexBucketName
* @param {string} agentMessageBucketName
* @returns {import('@storacha/filecoin-api/storefront/api').ReceiptStore}
*/
export const useReceiptStore = (s3client, agentIndexBucketName, agentMessageBucketName) => {
export const useReceiptStore = (dynamoDBClient, s3client, agentIndexTableName, agentIndexBucketName, agentMessageBucketName) => {
const store = Store.open({
connection: { channel: s3client },
dynamoDBConnection: { channel: dynamoDBClient },
s3Connection: { channel: s3client },
region: typeof s3client.config.region === 'string' ? s3client.config.region : 'us-west-2',
tables: {
index: { name: agentIndexTableName }
},
buckets: {
index: { name: agentIndexBucketName },
message: { name: agentMessageBucketName },
Expand Down
25 changes: 20 additions & 5 deletions filecoin/store/task.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,52 @@
import { StoreOperationFailed } from '@storacha/filecoin-api/errors'
import * as Store from '../../upload-api/stores/agent/store.js'
import { getS3Client } from '../../lib/aws/s3.js'
import { getDynamoClient } from '../../lib/aws/dynamo.js'

/**
* Abstraction layer with Factory to perform operations on bucket storing
* handled Tasks and their indexes.
*
* @param {string} region
* @param {string} agentIndexTableName
* @param {string} agentIndexBucketName
* @param {string} agentMessageBucketName
* @param {import('@aws-sdk/client-s3').ServiceInputTypes} [options]
* @param {{
* s3Address?: Partial<import('../../lib/aws/s3.js').Address>
* dynamoAddress?: Partial<import('../../lib/aws/dynamo.js').Address>
* }} [options]
*/
export function createTaskStore(region, agentIndexBucketName, agentMessageBucketName, options = {}) {
export function createTaskStore(region, agentIndexTableName, agentIndexBucketName, agentMessageBucketName, options = {}) {
const dynamoDBClient = getDynamoClient({
region,
...options.dynamoAddress,
})
const s3client = getS3Client({
region,
...options,
})
return useTaskStore(s3client, agentIndexBucketName, agentMessageBucketName)
return useTaskStore(dynamoDBClient, s3client, agentIndexTableName, agentIndexBucketName, agentMessageBucketName)
}

/**
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoDBClient
* @param {import('@aws-sdk/client-s3').S3Client} s3client
* @param {string} agentIndexTableName
* @param {string} agentIndexBucketName
* @param {string} agentMessageBucketName
* @returns {import('@storacha/filecoin-api/storefront/api').TaskStore}
*/
export const useTaskStore = (s3client, agentIndexBucketName, agentMessageBucketName) => {
export const useTaskStore = (dynamoDBClient, s3client, agentIndexTableName, agentIndexBucketName, agentMessageBucketName) => {
const store = Store.open({
connection: { channel: s3client },
dynamoDBConnection: { channel: dynamoDBClient },
s3Connection: { channel: s3client },
region: typeof s3client.config.region === 'string' ? s3client.config.region : 'us-west-2',
buckets: {
index: { name: agentIndexBucketName },
message: { name: agentMessageBucketName },
},
tables: {
index: { name: agentIndexTableName }
}
})

Expand Down
1 change: 1 addition & 0 deletions filecoin/test/helpers/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import anyTest from 'ava'
* @typedef {object} DynamoContext
* @property {string} dbEndpoint
* @property {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoClient
* @property {import('../../../lib/aws/dynamo.js').Address} dynamoOpts
*
* @typedef {object} MultipleQueueContext
* @property {import('@aws-sdk/client-sqs').SQSClient} sqsClient
Expand Down
1 change: 1 addition & 0 deletions filecoin/test/helpers/resources.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export async function createDynamodDb(opts = {}) {
region,
endpoint
}),
region,
endpoint,
stop: () => dbContainer.stop(),
}
Expand Down
18 changes: 12 additions & 6 deletions filecoin/test/helpers/service-context.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import { TestContentStore } from './content-store.js'
// queue clients
import { createClient as createPieceOfferQueueClient } from '../../queue/piece-offer-queue.js'
import { createClient as createFilecoinSubmitQueueClient } from '../../queue/filecoin-submit-queue.js'
import { agentIndexTableProps } from '../../../upload-api/tables/index.js'

/**
* @param {import('./context.js').DynamoContext & import('./context.js').S3Context} ctx
*/
export async function getStores (ctx) {
const { dynamoClient, s3Client } = ctx
const pieceStore = await createDynamoTable(dynamoClient, pieceTableProps)
const invocationTableName = await createDynamoTable(dynamoClient, agentIndexTableProps)
const [ invocationBucketName, workflowBucketName ] = await Promise.all([
createBucket(s3Client),
createBucket(s3Client),
Expand All @@ -33,8 +35,8 @@ export async function getStores (ctx) {

return {
pieceStore: createPieceStoreClient(dynamoClient, pieceStore),
taskStore: getTaskStoreClient(s3Client, invocationBucketName, workflowBucketName),
receiptStore: getReceiptStoreClient(s3Client, invocationBucketName, workflowBucketName),
taskStore: getTaskStoreClient(dynamoClient, s3Client, invocationTableName, invocationBucketName, workflowBucketName),
receiptStore: getReceiptStoreClient(dynamoClient, s3Client, invocationTableName, invocationBucketName, workflowBucketName),
contentStore: testContentStore.contentStore,
testContentStore
}
Expand All @@ -55,13 +57,15 @@ export function getQueues (ctx) {
}

/**
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoDBClient
* @param {import('@aws-sdk/client-s3').S3Client} s3client
* @param {string} invocationTableName
* @param {string} invocationBucketName
* @param {string} workflowBucketName
* @returns {import('@storacha/filecoin-api/storefront/api').TaskStore}
*/
function getTaskStoreClient (s3client, invocationBucketName, workflowBucketName) {
const taskStore = createTaskStoreClient(s3client, invocationBucketName, workflowBucketName)
function getTaskStoreClient (dynamoDBClient, s3client, invocationTableName, invocationBucketName, workflowBucketName) {
const taskStore = createTaskStoreClient(dynamoDBClient, s3client, invocationTableName, invocationBucketName, workflowBucketName)

return {
...taskStore,
Expand Down Expand Up @@ -96,13 +100,15 @@ function getTaskStoreClient (s3client, invocationBucketName, workflowBucketName)
}

/**
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoDBClient
* @param {import('@aws-sdk/client-s3').S3Client} s3client
* @param {string} invocationTableName
* @param {string} invocationBucketName
* @param {string} workflowBucketName
* @returns {import('@storacha/filecoin-api/storefront/api').ReceiptStore}
*/
function getReceiptStoreClient (s3client, invocationBucketName, workflowBucketName) {
const receiptStore = createReceiptStoreClient(s3client, invocationBucketName, workflowBucketName)
function getReceiptStoreClient (dynamoDBClient, s3client, invocationTableName, invocationBucketName, workflowBucketName) {
const receiptStore = createReceiptStoreClient(dynamoDBClient, s3client, invocationTableName, invocationBucketName, workflowBucketName)

return {
...receiptStore,
Expand Down
3 changes: 2 additions & 1 deletion filecoin/test/metrics-aggregate-accept-total.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ test.before(async t => {
// Dynamo DB
const {
client: dynamo,
region,
endpoint: dbEndpoint
} = await createDynamodDb({ port: 8000 })
t.context.dbEndpoint = dbEndpoint
t.context.dynamoClient = dynamo

t.context.dynamoOpts = { region, endpoint: dbEndpoint}
// S3
const { client, clientOpts } = await createS3()
t.context.s3Client = client
Expand Down
Loading
Loading