Skip to content

Commit c8c6a8f

Browse files
committed
Merge remote-tracking branch 'origin/develop'
Signed-off-by: Andrey Sobolev <[email protected]>
2 parents 1f1b2f1 + 7c1a161 commit c8c6a8f

File tree

6 files changed

+116
-20
lines changed

6 files changed

+116
-20
lines changed

Diff for: common/scripts/check_model_version.js

+2
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@ const exec = require('child_process').exec
22

33
exec('git describe --tags `git rev-list --tags --max-count=1`', (err, stdout, stderr) => {
44
if (err !== null) {
5+
console.log('Error', err)
56
process.exit(1)
67
}
78
const tag = stdout.trim()
89
console.log('Check changes for tag:', tag)
910
exec(`git fetch --tags && git diff ${tag} --name-only`, (err, stdout, stderr) => {
1011
if (err !== null) {
12+
console.log('Error', err)
1113
process.exit(1)
1214
}
1315
const changedFiles = stdout.trim().split('\n')

Diff for: dev/tool/src/index.ts

+34-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//
22
// Copyright © 2020, 2021 Anticrm Platform Contributors.
3-
// Copyright © 2021 Hardcore Engineering Inc.
3+
// Copyright © 2021, 2024 Hardcore Engineering Inc.
44
//
55
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
66
// you may not use this file except in compliance with the License. You may
@@ -74,7 +74,7 @@ import { consoleModelLogger, type MigrateOperation } from '@hcengineering/model'
7474
import contact from '@hcengineering/model-contact'
7575
import { getMongoClient, getWorkspaceDB } from '@hcengineering/mongo'
7676
import { openAIConfigDefaults } from '@hcengineering/openai'
77-
import type { StorageAdapter } from '@hcengineering/server-core'
77+
import type { StorageAdapter, StorageAdapterEx } from '@hcengineering/server-core'
7878
import { deepEqual } from 'fast-equals'
7979
import { createWriteStream, readFileSync } from 'fs'
8080
import { benchmark, benchmarkWorker } from './benchmark'
@@ -95,6 +95,7 @@ import { fixJsonMarkup } from './markup'
9595
import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin'
9696
import { openAIConfig } from './openai'
9797
import { fixAccountEmails, renameAccount } from './renameAccount'
98+
import { moveFiles } from './storage'
9899

99100
const colorConstants = {
100101
colorRed: '\u001b[31m',
@@ -1040,6 +1041,37 @@ export function devTool (
10401041
})
10411042
})
10421043

1044+
program
1045+
.command('move-files')
1046+
.option('-w, --workspace <workspace>', 'Selected workspace only', '')
1047+
.action(async (cmd: { workspace: string }) => {
1048+
const { mongodbUri } = prepareTools()
1049+
await withDatabase(mongodbUri, async (db, client) => {
1050+
await withStorage(mongodbUri, async (adapter) => {
1051+
try {
1052+
const exAdapter = adapter as StorageAdapterEx
1053+
if (exAdapter.adapters === undefined || exAdapter.adapters.size < 2) {
1054+
throw new Error('bad storage config, at least two storage providers are required')
1055+
}
1056+
1057+
console.log('moving files to storage provider', exAdapter.defaultAdapter)
1058+
1059+
const workspaces = await listWorkspacesPure(db, productId)
1060+
for (const workspace of workspaces) {
1061+
if (cmd.workspace !== '' && workspace.workspace !== cmd.workspace) {
1062+
continue
1063+
}
1064+
1065+
const wsId = getWorkspaceId(workspace.workspace, productId)
1066+
await moveFiles(toolCtx, wsId, exAdapter)
1067+
}
1068+
} catch (err: any) {
1069+
console.error(err)
1070+
}
1071+
})
1072+
})
1073+
})
1074+
10431075
program.command('fix-bw-workspace <workspace>').action(async (workspace: string) => {
10441076
const { mongodbUri } = prepareTools()
10451077
await withStorage(mongodbUri, async (adapter) => {

Diff for: dev/tool/src/storage.ts

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
//
2+
// Copyright © 2024 Hardcore Engineering Inc.
3+
//
4+
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License. You may
6+
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
//
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
16+
import { type MeasureContext, type WorkspaceId } from '@hcengineering/core'
17+
import { type StorageAdapterEx } from '@hcengineering/server-core'
18+
import { PassThrough } from 'stream'
19+
20+
export async function moveFiles (
21+
ctx: MeasureContext,
22+
workspaceId: WorkspaceId,
23+
exAdapter: StorageAdapterEx
24+
): Promise<void> {
25+
if (exAdapter.adapters === undefined) return
26+
27+
let count = 0
28+
29+
console.log('start', workspaceId.name)
30+
31+
// We assume that the adapter moves all new files to the default adapter
32+
const target = exAdapter.defaultAdapter
33+
await exAdapter.adapters.get(target)?.make(ctx, workspaceId)
34+
35+
for (const [name, adapter] of exAdapter.adapters.entries()) {
36+
if (name === target) continue
37+
38+
const iterator = await adapter.listStream(ctx, workspaceId)
39+
while (true) {
40+
const data = await iterator.next()
41+
if (data === undefined) break
42+
43+
const blob = await exAdapter.stat(ctx, workspaceId, data._id)
44+
if (blob === undefined) continue
45+
if (blob.provider === target) continue
46+
47+
const readable = await exAdapter.get(ctx, workspaceId, data._id)
48+
const stream = readable.pipe(new PassThrough())
49+
await exAdapter.put(ctx, workspaceId, data._id, stream, blob.contentType, blob.size)
50+
51+
count += 1
52+
if (count % 100 === 0) {
53+
console.log('...moved: ', count)
54+
}
55+
}
56+
await iterator.close()
57+
}
58+
59+
console.log('...done', workspaceId.name, count)
60+
}

Diff for: packages/core/src/utils.ts

+10-14
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,8 @@
1313
// limitations under the License.
1414
//
1515

16-
import { getEmbeddedLabel, IntlString, PlatformError, unknownError } from '@hcengineering/platform'
16+
import { getEmbeddedLabel, IntlString } from '@hcengineering/platform'
1717
import { deepEqual } from 'fast-equals'
18-
import { DOMAIN_BENCHMARK } from './benchmark'
1918
import {
2019
Account,
2120
AccountRole,
@@ -47,6 +46,7 @@ import { TxOperations } from './operations'
4746
import { isPredicate } from './predicate'
4847
import { DocumentQuery, FindResult } from './storage'
4948
import { DOMAIN_TX } from './tx'
49+
import { DOMAIN_BENCHMARK } from './benchmark'
5050

5151
function toHex (value: number, chars: number): string {
5252
const result = value.toString(16)
@@ -355,6 +355,7 @@ export class DocManager<T extends Doc> implements IDocManager<T> {
355355

356356
export class RateLimiter {
357357
idCounter: number = 0
358+
processingQueue = new Map<number, Promise<void>>()
358359
last: number = 0
359360
rate: number
360361

@@ -365,21 +366,21 @@ export class RateLimiter {
365366
}
366367

367368
notify: (() => void)[] = []
368-
finished: boolean = false
369369

370370
async exec<T, B extends Record<string, any> = any>(op: (args?: B) => Promise<T>, args?: B): Promise<T> {
371-
if (this.finished) {
372-
throw new PlatformError(unknownError('No Possible to add/exec on finished queue'))
373-
}
374-
while (this.notify.length >= this.rate) {
371+
const processingId = this.idCounter++
372+
373+
while (this.processingQueue.size >= this.rate) {
375374
await new Promise<void>((resolve) => {
376375
this.notify.push(resolve)
377376
})
378377
}
379378
try {
380379
const p = op(args)
380+
this.processingQueue.set(processingId, p as Promise<void>)
381381
return await p
382382
} finally {
383+
this.processingQueue.delete(processingId)
383384
const n = this.notify.shift()
384385
if (n !== undefined) {
385386
n()
@@ -388,20 +389,15 @@ export class RateLimiter {
388389
}
389390

390391
async add<T, B extends Record<string, any> = any>(op: (args?: B) => Promise<T>, args?: B): Promise<void> {
391-
if (this.notify.length < this.rate) {
392+
if (this.processingQueue.size < this.rate) {
392393
void this.exec(op, args)
393394
} else {
394395
await this.exec(op, args)
395396
}
396397
}
397398

398399
async waitProcessing (): Promise<void> {
399-
this.finished = true
400-
while (this.notify.length > 0) {
401-
await new Promise<void>((resolve) => {
402-
this.notify.push(resolve)
403-
})
404-
}
400+
await Promise.all(this.processingQueue.values())
405401
}
406402
}
407403

Diff for: server/core/src/__tests__/aggregator.spec.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ describe('aggregator tests', () => {
2424
const ws1: WorkspaceId = { name: 'ws1', productId: '' }
2525
return { mem1, mem2, aggr, ws1, testCtx }
2626
}
27-
it('reuse existing storage', async () => {
27+
it('not reuse existing storage', async () => {
2828
const { mem1, aggr, ws1, testCtx } = prepare1()
2929

3030
// Test default provider
@@ -37,7 +37,7 @@ describe('aggregator tests', () => {
3737
// Test content typed provider
3838
await aggr.put(testCtx, ws1, 'test', 'data2', 'text/plain')
3939
const stat2 = await aggr.stat(testCtx, ws1, 'test')
40-
expect(stat2?.provider).toEqual('mem1')
40+
expect(stat2?.provider).toEqual('mem2')
4141

4242
const dta = Buffer.concat(await aggr.read(testCtx, ws1, 'test')).toString()
4343
expect(dta).toEqual('data2')

Diff for: server/core/src/server/aggregator.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -317,12 +317,11 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
317317
contentType: string,
318318
size?: number | undefined
319319
): Promise<UploadedObjectInfo> {
320-
// We need to reuse same provider for existing documents.
321320
const stat = (
322321
await this.dbAdapter.find<Blob>(ctx, workspaceId, DOMAIN_BLOB, { _id: objectName as Ref<Blob> }, { limit: 1 })
323322
).shift()
324323

325-
const { provider, adapter } = this.selectProvider(stat?.provider, contentType)
324+
const { provider, adapter } = this.selectProvider(undefined, contentType)
326325

327326
const result = await adapter.put(ctx, workspaceId, objectName, stream, contentType, size)
328327

@@ -351,6 +350,13 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
351350
}
352351

353352
await this.dbAdapter.upload<Blob>(ctx, workspaceId, DOMAIN_BLOB, [blobDoc])
353+
354+
// If the file is already stored in different provider, we need to remove it.
355+
if (stat !== undefined && stat.provider !== provider) {
356+
const adapter = this.adapters.get(stat.provider)
357+
await adapter?.remove(ctx, workspaceId, [stat._id])
358+
}
359+
354360
return result
355361
}
356362
}

0 commit comments

Comments
 (0)