Skip to content

Commit be6f579

Browse files
authored
Merge pull request #1172 from golemfactory/fix/expire-old-proposals
Expire old proposals to prevent memory consumption over time
2 parents 933df07 + 4b170a4 commit be6f579

File tree

5 files changed

+171
-18
lines changed

5 files changed

+171
-18
lines changed

src/golem-network/golem-network.ts

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import { IProposalRepository } from "../market/proposal";
3737
import { Subscription } from "rxjs";
3838
import { GolemConfigError, GolemUserError } from "../shared/error/golem-error";
3939
import { GolemPluginInitializer, GolemPluginOptions, GolemPluginRegistration } from "./plugin";
40+
import { ExpirationManager } from "../shared/expiration/ExpirationManager";
4041

4142
/**
4243
* Instance of an object or a factory function that you can call `new` on.
@@ -68,6 +69,21 @@ function getFactory<
6869
return (...args) => new defaultFactory(...args);
6970
}
7071

72+
interface MarketProposalExpirationOptions {
73+
/**
74+
* Number of milliseconds before a market proposal is considered stale and will be
75+
* removed from all internal caches and DraftOfferProposalPools created by `oneOf` and `manyOf`.
76+
* If unspecified it defaults to twice the demand refresh interval (`market.demandRefreshIntervalSec`) if
77+
* that's specified or 30 minutes otherwise.
78+
*/
79+
offerProposalTTLMs: number;
80+
81+
/**
82+
* Interval at which to check for expired proposals in milliseconds. Defaults to 60s.
83+
*/
84+
offerProposalCleanupIntervalMs: number;
85+
}
86+
7187
export interface GolemNetworkOptions {
7288
/**
7389
* Logger instance to use for logging.
@@ -98,7 +114,7 @@ export interface GolemNetworkOptions {
98114
* This is where you can globally specify several options that determine how the SDK will
99115
* interact with the market.
100116
*/
101-
market?: Partial<MarketModuleOptions>;
117+
market?: Partial<MarketModuleOptions> & Partial<MarketProposalExpirationOptions>;
102118

103119
/**
104120
* Set the data transfer protocol to use for file transfers.
@@ -197,6 +213,7 @@ export type GolemServices = {
197213
demandRepository: IDemandRepository;
198214
fileServer: IFileServer;
199215
storageProvider: StorageProvider;
216+
proposalExpirationManager: ExpirationManager;
200217
};
201218

202219
/**
@@ -267,8 +284,23 @@ export class GolemNetwork {
267284
const demandCache = new CacheService<Demand>();
268285
const proposalCache = new CacheService<OfferProposal>();
269286

287+
const proposalExpirationManager = new ExpirationManager({
288+
logger: this.logger.child("expiration-manager"),
289+
intervalMs: options.market?.offerProposalCleanupIntervalMs || 60 * 1000, // default 60s
290+
timeToLiveMs:
291+
options.market?.offerProposalTTLMs ||
292+
(options?.market?.demandRefreshIntervalSec
293+
? options?.market?.demandRefreshIntervalSec * 1000 * 2
294+
: 30 * 60 * 1000),
295+
});
296+
270297
const demandRepository = new DemandRepository(this.yagna.market, demandCache);
271-
const proposalRepository = new ProposalRepository(this.yagna.market, this.yagna.identity, proposalCache);
298+
const proposalRepository = new ProposalRepository(
299+
this.yagna.market,
300+
this.yagna.identity,
301+
proposalCache,
302+
proposalExpirationManager,
303+
);
272304
const agreementRepository = new AgreementRepository(this.yagna.market, demandRepository);
273305

274306
this.services = {
@@ -278,6 +310,7 @@ export class GolemNetwork {
278310
demandRepository,
279311
proposalCache,
280312
proposalRepository,
313+
proposalExpirationManager,
281314
paymentApi:
282315
this.options.override?.paymentApi ||
283316
new PaymentApiAdapter(
@@ -338,6 +371,7 @@ export class GolemNetwork {
338371
await this.services.paymentApi.connect();
339372
await this.storageProvider.init();
340373
await this.connectPlugins();
374+
this.services.proposalExpirationManager.start();
341375
this.events.emit("connected");
342376
this.hasConnection = true;
343377
} catch (err) {
@@ -359,6 +393,7 @@ export class GolemNetwork {
359393
.catch((err) =>
360394
this.logger.warn("Closing connections with yagna resulted with an error, it will be ignored", err),
361395
);
396+
this.services.proposalExpirationManager.stopAndReset();
362397
this.services.proposalCache.flushAll();
363398
this.abortController = new AbortController();
364399
} catch (err) {
@@ -487,6 +522,7 @@ export class GolemNetwork {
487522
logger: this.logger,
488523
validateOfferProposal: order.market.offerProposalFilter,
489524
selectOfferProposal: order.market.offerProposalSelector,
525+
expirationManager: this.services.proposalExpirationManager,
490526
});
491527

492528
allocation = await this.getAllocationFromOrder({ order, maxAgreements: 1 });
@@ -611,6 +647,7 @@ export class GolemNetwork {
611647
logger: this.logger,
612648
validateOfferProposal: order.market.offerProposalFilter,
613649
selectOfferProposal: order.market.offerProposalSelector,
650+
expirationManager: this.services.proposalExpirationManager,
614651
});
615652

616653
const maxAgreements = typeof poolSize === "number" ? poolSize : (poolSize?.max ?? poolSize?.min ?? 1);

src/market/draft-offer-proposal-pool.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ describe("Draft Offer Proposal Pool", () => {
77
const mockProposal = mock(OfferProposal);
88
// Most of the time we're testing the case when the Proposal is in `Draft` status
99
when(mockProposal.isDraft()).thenReturn(true);
10+
when(mockProposal.id).thenReturn("1");
1011

1112
// NOTE: ts-mockito instance + JS Set.add() doesn't play along, 2x instance(mockProposal) produces "the same" value for (Set.add)
1213

1314
const secondMockProposal = mock(OfferProposal);
1415
// Most of the time we're testing the case when the Proposal is in `Draft` status
1516
when(secondMockProposal.isDraft()).thenReturn(true);
17+
when(secondMockProposal.id).thenReturn("2");
1618

1719
describe("Adding proposals", () => {
1820
describe("Positive cases", () => {

src/market/draft-offer-proposal-pool.ts

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { GolemMarketError, MarketErrorCode } from "./error";
44
import { createAbortSignalFromTimeout, defaultLogger, Logger, runOnNextEventLoopIteration } from "../shared/utils";
55
import { Observable, Subscription } from "rxjs";
66
import { AcquireQueue } from "../shared/utils/acquireQueue";
7+
import { ExpirationManager } from "../shared/expiration/ExpirationManager";
78

89
export type OfferProposalSelector = (proposals: OfferProposal[]) => OfferProposal | null;
910

@@ -28,6 +29,8 @@ export interface ProposalPoolOptions {
2829
minCount?: number;
2930

3031
logger?: Logger;
32+
33+
expirationManager?: ExpirationManager;
3134
}
3235

3336
export interface ProposalPoolEvents {
@@ -68,16 +71,19 @@ export class DraftOfferProposalPool {
6871
/** {@link ProposalPoolOptions.validateOfferProposal} */
6972
private readonly validateOfferProposal: OfferProposalFilter = (proposal: OfferProposal) => proposal !== undefined;
7073

74+
/** {@link ProposalPoolOptions.expirationManager} */
75+
private readonly expirationManager: ExpirationManager | undefined = undefined;
76+
7177
/**
7278
* The proposals that were not yet leased to anyone and are available for lease
7379
*/
74-
private available = new Set<OfferProposal>();
80+
private available = new Map<OfferProposal["id"], OfferProposal>();
7581

7682
/**
7783
* Returns a read-only copy of all draft offers currently in the pool
7884
*/
7985
public getAvailable(): Array<OfferProposal> {
80-
return [...this.available];
86+
return [...this.available.values()];
8187
}
8288

8389
/**
@@ -97,6 +103,13 @@ export class DraftOfferProposalPool {
97103
this.minCount = options.minCount;
98104
}
99105

106+
if (options?.expirationManager) {
107+
this.expirationManager = options.expirationManager;
108+
this.expirationManager.registerCleanupFunction((proposalId: string) => {
109+
this.removeFromAvailable(proposalId);
110+
});
111+
}
112+
100113
this.logger = this.logger = options?.logger || defaultLogger("proposal-pool");
101114
}
102115

@@ -114,7 +127,7 @@ export class DraftOfferProposalPool {
114127
return;
115128
}
116129

117-
this.available.add(proposal);
130+
this.available.set(proposal.id, proposal);
118131

119132
this.events.emit("added", { proposal });
120133
}
@@ -147,7 +160,7 @@ export class DraftOfferProposalPool {
147160
}
148161
if (!this.validateOfferProposal(proposal)) {
149162
// Drop if not valid
150-
this.removeFromAvailable(proposal);
163+
this.removeFromAvailable(proposal.id);
151164
// and try again
152165
return runOnNextEventLoopIteration(tryGettingFromAvailable);
153166
}
@@ -158,7 +171,7 @@ export class DraftOfferProposalPool {
158171
// Try to get one
159172

160173
if (proposal) {
161-
this.available.delete(proposal);
174+
this.available.delete(proposal.id);
162175
this.leased.add(proposal);
163176
this.events.emit("acquired", { proposal });
164177
return proposal;
@@ -184,7 +197,7 @@ export class DraftOfferProposalPool {
184197
return;
185198
}
186199
// otherwise, put it back to the list of available proposals
187-
this.available.add(proposal);
200+
this.available.set(proposal.id, proposal);
188201
} else {
189202
this.events.emit("removed", { proposal });
190203
}
@@ -196,8 +209,8 @@ export class DraftOfferProposalPool {
196209
this.events.emit("removed", { proposal });
197210
}
198211

199-
if (this.available.has(proposal)) {
200-
this.available.delete(proposal);
212+
if (this.available.has(proposal.id)) {
213+
this.available.delete(proposal.id);
201214
this.events.emit("removed", { proposal });
202215
}
203216
}
@@ -235,8 +248,8 @@ export class DraftOfferProposalPool {
235248
*/
236249
public async clear() {
237250
this.acquireQueue.releaseAll();
238-
for (const proposal of this.available) {
239-
this.available.delete(proposal);
251+
for (const [id, proposal] of this.available) {
252+
this.available.delete(id);
240253
this.events.emit("removed", { proposal });
241254
}
242255

@@ -245,19 +258,26 @@ export class DraftOfferProposalPool {
245258
this.events.emit("removed", { proposal });
246259
}
247260

248-
this.available = new Set();
261+
this.available = new Map();
249262
this.leased = new Set();
250263
this.events.emit("cleared");
251264
}
252265

253-
protected removeFromAvailable(proposal: OfferProposal): void {
254-
this.available.delete(proposal);
255-
this.events.emit("removed", { proposal });
266+
protected removeFromAvailable(proposalId: OfferProposal["id"]): void {
267+
const proposalToDelete = this.available.get(proposalId);
268+
if (!proposalToDelete) return;
269+
this.available.delete(proposalId);
270+
this.events.emit("removed", { proposal: proposalToDelete });
256271
}
257272

258273
public readFrom(source: Observable<OfferProposal>): Subscription {
259274
return source.subscribe({
260-
next: (proposal) => this.add(proposal),
275+
next: (proposal) => {
276+
if (this.expirationManager) {
277+
this.expirationManager.registerObjectForCleanup(proposal.id);
278+
}
279+
this.add(proposal);
280+
},
261281
error: (err) => this.logger.error("Error while collecting proposals", err),
262282
});
263283
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import { Logger } from "../utils";
2+
3+
/**
4+
* Simple utility class that runs all given cleanup functions on a set interval.
5+
* It can be used to manage expiration of cache entries.
6+
*/
7+
export class ExpirationManager {
8+
private intervalId: NodeJS.Timeout | null = null;
9+
private trackedItems: Map<string, number> = new Map();
10+
private cleanupFunctions: Set<(id: string) => void | Promise<void>> = new Set();
11+
12+
private ttlMs: number;
13+
private intervalMs: number;
14+
private logger: Logger;
15+
16+
constructor(options: { timeToLiveMs: number; intervalMs: number; logger: Logger }) {
17+
this.ttlMs = options.timeToLiveMs;
18+
this.intervalMs = options.intervalMs;
19+
this.logger = options.logger;
20+
}
21+
22+
start() {
23+
if (this.intervalId) return;
24+
this.intervalId = setInterval(() => {
25+
this.#cleanupExpiredItems().catch((error) => {
26+
this.logger.error("Error occurred in ExpirationManager", { error });
27+
});
28+
}, this.intervalMs);
29+
}
30+
31+
registerObjectForCleanup(id: string) {
32+
if (this.trackedItems.has(id)) {
33+
this.trackedItems.delete(id);
34+
}
35+
this.trackedItems.set(id, Date.now() + this.ttlMs);
36+
}
37+
38+
registerCleanupFunction(fn: (id: string) => void) {
39+
this.cleanupFunctions.add(fn);
40+
return () => this.unregisterCleanupFunction(fn);
41+
}
42+
43+
unregisterObjectForCleanup(id: string) {
44+
this.trackedItems.delete(id);
45+
}
46+
47+
unregisterCleanupFunction(fn: (id: string) => void) {
48+
this.cleanupFunctions.delete(fn);
49+
}
50+
51+
async #cleanupExpiredItems() {
52+
const now = Date.now();
53+
const expiredItems: string[] = [];
54+
for (const [id, expirationDate] of this.trackedItems) {
55+
if (expirationDate <= now) {
56+
this.logger.debug(`Item with id ${id} has expired and will be cleaned up.`);
57+
expiredItems.push(id);
58+
} else {
59+
// JS Map is guaranteed to maintain insertion order
60+
// so when we find the first item that's not expired we
61+
// can safely exit the loop
62+
break;
63+
}
64+
}
65+
66+
const allProcessingPromises = expiredItems.map(async (id) => {
67+
this.trackedItems.delete(id);
68+
const cleanupPromises = [...this.cleanupFunctions].map((fn) => fn(id));
69+
try {
70+
await Promise.all(cleanupPromises);
71+
} catch (error) {
72+
this.logger.error(`Error occurred in cleanup functions for id ${id}`, { error, id });
73+
}
74+
});
75+
76+
await Promise.all(allProcessingPromises);
77+
}
78+
79+
stopAndReset() {
80+
if (this.intervalId) {
81+
clearInterval(this.intervalId);
82+
this.intervalId = null;
83+
}
84+
this.trackedItems.clear();
85+
this.cleanupFunctions.clear();
86+
}
87+
}

src/shared/yagna/repository/proposal-repository.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,23 @@ import { Demand, GolemMarketError, MarketErrorCode } from "../../../market";
44
import { CacheService } from "../../cache/CacheService";
55
import { IProposalRepository, MarketProposal } from "../../../market/proposal/market-proposal";
66
import { OfferCounterProposal } from "../../../market/proposal/offer-counter-proposal";
7+
import { ExpirationManager } from "../../expiration/ExpirationManager";
78

89
export class ProposalRepository implements IProposalRepository {
910
constructor(
1011
private readonly marketService: MarketApi.RequestorService,
1112
private readonly identityService: IdentityApi.DefaultService,
1213
private readonly cache: CacheService<MarketProposal>,
13-
) {}
14+
private readonly expirationManager: ExpirationManager,
15+
) {
16+
this.expirationManager.registerCleanupFunction((proposalId: string) => {
17+
this.cache.delete(proposalId);
18+
});
19+
}
1420

1521
add(proposal: MarketProposal) {
1622
this.cache.set(proposal.id, proposal);
23+
this.expirationManager.registerObjectForCleanup(proposal.id);
1724
return proposal;
1825
}
1926

0 commit comments

Comments
 (0)