Skip to content

Commit 5ca78e4

Browse files
fix(transfer): optimized stats workflow
1 parent ba876c2 commit 5ca78e4

File tree

6 files changed

+102
-24
lines changed

6 files changed

+102
-24
lines changed
Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,27 @@
1-
import { map, Subject } from 'rxjs';
1+
import { concat, delay, EMPTY, map, of, Subject, switchMap } from 'rxjs';
22

33
import { calcReceivedStats, MBIT, SECOND } from './utils';
44

55
export default {
6-
create: (byteUnit = MBIT, timeUnit = SECOND) => {
6+
create: (byteRatio = MBIT, timeRatio = SECOND) => {
77
return new Subject().pipe(
88
calcReceivedStats(),
9-
calcAverageByteLengthPerTimeUnit(timeUnit),
10-
calcTransferRate(byteUnit)
9+
calcBandwidth(byteRatio, timeRatio)
10+
//
1111
);
1212
}
1313
};
1414

15-
const calcAverageByteLengthPerTimeUnit = timeRatio => {
16-
return source => source.pipe(map(({ value, period }) => (value / period) * timeRatio));
15+
const calcBandwidth = (byteRatio, timeRatio) => {
16+
return source =>
17+
source.pipe(
18+
switchMap(stats => {
19+
let noBandwidth = stats.value === stats.total ? EMPTY : of(0).pipe(delay(500));
20+
return concat(calcTransmittableBytes(stats, byteRatio, timeRatio), noBandwidth);
21+
})
22+
);
1723
};
1824

19-
const calcTransferRate = byteRatio => {
20-
return source => source.pipe(map(bytes => bytes * byteRatio));
25+
const calcTransmittableBytes = (stats, byteRatio, timeRatio) => {
26+
return of(stats).pipe(map(({ value, period }) => (value / period) * byteRatio * timeRatio));
2127
};

packages/operators/src/transfer/stats/Bandwidth.test.js

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,36 @@ describe('Bandwidth', () => {
4040
const bandwidth = Bandwidth.create(KBIT);
4141

4242
testScheduler.run(({ cold, expectObservable }) => {
43-
expectObservable(bandwidth).toBe('--a-b-c-d-e|', expectedVal);
4443
expectObservable(cold('--a-b-c-d-e|', triggerVal).pipe(tap(bandwidth)));
44+
expectObservable(bandwidth).toBe('--a-b-c-d-e|', expectedVal);
45+
});
46+
});
47+
48+
test('bandwidth outage', async () => {
49+
const time = Date.now();
50+
51+
const triggerVal = {
52+
a: { value: new TextEncoder().encode('abcd'), total: 20, period: time },
53+
b: { value: new TextEncoder().encode('edgh'), total: 20, period: time },
54+
c: { value: new TextEncoder().encode('ijkl'), total: 20, period: time },
55+
d: { value: new TextEncoder().encode('mnop'), total: 20, period: time },
56+
e: { value: new TextEncoder().encode('qrst'), total: 20, period: time }
57+
};
58+
59+
const expectedVal = {
60+
a: 15.625,
61+
b: 0,
62+
c: 0.10364842454394693,
63+
d: 0.15495867768595042,
64+
e: 0.20593080724876442,
65+
f: 0.256568144499179
66+
};
67+
68+
const bandwidth = Bandwidth.create(KBIT);
69+
70+
testScheduler.run(({ cold, expectObservable }) => {
71+
expectObservable(cold('--a 600ms b-c-d-e|', triggerVal).pipe(tap(bandwidth)));
72+
expectObservable(bandwidth).toBe('--a 499ms b 100ms c-d-e-f|', expectedVal);
4573
});
4674
});
4775
});

packages/operators/src/transfer/stats/ElapsedTime.test.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ describe('ElapsedTime', () => {
4848
const elapsedTime = ElapsedTime.create();
4949

5050
testScheduler.run(({ cold, expectObservable }) => {
51-
expectObservable(elapsedTime).toBe('a-b-c-d-e-f-g-h-i|', expectedVal);
5251
expectObservable(cold('a-b-c-d-e-f-g-h-i|', triggerVal).pipe(tap(elapsedTime)));
52+
expectObservable(elapsedTime).toBe('a-b-c-d-e-f-g-h-i|', expectedVal);
5353
});
5454
});
5555

@@ -83,8 +83,8 @@ describe('ElapsedTime', () => {
8383
const elapsedTimeSecond = ElapsedTime.create(SECOND);
8484

8585
testScheduler.run(({ cold, expectObservable }) => {
86-
expectObservable(elapsedTimeSecond).toBe('a-b-c-d-e-f-g-h-i|', expectedVal);
8786
expectObservable(cold('a-b-c-d-e-f-g-h-i|', triggerVal).pipe(tap(elapsedTimeSecond)));
87+
expectObservable(elapsedTimeSecond).toBe('a-b-c-d-e-f-g-h-i|', expectedVal);
8888
});
8989
});
9090
});

