Skip to content

Commit a641f3b

Browse files
authored
Merge pull request #1167 from golemfactory/beta
Beta
2 parents ab62687 + 3d0859c commit a641f3b

20 files changed

+326
-137
lines changed

src/activity/activity.module.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@ export interface ActivityModule {
1616
*
1717
* @return The resulting activity on the provider for further use
1818
*/
19-
createActivity(agreement: Agreement): Promise<Activity>;
19+
createActivity(agreement: Agreement, signalOrTimeout?: AbortSignal | number): Promise<Activity>;
2020

2121
/**
2222
* Definitely terminate any work on the provider
2323
*
2424
* @return The activity that was permanently terminated
2525
*/
26-
destroyActivity(activity: Activity): Promise<Activity>;
26+
destroyActivity(activity: Activity, signalOrTimeout?: AbortSignal | number): Promise<Activity>;
2727

2828
/**
2929
* Fetches the latest state of the activity. It's recommended to use this method
@@ -214,13 +214,13 @@ export class ActivityModuleImpl implements ActivityModule {
214214
);
215215
}
216216

217-
async createActivity(agreement: Agreement): Promise<Activity> {
217+
async createActivity(agreement: Agreement, signalOrTimeout?: AbortSignal | number): Promise<Activity> {
218218
this.logger.debug("Creating activity", {
219219
agreementId: agreement.id,
220220
provider: agreement.provider,
221221
});
222222
try {
223-
const activity = await this.activityApi.createActivity(agreement);
223+
const activity = await this.activityApi.createActivity(agreement, signalOrTimeout);
224224
this.events.emit("activityCreated", { activity });
225225
this.logger.info("Created activity", {
226226
activityId: activity.id,
@@ -234,10 +234,10 @@ export class ActivityModuleImpl implements ActivityModule {
234234
}
235235
}
236236

237-
async destroyActivity(activity: Activity): Promise<Activity> {
237+
async destroyActivity(activity: Activity, signalOrTimeout?: AbortSignal | number): Promise<Activity> {
238238
this.logger.debug("Destroying activity", activity);
239239
try {
240-
const updated = await this.activityApi.destroyActivity(activity);
240+
const updated = await this.activityApi.destroyActivity(activity, signalOrTimeout);
241241
this.events.emit("activityDestroyed", {
242242
activity: updated,
243243
});

src/activity/api.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ export type ActivityEvents = {
3434
export interface IActivityApi {
3535
getActivity(id: string): Promise<Activity>;
3636

37-
createActivity(agreement: Agreement): Promise<Activity>;
37+
createActivity(agreement: Agreement, signalOrTimeout?: AbortSignal | number): Promise<Activity>;
3838

39-
destroyActivity(activity: Activity): Promise<Activity>;
39+
destroyActivity(activity: Activity, signalOrTimeout?: AbortSignal | number): Promise<Activity>;
4040

4141
getActivityState(id: string): Promise<ActivityStateEnum>;
4242

src/market/agreement/agreement.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ export interface AgreementOptions {
2525
}
2626

2727
export interface IAgreementRepository {
28-
getById(id: string): Promise<Agreement>;
28+
getById(id: string, signalOrTimeout?: AbortSignal | number): Promise<Agreement>;
2929
}
3030

3131
/**

src/market/api.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ export interface IMarketApi {
9999
/**
100100
* Retrieves an agreement based on the provided ID.
101101
*/
102-
getAgreement(id: string): Promise<Agreement>;
102+
getAgreement(id: string, signalOrTimeout?: AbortSignal | number): Promise<Agreement>;
103103

104104
/**
105105
* Request creating an agreement from the provided proposal
@@ -117,22 +117,30 @@ export interface IMarketApi {
117117
*
118118
* @return An agreement that's already in an "Approved" state and can be used to create activities on the Provider
119119
*/
120-
proposeAgreement(proposal: OfferProposal, options?: AgreementOptions): Promise<Agreement>;
120+
proposeAgreement(
121+
proposal: OfferProposal,
122+
options?: AgreementOptions,
123+
signalOrTimeout?: AbortSignal | number,
124+
): Promise<Agreement>;
121125

122126
/**
123127
* Confirms the agreement with the provider
124128
*/
125-
confirmAgreement(agreement: Agreement, options?: AgreementOptions): Promise<Agreement>;
129+
confirmAgreement(
130+
agreement: Agreement,
131+
options?: AgreementOptions,
132+
signalOrTimeout?: AbortSignal | number,
133+
): Promise<Agreement>;
126134

127135
/**
128136
* Terminates an agreement.
129137
*/
130-
terminateAgreement(agreement: Agreement, reason?: string): Promise<Agreement>;
138+
terminateAgreement(agreement: Agreement, reason?: string, signalOrTimeout?: AbortSignal | number): Promise<Agreement>;
131139

132140
/**
133141
* Retrieves the state of an agreement based on the provided agreement ID.
134142
*/
135-
getAgreementState(id: string): Promise<AgreementState>;
143+
getAgreementState(id: string, signalOrTimeout?: AbortSignal | number): Promise<AgreementState>;
136144

137145
/**
138146
* Scan the market for offers that match the given specification.

src/market/demand/demand.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,4 +116,24 @@ export class Demand {
116116
get paymentPlatform(): string {
117117
return this.details.paymentPlatform;
118118
}
119+
120+
/**
121+
* Demand expiration as a timestamp or null if it's not present in the properties object
122+
*/
123+
get expiration(): number | null {
124+
const expirationPropertyValue = this.details.prototype.properties.find(
125+
(property) => property.key === "golem.srv.comp.expiration",
126+
);
127+
if (!expirationPropertyValue) {
128+
return null;
129+
}
130+
if (typeof expirationPropertyValue.value === "number") {
131+
return expirationPropertyValue.value;
132+
}
133+
const valuesAsNumber = Number(expirationPropertyValue.value);
134+
if (Number.isNaN(valuesAsNumber)) {
135+
return null;
136+
}
137+
return valuesAsNumber;
138+
}
119139
}

