Skip to content

Commit e783311

Browse files
committed
reject pending requests during drain
1 parent 2cb41da commit e783311

1 file changed

Lines changed: 77 additions & 50 deletions

File tree

db-service/lib/common/generic-pool.js

Lines changed: 77 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,42 @@ module.exports = DEBUG && !cds.requires.db?.pool?.builtin ? TrackedConnectionPoo
4343
const { EventEmitter } = require('events')
4444

4545
const ResourceState = Object.freeze({
46-
ALLOCATED: 'ALLOCATED',
47-
IDLE: 'IDLE',
48-
INVALID: 'INVALID',
49-
VALIDATION: 'VALIDATION'
46+
ALLOCATED: 'allocated',
47+
IDLE: 'idle',
48+
INVALID: 'invalid',
49+
VALIDATION: 'validation'
5050
})
5151

52+
const RequestState = Object.freeze({
53+
PENDING: 'pending',
54+
RESOLVED: 'resolved',
55+
REJECTED: 'rejected'
56+
})
57+
58+
class Request {
59+
constructor (ttl) {
60+
this.state = 'pending'
61+
this.promise = new Promise((resolve, reject) => {
62+
this._resolve = value => {
63+
clearTimeout(this._timeout)
64+
this.state = RequestState.RESOLVED
65+
resolve(value)
66+
}
67+
this._reject = reason => {
68+
clearTimeout(this._timeout)
69+
this.state = RequestState.REJECTED
70+
reject(reason)
71+
}
72+
if (typeof ttl === 'number' && ttl >= 0) {
73+
const err = new Error(`Pool resource could not be acquired within ${ttl / 1000}s`)
74+
this._timeout = setTimeout(() => this._reject(err), ttl)
75+
}
76+
})
77+
}
78+
resolve (v) { if (this.state === 'pending') this._resolve(v) }
79+
reject (e) { if (this.state === 'pending') this._reject(e) }
80+
}
81+
5282
class PooledResource {
5383
constructor(resource) {
5484
this.obj = resource
@@ -67,53 +97,48 @@ class PooledResource {
6797

6898
class Pool extends EventEmitter {
6999

70-
constructor(factory, options = {}) {
71-
super()
72-
this.factory = factory
73-
this.options = Object.assign({
74-
testOnBorrow: false,
75-
evictionRunIntervalMillis: 100000,
76-
numTestsPerEvictionRun: 3,
77-
softIdleTimeoutMillis: -1,
78-
idleTimeoutMillis: 30000,
79-
acquireTimeoutMillis: null,
80-
destroyTimeoutMillis: null,
81-
fifo: false,
82-
min: 0,
83-
max: 10
84-
}, options)
85-
this._draining = false
86-
this._available = new Set()
87-
this._loans = new Map()
88-
this._all = new Set()
89-
this._creates = new Set()
90-
this._queue = []
91-
this.#scheduleEviction()
92-
for (let i = 0; i < this.options.min - this.size; i++) this.#createResource()
93-
}
100+
constructor (factory, options = {}) {
101+
super()
102+
this.factory = factory
103+
104+
this.options = Object.assign({
105+
testOnBorrow: false,
106+
evictionRunIntervalMillis: 100000,
107+
numTestsPerEvictionRun: 3,
108+
softIdleTimeoutMillis: -1,
109+
idleTimeoutMillis: 30000,
110+
acquireTimeoutMillis: null,
111+
destroyTimeoutMillis: null,
112+
fifo: false,
113+
min: 0,
114+
max: 10
115+
}, options)
116+
117+
/** @type {boolean} */
118+
this._draining = false
119+
120+
/** @type {Set<PooledResource>} */
121+
this._available = new Set()
122+
123+
/** @type {Map<any, { pooledResource: PooledResource }>} */
124+
this._loans = new Map()
125+
126+
/** @type {Set<PooledResource>} */
127+
this._all = new Set()
128+
129+
/** @type {Set<Promise<void>>} */
130+
this._creates = new Set()
131+
132+
/** @type {Request[]} */
133+
this._queue = []
134+
135+
this.#scheduleEviction()
136+
for (let i = 0; i < this.options.min - this.size; i++) this.#createResource()
137+
}
94138

95139
async acquire() {
96140
if (this._draining) throw new Error('Pool is draining and cannot accept new requests')
97-
const request = { state: 'pending' }
98-
request.promise = new Promise((resolve, reject) => {
99-
request.resolve = value => {
100-
clearTimeout(request.timeout)
101-
request.state = 'resolved'
102-
resolve(value)
103-
}
104-
request.reject = reason => {
105-
clearTimeout(request.timeout)
106-
request.state = 'rejected'
107-
reject(reason)
108-
}
109-
const ttl = this.options.acquireTimeoutMillis
110-
if (typeof ttl === 'number' && ttl >= 0) {
111-
const error = new Error(`Pool resource could not be acquired within ${ttl/1000}s`)
112-
request.timeout = setTimeout(() => {
113-
request.reject(error)
114-
}, ttl)
115-
}
116-
})
141+
const request = new Request(this.options.acquireTimeoutMillis)
117142
this._queue.push(request)
118143
this.#dispense()
119144
return request.promise
@@ -140,7 +165,9 @@ class Pool extends EventEmitter {
140165

141166
async drain() {
142167
this._draining = true
143-
if (this._queue.length > 0) await this._queue.at(-1).promise
168+
for (const request of this._queue.splice(0)) {
169+
if (request.state === RequestState.PENDING) request.reject(new Error('Pool is draining and cannot fulfil request'))
170+
}
144171
clearTimeout(this._scheduledEviction)
145172
}
146173

@@ -185,7 +212,7 @@ class Pool extends EventEmitter {
185212
this._available.add(resource)
186213
return false
187214
}
188-
if (request.state !== 'pending') {
215+
if (request.state !== RequestState.PENDING) {
189216
this.#dispense()
190217
return false
191218
}

0 commit comments

Comments
 (0)