52
52
use pocketmine \network \mcpe \compression \CompressBatchPromise ;
53
53
use pocketmine \network \mcpe \compression \CompressBatchTask ;
54
54
use pocketmine \network \mcpe \compression \Compressor ;
55
+ use pocketmine \network \mcpe \compression \CompressorWorkerPool ;
55
56
use pocketmine \network \mcpe \compression \ZlibCompressor ;
56
57
use pocketmine \network \mcpe \convert \TypeConverter ;
57
58
use pocketmine \network \mcpe \encryption \EncryptionContext ;
@@ -210,8 +211,6 @@ class Server{
210
211
private const TICKS_PER_TPS_OVERLOAD_WARNING = 5 * self ::TARGET_TICKS_PER_SECOND ;
211
212
private const TICKS_PER_STATS_REPORT = 300 * self ::TARGET_TICKS_PER_SECOND ;
212
213
213
- private const DEFAULT_ASYNC_COMPRESSION_THRESHOLD = 10_000 ;
214
-
215
214
private static ?Server $ instance = null ;
216
215
217
216
private TimeTrackingSleeperHandler $ tickSleeper ;
@@ -269,8 +268,13 @@ class Server{
269
268
private bool $ onlineMode = true ;
270
269
271
270
private Network $ network ;
272
- private bool $ networkCompressionAsync = true ;
273
- private int $ networkCompressionAsyncThreshold = self ::DEFAULT_ASYNC_COMPRESSION_THRESHOLD ;
271
+
272
+ private int $ networkCompressionThreads ;
273
+ /**
274
+ * @var CompressorWorkerPool[]
275
+ * @phpstan-var array<int, CompressorWorkerPool>
276
+ */
277
+ private array $ networkCompressionThreadPools = [];
274
278
275
279
private Language $ language ;
276
280
private bool $ forceLanguage = false ;
@@ -909,11 +913,13 @@ public function __construct(
909
913
}
910
914
ZlibCompressor::setInstance (new ZlibCompressor ($ netCompressionLevel , $ netCompressionThreshold , ZlibCompressor::DEFAULT_MAX_DECOMPRESSION_SIZE ));
911
915
912
- $ this ->networkCompressionAsync = $ this ->configGroup ->getPropertyBool (Yml::NETWORK_ASYNC_COMPRESSION , true );
913
- $ this ->networkCompressionAsyncThreshold = max (
914
- $ this ->configGroup ->getPropertyInt (Yml::NETWORK_ASYNC_COMPRESSION_THRESHOLD , self ::DEFAULT_ASYNC_COMPRESSION_THRESHOLD ),
915
- $ netCompressionThreshold ?? self ::DEFAULT_ASYNC_COMPRESSION_THRESHOLD
916
- );
916
+ // todo maybe add toggle to use this or legacy network compression
917
+ $ netCompressionThreads = $ this ->configGroup ->getPropertyString (Yml::NETWORK_COMPRESSION_THREADS , "auto " );
918
+ if ($ netCompressionThreads === "auto " ){
919
+ $ this ->networkCompressionThreads = max (1 , Utils::getCoreCount () - 2 );
920
+ }else {
921
+ $ this ->networkCompressionThreads = max (0 , (int ) $ netCompressionThreads );
922
+ }
917
923
918
924
EncryptionContext::$ ENABLED = $ this ->configGroup ->getPropertyBool (Yml::NETWORK_ENABLE_ENCRYPTION , true );
919
925
@@ -1011,7 +1017,7 @@ public function __construct(
1011
1017
1012
1018
$ this ->queryInfo = new QueryInfo ($ this );
1013
1019
1014
- $ playerDataProviderType = $ this ->configGroup ->getPropertyString (" player.default-data-format " , "datfile " );
1020
+ $ playerDataProviderType = $ this ->configGroup ->getPropertyString (Yml:: PLAYER_DEFAULT_DATA_FORMAT , "datfile " );
1015
1021
$ path = Path::join ($ this ->dataPath , "players " );
1016
1022
if ($ playerDataProviderType === "datfile " ) {
1017
1023
$ this ->playerDataProvider = new DatFilePlayerDataProvider ($ path );
@@ -1364,6 +1370,17 @@ public function broadcastTitle(string $title, string $subtitle = "", int $fadeIn
1364
1370
return count ($ recipients );
1365
1371
}
1366
1372
1373
+ private function getNetworkCompressionWorkerPool (Compressor $ compressor ) : CompressorWorkerPool {
1374
+ $ compressorId = spl_object_id ($ compressor );
1375
+ $ workerPool = $ this ->networkCompressionThreadPools [$ compressorId ] ?? null ;
1376
+ if ($ workerPool === null ) {
1377
+ $ this ->logger ->debug ("Creating new worker pool for compressor " . get_class ($ compressor ) . "# " . $ compressorId );
1378
+ $ workerPool = $ this ->networkCompressionThreadPools [$ compressorId ] = new CompressorWorkerPool ($ this ->networkCompressionThreads , $ compressor , $ this ->tickSleeper );
1379
+ }
1380
+
1381
+ return $ workerPool ;
1382
+ }
1383
+
1367
1384
/**
1368
1385
* @internal
1369
1386
* Promises to compress the given batch buffer using the selected compressor, optionally on a separate thread.
@@ -1382,26 +1399,22 @@ public function prepareBatch(string $buffer, Compressor $compressor, ?bool $sync
1382
1399
try {
1383
1400
$ timings ->startTiming ();
1384
1401
1385
- $ threshold = $ compressor ->getCompressionThreshold ();
1386
- if ($ threshold === null || strlen ($ buffer ) < $ compressor ->getCompressionThreshold ()){
1387
- $ compressionType = CompressionAlgorithm::NONE ;
1388
- $ compressed = $ buffer ;
1389
-
1390
- }else {
1391
- $ sync ??= !$ this ->networkCompressionAsync ;
1402
+ if ($ sync === null ){
1403
+ $ threshold = $ compressor ->getCompressionThreshold ();
1404
+ $ sync = $ threshold === null || strlen ($ buffer ) < $ threshold ;
1405
+ }
1392
1406
1393
- if (!$ sync && strlen ($ buffer ) >= $ this ->networkCompressionAsyncThreshold ){
1394
- $ promise = new CompressBatchPromise ();
1395
- $ task = new CompressBatchTask ($ buffer , $ promise , $ compressor );
1396
- $ this ->asyncPool ->submitTask ($ task );
1397
- return $ promise ;
1398
- }
1407
+ if (!$ sync && $ this ->networkCompressionThreads > 0 ){
1408
+ $ workerPool = $ this ->getNetworkCompressionWorkerPool ($ compressor );
1399
1409
1400
- $ compressionType = $ compressor ->getNetworkId ();
1401
- $ compressed = $ compressor ->compress ($ buffer );
1410
+ //TODO: we really want to be submitting all sessions' buffers in one go to maximize performance
1411
+ $ promise = $ workerPool ->submit ($ buffer );
1412
+ }else {
1413
+ $ promise = new CompressBatchPromise ();
1414
+ $ promise ->resolve ($ compressor ->compress ($ buffer ));
1402
1415
}
1403
1416
1404
- return chr ( $ compressionType ) . $ compressed ;
1417
+ return $ promise ;
1405
1418
}finally {
1406
1419
$ timings ->stopTiming ();
1407
1420
}
@@ -1518,6 +1531,10 @@ public function forceShutdown() : void{
1518
1531
$ this ->network ->unregisterInterface ($ interface );
1519
1532
}
1520
1533
}
1534
+ foreach ($ this ->networkCompressionThreadPools as $ pool ){
1535
+ $ this ->logger ->debug ("Shutting down network compression thread pool for compressor " . get_class ($ pool ->getCompressor ()) . "# " . spl_object_id ($ pool ->getCompressor ()));
1536
+ $ pool ->shutdown ();
1537
+ }
1521
1538
}catch (\Throwable $ e ){
1522
1539
$ this ->logger ->logException ($ e );
1523
1540
$ this ->logger ->emergency ("Crashed while crashing, killing process " );
0 commit comments