Skip to content

Commit 37fcb12

Browse files
committed
[#167064659] Use functional programming into RedisUserMetadataStorage.set
1 parent 5b2dc00 commit 37fcb12

File tree

2 files changed

+99
-68
lines changed

2 files changed

+99
-68
lines changed

src/services/__tests__/redisUserMetadataStorage.test.ts

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ mockRedisClient.set = mockSet;
4848
mockRedisClient.get = mockGet;
4949
mockRedisClient.watch = mockWatch;
5050
mockRedisClient.multi = mockMulti;
51-
mockRedisClient.duplicate = () => mockRedisClient;
51+
const mockDuplicate = jest.fn().mockImplementation(() => mockRedisClient);
52+
mockRedisClient.duplicate = mockDuplicate;
5253

5354
mockMulti.mockImplementation(() => {
5455
return {
@@ -120,7 +121,7 @@ describe("RedisUserMetadataStorage#get", () => {
120121
);
121122
});
122123

123-
describe("RedisUserMetadataStorage#get", () => {
124+
describe("RedisUserMetadataStorage#set", () => {
124125
beforeEach(() => {
125126
jest.clearAllMocks();
126127
});
@@ -241,4 +242,30 @@ describe("RedisUserMetadataStorage#get", () => {
241242
expect(mockSet).not.toBeCalled();
242243
expect(response).toEqual(left(expectedWatchError));
243244
});
245+
246+
it("should duplicate the redis client if a race condition happens on the same key", async () => {
247+
mockWatch.mockImplementation((_, callback) => callback(null));
248+
mockGet.mockImplementation((_, callback) => {
249+
callback(undefined, JSON.stringify(aValidUserMetadata));
250+
});
251+
mockExec.mockImplementationOnce(callback => {
252+
callback(undefined, ["OK"]);
253+
});
254+
mockExec.mockImplementationOnce(callback => {
255+
callback(undefined, null);
256+
});
257+
const newMetadata: UserMetadata = {
258+
metadata,
259+
version: validNewVersion
260+
};
261+
const response = await Promise.all([
262+
userMetadataStorage.set(aValidUser, newMetadata),
263+
userMetadataStorage.set(aValidUser, newMetadata)
264+
]);
265+
expect(mockDuplicate).toBeCalledTimes(1);
266+
expect(response).toEqual([
267+
right(true),
268+
left(concurrentWriteRejectionError)
269+
]);
270+
});
244271
});
Lines changed: 70 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import * as redis from "redis";
22

3-
import { Either, isLeft, isRight, left, right } from "fp-ts/lib/Either";
3+
import { Either, isLeft, left, right, toError } from "fp-ts/lib/Either";
44
import { ReadableReporter } from "italia-ts-commons/lib/reporters";
55
import { UserMetadata } from "../../generated/backend/UserMetadata";
66
import { User } from "../types/user";
@@ -9,6 +9,12 @@ import { IUserMetadataStorage } from "./IUserMetadataStorage";
99
import RedisStorageUtils from "./redisStorageUtils";
1010

1111
import { Sema } from "async-sema";
12+
import {
13+
fromEither,
14+
fromPredicate,
15+
taskify,
16+
tryCatch
17+
} from "fp-ts/lib/TaskEither";
1218
import { FiscalCode } from "italia-ts-commons/lib/strings";
1319

1420
const userMetadataPrefix = "USERMETA-";
@@ -23,7 +29,7 @@ export const concurrentWriteRejectionError = new Error(
2329
*/
2430
export default class RedisUserMetadataStorage extends RedisStorageUtils
2531
implements IUserMetadataStorage {
26-
private setOperations: Record<string, true | undefined> = {};
32+
private setOperations: Set<string> = new Set();
2733
private mutex: Sema = new Sema(1);
2834
constructor(private readonly redisClient: redis.RedisClient) {
2935
super();
@@ -35,84 +41,83 @@ export default class RedisUserMetadataStorage extends RedisStorageUtils
3541
* during write operations of user metadata
3642
* @see https://github.com/NodeRedis/node_redis#optimistic-locks
3743
*/
38-
// tslint:disable-next-line: cognitive-complexity
3944
public async set(
4045
user: User,
4146
payload: UserMetadata
4247
): Promise<Either<Error, boolean>> {
4348
// In order to work properly, optimistic lock needs to be initialized on different
4449
// redis client instances @see https://github.com/NodeRedis/node_redis/issues/1320#issuecomment-373200541
4550
await this.mutex.acquire();
46-
const raceCondition = this.setOperations[user.fiscal_code];
51+
const raceCondition = this.setOperations.has(user.fiscal_code);
4752
// tslint:disable-next-line: no-let
4853
let duplicatedOrOriginalRedisClient = this.redisClient;
49-
if (raceCondition === undefined) {
50-
// tslint:disable-next-line: no-object-mutation
51-
this.setOperations[user.fiscal_code] = true;
54+
if (raceCondition === false) {
55+
this.setOperations.add(user.fiscal_code);
5256
} else {
57+
// A duplicate redis client must be created only if the main client is already
58+
// in use into an optimistic lock update on the same key
5359
duplicatedOrOriginalRedisClient = this.redisClient.duplicate();
5460
}
5561
this.mutex.release();
56-
const userMetadataWatchResult = await new Promise<Either<Error, true>>(
57-
resolve => {
58-
duplicatedOrOriginalRedisClient.watch(
59-
`${userMetadataPrefix}${user.fiscal_code}`,
60-
err => {
61-
if (err) {
62-
return resolve(left(err));
63-
}
64-
resolve(right(true));
65-
}
66-
);
62+
const userMetadataWatchResult = await taskify(
63+
(key: string, callback: (err: Error | null, value: true) => void) => {
64+
duplicatedOrOriginalRedisClient.watch(key, err => callback(err, true));
6765
}
68-
);
69-
if (isLeft(userMetadataWatchResult)) {
70-
raceCondition
71-
? duplicatedOrOriginalRedisClient.end(true)
72-
: await this.resetOperation(user.fiscal_code);
73-
return userMetadataWatchResult;
74-
}
75-
const getUserMetadataResult = await this.loadUserMetadataByFiscalCode(
76-
user.fiscal_code
77-
);
78-
if (
79-
isRight(getUserMetadataResult) &&
80-
getUserMetadataResult.value.version !== payload.version - 1
81-
) {
82-
raceCondition
83-
? duplicatedOrOriginalRedisClient.end(true)
84-
: await this.resetOperation(user.fiscal_code);
85-
return left(invalidVersionNumberError);
86-
}
87-
if (
88-
isLeft(getUserMetadataResult) &&
89-
getUserMetadataResult.value !== metadataNotFoundError
90-
) {
91-
raceCondition
92-
? duplicatedOrOriginalRedisClient.end(true)
93-
: await this.resetOperation(user.fiscal_code);
94-
return left(getUserMetadataResult.value);
95-
}
96-
return await new Promise<Either<Error, boolean>>(resolve => {
97-
duplicatedOrOriginalRedisClient
98-
.multi()
99-
.set(
100-
`${userMetadataPrefix}${user.fiscal_code}`,
101-
JSON.stringify(payload)
66+
)(`${userMetadataPrefix}${user.fiscal_code}`)
67+
.chain(() =>
68+
tryCatch(
69+
() => this.loadUserMetadataByFiscalCode(user.fiscal_code),
70+
toError
10271
)
103-
.exec(async (err, results) => {
104-
raceCondition
105-
? duplicatedOrOriginalRedisClient.end(true)
106-
: await this.resetOperation(user.fiscal_code);
107-
if (err) {
108-
return resolve(left(err));
109-
}
110-
if (results === null) {
111-
return resolve(left(concurrentWriteRejectionError));
72+
)
73+
.chain(_ => {
74+
if (isLeft(_) && _.value === metadataNotFoundError) {
75+
return fromEither(
76+
right({
77+
metadata: "",
78+
version: 0
79+
})
80+
);
81+
}
82+
return fromEither(_);
83+
})
84+
.chain(
85+
fromPredicate(
86+
_ => _.version === payload.version - 1,
87+
_ => invalidVersionNumberError
88+
)
89+
)
90+
.chain(() =>
91+
taskify(
92+
(
93+
key: string,
94+
data: string,
95+
callback: (
96+
err: Error | null,
97+
value?: Either<Error, boolean>
98+
) => void
99+
) => {
100+
duplicatedOrOriginalRedisClient
101+
.multi()
102+
.set(key, data)
103+
.exec((err, results) => {
104+
if (err) {
105+
return callback(err);
106+
}
107+
if (results === null) {
108+
return callback(concurrentWriteRejectionError);
109+
}
110+
callback(null, this.singleStringReply(err, results[0]));
111+
});
112112
}
113-
resolve(this.singleStringReply(err, results[0]));
114-
});
115-
});
113+
)(`${userMetadataPrefix}${user.fiscal_code}`, JSON.stringify(payload))
114+
)
115+
.chain(fromEither)
116+
.run();
117+
raceCondition
118+
? duplicatedOrOriginalRedisClient.end(true)
119+
: await this.resetOperation(user.fiscal_code);
120+
return userMetadataWatchResult;
116121
}
117122

118123
/**
@@ -169,8 +174,7 @@ export default class RedisUserMetadataStorage extends RedisStorageUtils
169174

170175
private async resetOperation(fiscalCode: FiscalCode): Promise<void> {
171176
await this.mutex.acquire();
172-
// tslint:disable-next-line: no-object-mutation
173-
this.setOperations[fiscalCode] = undefined;
177+
this.setOperations.delete(fiscalCode);
174178
this.mutex.release();
175179
}
176180
}

0 commit comments

Comments
 (0)