src/market/draft-offer-proposal-pool.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { createAbortSignalFromTimeout, defaultLogger, Logger, runOnNextEventLoop
55
import { Observable, Subscription } from "rxjs";
66
import { AcquireQueue } from "../shared/utils/acquireQueue";
77

8-
export type OfferProposalSelector = (proposals: OfferProposal[]) => OfferProposal;
8+
export type OfferProposalSelector = (proposals: OfferProposal[]) => OfferProposal | null;
99

1010
export interface ProposalPoolOptions {
1111
/**
@@ -48,6 +48,10 @@ export interface ProposalPoolEvents {
4848
* parties.
4949
*
5050
* Technically, the "market" part of you application should populate this pool with such offer proposals.
51+
*
52+
* It's important to know that offers are never automatically removed from the pool, even if the corresponding
53+
* Demand becomes expired. It's on the application developer to ensure that a proposal is still valid before
54+
* trying to sign an agreement.
5155
*/
5256
export class DraftOfferProposalPool {
5357
public readonly events = new EventEmitter<ProposalPoolEvents>();
@@ -69,6 +73,13 @@ export class DraftOfferProposalPool {
6973
*/
7074
private available = new Set<OfferProposal>();
7175

76+
/**
77+
* Returns a read-only copy of all draft offers currently in the pool
78+
*/
79+
public getAvailable(): Array<OfferProposal> {
80+
return [...this.available];
81+
}
82+
7283
/**
7384
* The proposal that were already leased to someone and shouldn't be leased again
7485
*/
@@ -121,7 +132,14 @@ export class DraftOfferProposalPool {
121132
const tryGettingFromAvailable = async (): Promise<OfferProposal | undefined> => {
122133
signal.throwIfAborted();
123134

124-
const proposal = this.available.size > 0 ? this.selectOfferProposal([...this.available]) : null;
135+
let proposal: OfferProposal | null = null;
136+
if (this.available.size > 0) {
137+
try {
138+
proposal = this.selectOfferProposal(this.getAvailable());
139+
} catch (e) {
140+
this.logger.error("Error in user-defined offer proposal selector", { error: e });
141+
}
142+
}
125143
if (!proposal) {
126144
// No proposal was selected, either `available` is empty or the user's proposal filter didn't select anything
127145
// no point retrying

src/market/market.module.ts

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -147,15 +147,21 @@ export interface MarketModule {
147147
* - ya-ts-client "wait for approval"
148148
*
149149
* @param proposal
150+
* @param signalOrTimeout - The timeout in milliseconds or an AbortSignal that will be used to cancel the operation
150151
*
151152
* @return Returns when the provider accepts the agreement, rejects otherwise. The resulting agreement is ready to create activities from.
152153
*/
153-
proposeAgreement(proposal: OfferProposal): Promise<Agreement>;
154+
proposeAgreement(
155+
proposal: OfferProposal,
156+
agreementOptions?: AgreementOptions,
157+
signalOrTimeout?: AbortSignal | number,
158+
): Promise<Agreement>;
154159

155160
/**
156161
* @return The Agreement that has been terminated via Yagna
162+
* @param signalOrTimeout - The timeout in milliseconds or an AbortSignal that will be used to cancel the operation
157163
*/
158-
terminateAgreement(agreement: Agreement, reason?: string): Promise<Agreement>;
164+
terminateAgreement(agreement: Agreement, reason?: string, signalOrTimeout?: AbortSignal | number): Promise<Agreement>;
159165

160166
/**
161167
* Acquire a proposal from the pool and sign an agreement with the provider. If signing the agreement fails,
@@ -210,7 +216,7 @@ export interface MarketModule {
210216
/**
211217
* Fetch the most up-to-date agreement details from the yagna
212218
*/
213-
fetchAgreement(agreementId: string): Promise<Agreement>;
219+
fetchAgreement(agreementId: string, signalOrTimeout?: AbortSignal | number): Promise<Agreement>;
214220

215221
/**
216222
* Scan the market for offers that match the given demand specification.
@@ -456,8 +462,12 @@ export class MarketModuleImpl implements MarketModule {
456462
}
457463
}
458464

459-
async proposeAgreement(proposal: OfferProposal, options?: AgreementOptions): Promise<Agreement> {
460-
const agreement = await this.marketApi.proposeAgreement(proposal, options);
465+
async proposeAgreement(
466+
proposal: OfferProposal,
467+
options?: AgreementOptions,
468+
signalOrTimeout?: AbortSignal | number,
469+
): Promise<Agreement> {
470+
const agreement = await this.marketApi.proposeAgreement(proposal, options, signalOrTimeout);
461471

462472
this.logger.info("Proposed and got approval for agreement", {
463473
agreementId: agreement.id,
@@ -467,8 +477,12 @@ export class MarketModuleImpl implements MarketModule {
467477
return agreement;
468478
}
469479

470-
async terminateAgreement(agreement: Agreement, reason?: string): Promise<Agreement> {
471-
await this.marketApi.terminateAgreement(agreement, reason);
480+
async terminateAgreement(
481+
agreement: Agreement,
482+
reason?: string,
483+
signalOrTimeout?: AbortSignal | number,
484+
): Promise<Agreement> {
485+
await this.marketApi.terminateAgreement(agreement, reason, signalOrTimeout);
472486

473487
this.logger.info("Terminated agreement", {
474488
agreementId: agreement.id,
@@ -665,8 +679,8 @@ export class MarketModuleImpl implements MarketModule {
665679
}
666680
}
667681

668-
async fetchAgreement(agreementId: string): Promise<Agreement> {
669-
return this.marketApi.getAgreement(agreementId);
682+
async fetchAgreement(agreementId: string, signalOrTimeout?: AbortSignal | number): Promise<Agreement> {
683+
return this.marketApi.getAgreement(agreementId, signalOrTimeout);
670684
}
671685

672686
/**

src/network/api.ts

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,33 +20,43 @@ export interface INetworkApi {
2020
/**
2121
* Creates a new network with the specified options.
2222
* @param options NetworkOptions
23+
* @param signalOrTimeout - The timeout in milliseconds or an AbortSignal that will be used to cancel the operation
2324
*/
24-
createNetwork(options: NetworkOptions): Promise<Network>;
25+
createNetwork(options: NetworkOptions, signalOrTimeout?: AbortSignal | number): Promise<Network>;
2526

2627
/**
2728
* Removes an existing network.
2829
* @param network - The network to be removed.
30+
* @param signalOrTimeout - The timeout in milliseconds or an AbortSignal that will be used to cancel the operation
2931
*/
30-
removeNetwork(network: Network): Promise<void>;
32+
removeNetwork(network: Network, signalOrTimeout?: AbortSignal | number): Promise<void>;
3133

3234
/**
3335
* Creates a new node within a specified network.
3436
* @param network - The network to which the node will be added.
3537
* @param nodeId - The ID of the node to be created.
3638
* @param nodeIp - Optional IP address for the node. If not provided, the first available IP address will be assigned.
39+
* @param signalOrTimeout - The timeout in milliseconds or an AbortSignal that will be used to cancel the operation
3740
*/
3841

39-
createNetworkNode(network: Network, nodeId: string, nodeIp?: string): Promise<NetworkNode>;
42+
createNetworkNode(
43+
network: Network,
44+
nodeId: string,
45+
nodeIp?: string,
46+
signalOrTimeout?: AbortSignal | number,
47+
): Promise<NetworkNode>;
4048

4149
/**
4250
* Removes an existing node from a specified network.
4351
* @param network - The network from which the node will be removed.
4452
* @param node - The node to be removed.
53+
* @param signalOrTimeout - The timeout in milliseconds or an AbortSignal that will be used to cancel the operation
4554
*/
46-
removeNetworkNode(network: Network, node: NetworkNode): Promise<void>;
55+
removeNetworkNode(network: Network, node: NetworkNode, signalOrTimeout?: AbortSignal | number): Promise<void>;
4756

4857
/**
4958
* Returns the identifier of the requesor
59+
* @param signalOrTimeout - The timeout in milliseconds or an AbortSignal that will be used to cancel the operation
5060
*/
51-
getIdentity(): Promise<string>;
61+
getIdentity(signalOrTimeout?: AbortSignal | number): Promise<string>;
5262
}

src/network/network.module.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ describe("Network", () => {
2424
networkApi: instance(mockNetworkApi),
2525
logger: instance(imock<Logger>()),
2626
});
27-
when(mockNetworkApi.createNetwork(anything())).thenResolve(instance(mockNetwork));
27+
when(mockNetworkApi.createNetwork(anything(), anything())).thenResolve(instance(mockNetwork));
2828
when(mockNetwork.getNetworkInfo()).thenReturn({
2929
id: "test-id-1",
3030
ip: "192.168.0.0",
@@ -188,13 +188,13 @@ describe("Network", () => {
188188
it("should remove network", async () => {
189189
const network = instance(mockNetwork);
190190
await networkModule.removeNetwork(network);
191-
verify(mockNetworkApi.removeNetwork(network)).once();
191+
verify(mockNetworkApi.removeNetwork(network, anything())).once();
192192
});
193193

194194
it("should not remove network that doesn't exist", async () => {
195195
const network = instance(mockNetwork);
196196
const mockError = new Error("404");
197-
when(mockNetworkApi.removeNetwork(network)).thenReject(mockError);
197+
when(mockNetworkApi.removeNetwork(network, anything())).thenReject(mockError);
198198
await expect(networkModule.removeNetwork(network)).rejects.toMatchError(mockError);
199199
});
200200
});

0 commit comments

Comments
 (0)