packages/operators/src/transfer/stats/Progress.test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ describe('Progress', () => {
3939
const progress = Progress.create();
4040

4141
testScheduler.run(({ cold, expectObservable }) => {
42-
expectObservable(progress).toBe('a-b-c-d-e-f-g-h-i|', expectedVal);
4342
expectObservable(cold('a-b-c-d-e-f-g-h-i|', triggerVal).pipe(tap(progress)));
43+
expectObservable(progress).toBe('a-b-c-d-e-f-g-h-i|', expectedVal);
4444
});
4545
});
4646
});
Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,40 @@
1-
import { concatWith, distinctUntilChanged, map, of, Subject } from 'rxjs';
1+
import {
2+
concat,
3+
concatWith,
4+
delay,
5+
distinctUntilChanged,
6+
EMPTY,
7+
map,
8+
of,
9+
Subject,
10+
switchMap
11+
} from 'rxjs';
212

313
import { calcReceivedStats, MSECOND } from './utils';
414

515
export default {
6-
create: (timeUnit = MSECOND) => {
16+
create: (timeRatio = MSECOND) => {
717
return new Subject().pipe(
818
calcReceivedStats(),
9-
calcEstimatedTime(),
10-
convertEstimatedTimeTo(timeUnit),
19+
calcTimeEstimate(timeRatio),
1120
concatWith(of(0)),
1221
distinctUntilChanged()
1322
);
1423
}
1524
};
1625

17-
const calcEstimatedTime = () => {
26+
const calcTimeEstimate = timeRatio => {
1827
return source =>
19-
source.pipe(map(({ value, total, period }) => Math.ceil((total - value) * (period / value))));
28+
source.pipe(
29+
switchMap(stats => {
30+
let noEstimation = stats.value === stats.total ? EMPTY : of(Infinity).pipe(delay(500));
31+
return concat(calcEstimation(stats, timeRatio), noEstimation);
32+
})
33+
);
2034
};
2135

22-
const convertEstimatedTimeTo = timeRatio => {
23-
return source => source.pipe(map(value => value / timeRatio));
36+
const calcEstimation = (stats, timeRatio) => {
37+
return of(stats).pipe(
38+
map(({ value, total, period }) => Math.ceil((total - value) * (period / value)) / timeRatio)
39+
);
2440
};

packages/operators/src/transfer/stats/TimeEstimate.test.js

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ describe('TimeEstimate', () => {
1818
vi.restoreAllMocks();
1919
});
2020

21-
test('calc estimate time - millisecond', async () => {
21+
test('time estimation - millisecond', async () => {
2222
const time = Date.now();
2323

2424
const triggerVal = {
@@ -40,12 +40,12 @@ describe('TimeEstimate', () => {
4040
const timeEstimate = TimeEstimate.create();
4141

4242
testScheduler.run(({ cold, expectObservable }) => {
43-
expectObservable(timeEstimate).toBe('--a-b-c-d-e|', expectedVal);
4443
expectObservable(cold('--a-b-c-d-e|', triggerVal).pipe(tap(timeEstimate)));
44+
expectObservable(timeEstimate).toBe('--a-b-c-d-e|', expectedVal);
4545
});
4646
});
4747

48-
test('calc estimate time - second', async () => {
48+
test('time estimation - second', async () => {
4949
const time = Date.now();
5050

5151
const triggerVal = {
@@ -67,8 +67,36 @@ describe('TimeEstimate', () => {
6767
const timeEstimateSecond = TimeEstimate.create(SECOND);
6868

6969
testScheduler.run(({ cold, expectObservable }) => {
70-
expectObservable(timeEstimateSecond).toBe('--a-b-c-d-e|', expectedVal);
7170
expectObservable(cold('--a-b-c-d-e|', triggerVal).pipe(tap(timeEstimateSecond)));
71+
expectObservable(timeEstimateSecond).toBe('--a-b-c-d-e|', expectedVal);
72+
});
73+
});
74+
75+
test('time estimation - outage', async () => {
76+
const time = Date.now();
77+
78+
const triggerVal = {
79+
a: { value: new TextEncoder().encode('abcd'), total: 20, period: time },
80+
b: { value: new TextEncoder().encode('edgh'), total: 20, period: time },
81+
c: { value: new TextEncoder().encode('ijkl'), total: 20, period: time },
82+
d: { value: new TextEncoder().encode('mnop'), total: 20, period: time },
83+
e: { value: new TextEncoder().encode('qrst'), total: 20, period: time }
84+
};
85+
86+
const expectedVal = {
87+
a: 0.008,
88+
b: Infinity,
89+
c: 0.905,
90+
d: 0.404,
91+
e: 0.152,
92+
f: 0
93+
};
94+
95+
const timeEstimateSecond = TimeEstimate.create(SECOND);
96+
97+
testScheduler.run(({ cold, expectObservable }) => {
98+
expectObservable(cold('--a 600ms b-c-d-e|', triggerVal).pipe(tap(timeEstimateSecond)));
99+
expectObservable(timeEstimateSecond).toBe('--a 499ms b 100ms c-d-e-f|', expectedVal);
72100
});
73101
});
74102
});

0 commit comments

Comments
 (0)