|
| 1 | +import { DataPropertyNames } from 'objection' |
| 2 | +import pLimit from 'p-limit' |
| 3 | + |
| 4 | +import { BadUserInputError } from '@/errors/graphql-errors' |
| 5 | +import { castGsiValue, GSIS, TableRow } from '@/models/dynamodb/table-row' |
| 6 | +import TableCollaborator from '@/models/table-collaborators' |
| 7 | +import TableColumnMetadata from '@/models/table-column-metadata' |
| 8 | + |
| 9 | +import type { AdminMutationResolvers } from '../../__generated__/types.generated' |
| 10 | + |
| 11 | +const DEFAULT_PATCH_LIMIT = 1000 |
| 12 | + |
| 13 | +/** |
| 14 | + * This function patches the relevant column into the sort key for a given GSI |
| 15 | + * |
| 16 | + * @input |
| 17 | + * tableId: string - ID of table to patch |
| 18 | + * columnId: string - ID of column to patch from |
| 19 | + * limit: number - Maximum number of rows to patch |
| 20 | + * returnCount: boolean - Whether to return the number of rows left to patch |
| 21 | + * (this will usually be set to true for the first call) |
| 22 | + * |
| 23 | + * @output |
| 24 | + * numRowsPatched: number - Number of rows patched |
| 25 | + * numRowsLeftToPatch: number - Number of rows left to patch (only returned if returnCount is true) |
| 26 | + * hasMore: boolean - Whether there are more rows to patch |
| 27 | + */ |
| 28 | + |
| 29 | +const patchGsiRows: AdminMutationResolvers['patchGsiRows'] = async ( |
| 30 | + _parent, |
| 31 | + params, |
| 32 | + context, |
| 33 | +) => { |
| 34 | + const { tableId, columnId, limit: limitInput, returnCount } = params.input |
| 35 | + |
| 36 | + /** |
| 37 | + * Default limit is 1000 |
| 38 | + */ |
| 39 | + const limit = limitInput ?? DEFAULT_PATCH_LIMIT |
| 40 | + |
| 41 | + if (!tableId || !columnId || limit <= 0) { |
| 42 | + throw new BadUserInputError('Invalid input') |
| 43 | + } |
| 44 | + |
| 45 | + /** |
| 46 | + * Checks that the admin is access the table from an editor or above account |
| 47 | + */ |
| 48 | + await TableCollaborator.hasAccess(context.currentUser.id, tableId, 'editor') |
| 49 | + |
| 50 | + /** |
| 51 | + * Gets the gsi config for the column to patch from |
| 52 | + */ |
| 53 | + const { config } = await TableColumnMetadata.query() |
| 54 | + .findById(columnId) |
| 55 | + .throwIfNotFound() |
| 56 | + |
| 57 | + if (!config.gsi) { |
| 58 | + throw new BadUserInputError('Column does not have a GSI') |
| 59 | + } |
| 60 | + |
| 61 | + /** |
| 62 | + * GSI MUST be a valid GSI |
| 63 | + * status MUST be pending |
| 64 | + * patchFrom MUST be set |
| 65 | + */ |
| 66 | + const correspondingGsi = GSIS.find((gsi) => gsi.gsi === config.gsi.indexName) |
| 67 | + if ( |
| 68 | + config.gsi.status !== 'pending' || |
| 69 | + !config.gsi.patchFrom || |
| 70 | + !correspondingGsi |
| 71 | + ) { |
| 72 | + throw new BadUserInputError('GSI status is invalid or no patchFrom date') |
| 73 | + } |
| 74 | + |
| 75 | + /** |
| 76 | + * Gets the rows to patch in descending order of createdAt |
| 77 | + * Only rows before the patchFrom date are included |
| 78 | + */ |
| 79 | + const { data: rowsToPatch, cursor } = await TableRow.query |
| 80 | + .byCreatedAt({ |
| 81 | + tableId, |
| 82 | + }) |
| 83 | + .lt({ createdAt: new Date(config.gsi.patchFrom).getTime() }) |
| 84 | + .go({ |
| 85 | + order: 'desc', |
| 86 | + limit, |
| 87 | + ignoreOwnership: true, |
| 88 | + }) |
| 89 | + |
| 90 | + /** |
| 91 | + * We update with a concurrency of 10 to prevent overloading the partition |
| 92 | + * and server |
| 93 | + */ |
| 94 | + const updateLimit = pLimit(100) |
| 95 | + |
| 96 | + await Promise.all( |
| 97 | + /** |
| 98 | + * For each row, we update the GSI sort key with the value of the column |
| 99 | + * to patch from |
| 100 | + */ |
| 101 | + rowsToPatch.map(async (item) => { |
| 102 | + await updateLimit(async () => { |
| 103 | + /** |
| 104 | + * 1. Read original value of column to patch from |
| 105 | + * 2. Cast the original value to a string (since our sort key is a string for now) |
| 106 | + * 3. Update the GSI sort key with the casted value |
| 107 | + * PS: Since this is not an atomic operation, we need to ensure that |
| 108 | + * the original value is still the same when we update the GSI sort key |
| 109 | + * If not, this throws an error and the whole batch has to be re-run |
| 110 | + */ |
| 111 | + const row = await TableRow.get({ |
| 112 | + tableId, |
| 113 | + rowId: item.rowId, |
| 114 | + }).go({ |
| 115 | + ignoreOwnership: true, |
| 116 | + }) |
| 117 | + if (!row) { |
| 118 | + throw new Error('Row not found') |
| 119 | + } |
| 120 | + |
| 121 | + const originalValue = row.data.data[columnId] |
| 122 | + const valueToPatch = castGsiValue(originalValue) |
| 123 | + |
| 124 | + return TableRow.update({ tableId, rowId: item.rowId }) |
| 125 | + .data((row, { set, remove }) => { |
| 126 | + /** |
| 127 | + * If undefined/nullish or empty string, remove the sort key |
| 128 | + * Otherwise, set the sort key to the value to patch |
| 129 | + */ |
| 130 | + if (valueToPatch != null) { |
| 131 | + set(row[correspondingGsi.sk], valueToPatch) |
| 132 | + } else { |
| 133 | + remove(row[correspondingGsi.sk]) |
| 134 | + } |
| 135 | + }) |
| 136 | + .where(({ data }, { eq }) => eq(data[columnId], originalValue)) |
| 137 | + .go({ |
| 138 | + ignoreOwnership: true, |
| 139 | + }) |
| 140 | + }) |
| 141 | + }), |
| 142 | + ) |
| 143 | + |
| 144 | + const hasMore = rowsToPatch.length === 0 || !!cursor |
| 145 | + |
| 146 | + /** |
| 147 | + * We get the last(earliest) createdAt of the rows to patch |
| 148 | + * This will be used to update the patchFrom date |
| 149 | + */ |
| 150 | + const lastPatchedCreatedAt = rowsToPatch[rowsToPatch.length - 1].createdAt |
| 151 | + |
| 152 | + await TableColumnMetadata.query() |
| 153 | + .patch({ |
| 154 | + ['config:gsi' as DataPropertyNames<TableColumnMetadata>]: { |
| 155 | + ...config.gsi, |
| 156 | + status: hasMore ? 'pending' : 'ready', |
| 157 | + patchFrom: hasMore |
| 158 | + ? new Date(lastPatchedCreatedAt).toISOString() |
| 159 | + : undefined, |
| 160 | + }, |
| 161 | + }) |
| 162 | + .where({ id: columnId }) |
| 163 | + |
| 164 | + /** |
| 165 | + * If returnCount is true, we get the number of rows left to patch |
| 166 | + */ |
| 167 | + let numRowsLeftToPatch |
| 168 | + if (returnCount) { |
| 169 | + const numRowsLeftToPatchRes = await TableRow.query |
| 170 | + .byCreatedAt({ |
| 171 | + tableId, |
| 172 | + }) |
| 173 | + .lt({ createdAt: lastPatchedCreatedAt }) |
| 174 | + .go({ |
| 175 | + pages: 'all', |
| 176 | + attributes: ['rowId'], |
| 177 | + }) |
| 178 | + numRowsLeftToPatch = numRowsLeftToPatchRes.data.length |
| 179 | + } |
| 180 | + |
| 181 | + return { |
| 182 | + numRowsPatched: rowsToPatch.length, |
| 183 | + numRowsLeftToPatch, |
| 184 | + hasMore, |
| 185 | + } |
| 186 | +} |
| 187 | + |
| 188 | +export default patchGsiRows |
0 commit comments