|
1 | 1 | import { Event as QueueBaseEvent, BasePayload, StaticThis, Event } from '@internal/queue'
|
2 | 2 | import { getPostgresConnection, getServiceKeyUser } from '@internal/database'
|
3 | 3 | import { StorageKnexDB } from '../database'
|
4 |
| -import { createStorageBackend } from '../backend' |
| 4 | +import { createStorageBackend, StorageBackendAdapter } from '../backend' |
5 | 5 | import { Storage } from '../storage'
|
6 | 6 | import { getConfig } from '../../config'
|
7 | 7 | import { logger } from '@internal/monitoring'
|
8 | 8 | import { createAgent } from '@internal/http'
|
9 | 9 |
|
10 | 10 | const { storageS3MaxSockets, storageBackendType, region } = getConfig()
|
11 | 11 |
|
12 |
| -const httpAgent = createAgent('s3_worker', { |
13 |
| - maxSockets: storageS3MaxSockets, |
14 |
| -}) |
15 |
| -const storageBackend = createStorageBackend(storageBackendType, { |
16 |
| - httpAgent: httpAgent, |
17 |
| -}) |
| 12 | +let storageBackend: StorageBackendAdapter | undefined = undefined |
18 | 13 |
|
19 | 14 | export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> extends QueueBaseEvent<T> {
|
20 | 15 | static onStart() {
|
21 |
| - httpAgent.monitor() |
| 16 | + this.getOrCreateStorageBackend() |
22 | 17 | }
|
23 | 18 |
|
24 | 19 | static onClose() {
|
25 |
| - storageBackend.close() |
| 20 | + storageBackend?.close() |
26 | 21 | }
|
27 | 22 |
|
28 | 23 | /**
|
@@ -81,6 +76,26 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> extends
|
81 | 76 | host: payload.tenant.host,
|
82 | 77 | })
|
83 | 78 |
|
84 |
| - return new Storage(storageBackend, db) |
| 79 | + return new Storage(this.getOrCreateStorageBackend(), db) |
| 80 | + } |
| 81 | + |
| 82 | + protected static getOrCreateStorageBackend(monitor = false) { |
| 83 | + if (storageBackend) { |
| 84 | + return storageBackend |
| 85 | + } |
| 86 | + |
| 87 | + const httpAgent = createAgent('s3_worker', { |
| 88 | + maxSockets: storageS3MaxSockets, |
| 89 | + }) |
| 90 | + |
| 91 | + storageBackend = createStorageBackend(storageBackendType, { |
| 92 | + httpAgent: httpAgent, |
| 93 | + }) |
| 94 | + |
| 95 | + if (monitor) { |
| 96 | + httpAgent.monitor() |
| 97 | + } |
| 98 | + |
| 99 | + return storageBackend |
85 | 100 | }
|
86 | 101 | }
|
0 commit comments