@@ -1309,154 +1309,159 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
1309
1309
% that affects such queue, when the process is made available again, the policy
1310
1310
% will eventually get applied. (https://github.com/rabbitmq/rabbitmq-server/issues/7863)
1311
1311
policy_repair (Config ) ->
1312
- [Server0 , _Server1 , _Server2 ] = Servers =
1313
- rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1314
- Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1315
- # 'confirm.select_ok' {} = amqp_channel :call (Ch , # 'confirm.select' {}),
1316
-
1317
- QQ = ? config (queue_name , Config ),
1318
- ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1319
- declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1320
- RaName = ra_name (QQ ),
1321
- ExpectedMaxLength1 = 10 ,
1322
- Priority1 = 1 ,
1323
- ok = rabbit_ct_broker_helpers :rpc (
1324
- Config ,
1325
- 0 ,
1326
- rabbit_policy ,
1327
- set ,
1328
- [
1329
- <<" /" >>,
1330
- <<QQ /binary , " _1" >>,
1331
- QQ ,
1332
- [{<<" max-length" >>, ExpectedMaxLength1 }, {<<" overflow" >>, <<" reject-publish" >>}],
1333
- Priority1 ,
1334
- <<" quorum_queues" >>,
1335
- <<" acting-user" >>
1336
- ]),
1337
-
1338
- % Wait for the policy to apply
1339
- QueryFun = fun rabbit_fifo :overview /1 ,
1340
- ? awaitMatch ({ok , {_ , #{config := #{max_length := ExpectedMaxLength1 }}}, _ },
1341
- rpc :call (Server0 , ra , local_query , [RaName , QueryFun ]),
1342
- ? DEFAULT_AWAIT ),
1343
-
1344
- % Check the policy has been applied
1345
- % Insert MaxLength1 + some messages but after consuming all messages only
1346
- % MaxLength1 are retrieved.
1347
- % Checking twice to ensure consistency
1348
- publish_confirm_many (Ch , QQ , ExpectedMaxLength1 + 1 ),
1349
- % +1 because QQs let one pass
1350
- wait_for_messages_ready (Servers , RaName , ExpectedMaxLength1 + 1 ),
1351
- fail = publish_confirm (Ch , QQ ),
1352
- fail = publish_confirm (Ch , QQ ),
1353
- consume_all (Ch , QQ ),
1354
-
1355
- % Set higher priority policy, allowing more messages
1356
- ExpectedMaxLength2 = 20 ,
1357
- Priority2 = 2 ,
1358
- ok = rabbit_ct_broker_helpers :rpc (
1359
- Config ,
1360
- 0 ,
1361
- rabbit_policy ,
1362
- set ,
1363
- [
1364
- <<" /" >>,
1365
- <<QQ /binary , " _2" >>,
1366
- QQ ,
1367
- [{<<" max-length" >>, ExpectedMaxLength2 }, {<<" overflow" >>, <<" reject-publish" >>}],
1368
- Priority2 ,
1369
- <<" quorum_queues" >>,
1370
- <<" acting-user" >>
1371
- ]),
1372
-
1373
- % Wait for the policy to apply
1374
- ? awaitMatch ({ok , {_ , #{config := #{max_length := ExpectedMaxLength2 }}}, _ },
1375
- rpc :call (Server0 , ra , local_query , [RaName , QueryFun ]),
1376
- ? DEFAULT_AWAIT ),
1377
-
1378
- % Check the policy has been applied
1379
- % Insert MaxLength2 + some messages but after consuming all messages only
1380
- % MaxLength2 are retrieved.
1381
- % Checking twice to ensure consistency.
1382
- % + 1 because QQs let one pass
1383
- publish_confirm_many (Ch , QQ , ExpectedMaxLength2 + 1 ),
1384
- wait_for_messages_ready (Servers , RaName , ExpectedMaxLength2 + 1 ),
1385
- fail = publish_confirm (Ch , QQ ),
1386
- fail = publish_confirm (Ch , QQ ),
1387
- consume_all (Ch , QQ ),
1388
-
1389
- % Ensure the queue process is unavailable
1390
- lists :foreach (fun (Srv ) -> ensure_qq_proc_dead (Config , Srv , RaName ) end , Servers ),
1391
-
1392
- % Add policy with higher priority, allowing even more messages.
1393
- ExpectedMaxLength3 = 30 ,
1394
- Priority3 = 3 ,
1395
- ok = rabbit_ct_broker_helpers :rpc (
1396
- Config ,
1397
- 0 ,
1398
- rabbit_policy ,
1399
- set ,
1400
- [
1401
- <<" /" >>,
1402
- <<QQ /binary , " _3" >>,
1403
- QQ ,
1404
- [{<<" max-length" >>, ExpectedMaxLength3 }, {<<" overflow" >>, <<" reject-publish" >>}],
1405
- Priority3 ,
1406
- <<" quorum_queues" >>,
1407
- <<" acting-user" >>
1408
- ]),
1409
-
1410
- % Restart the queue process.
1411
- {ok , Queue } =
1412
- rabbit_ct_broker_helpers :rpc (
1413
- Config ,
1414
- 0 ,
1415
- rabbit_amqqueue ,
1416
- lookup ,
1417
- [{resource , <<" /" >>, queue , QQ }]),
1418
- lists :foreach (
1419
- fun (Srv ) ->
1420
- rabbit_ct_broker_helpers :rpc (
1312
+ case rabbit_ct_helpers :is_mixed_versions () of
1313
+ true ->
1314
+ {skip , " Should not run in mixed version environments" };
1315
+ _ ->
1316
+ [Server0 , _Server1 , _Server2 ] = Servers =
1317
+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1318
+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1319
+ # 'confirm.select_ok' {} = amqp_channel :call (Ch , # 'confirm.select' {}),
1320
+
1321
+ QQ = ? config (queue_name , Config ),
1322
+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1323
+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1324
+ RaName = ra_name (QQ ),
1325
+ ExpectedMaxLength1 = 10 ,
1326
+ Priority1 = 1 ,
1327
+ ok = rabbit_ct_broker_helpers :rpc (
1421
1328
Config ,
1422
- Srv ,
1423
- rabbit_quorum_queue ,
1424
- recover ,
1425
- [foo , [Queue ]]
1426
- )
1427
- end ,
1428
- Servers ),
1429
-
1430
- % Wait for the queue to be available again.
1431
- lists :foreach (fun (Srv ) ->
1432
- rabbit_ct_helpers :await_condition (
1433
- fun () ->
1434
- is_pid (
1329
+ 0 ,
1330
+ rabbit_policy ,
1331
+ set ,
1332
+ [
1333
+ <<" /" >>,
1334
+ <<QQ /binary , " _1" >>,
1335
+ QQ ,
1336
+ [{<<" max-length" >>, ExpectedMaxLength1 }, {<<" overflow" >>, <<" reject-publish" >>}],
1337
+ Priority1 ,
1338
+ <<" quorum_queues" >>,
1339
+ <<" acting-user" >>
1340
+ ]),
1341
+
1342
+ % Wait for the policy to apply
1343
+ QueryFun = fun rabbit_fifo :overview /1 ,
1344
+ ? awaitMatch ({ok , {_ , #{config := #{max_length := ExpectedMaxLength1 }}}, _ },
1345
+ rpc :call (Server0 , ra , local_query , [RaName , QueryFun ]),
1346
+ ? DEFAULT_AWAIT ),
1347
+
1348
+ % Check the policy has been applied
1349
+ % Insert MaxLength1 + some messages but after consuming all messages only
1350
+ % MaxLength1 are retrieved.
1351
+ % Checking twice to ensure consistency
1352
+ publish_confirm_many (Ch , QQ , ExpectedMaxLength1 + 1 ),
1353
+ % +1 because QQs let one pass
1354
+ wait_for_messages_ready (Servers , RaName , ExpectedMaxLength1 + 1 ),
1355
+ fail = publish_confirm (Ch , QQ ),
1356
+ fail = publish_confirm (Ch , QQ ),
1357
+ consume_all (Ch , QQ ),
1358
+
1359
+ % Set higher priority policy, allowing more messages
1360
+ ExpectedMaxLength2 = 20 ,
1361
+ Priority2 = 2 ,
1362
+ ok = rabbit_ct_broker_helpers :rpc (
1363
+ Config ,
1364
+ 0 ,
1365
+ rabbit_policy ,
1366
+ set ,
1367
+ [
1368
+ <<" /" >>,
1369
+ <<QQ /binary , " _2" >>,
1370
+ QQ ,
1371
+ [{<<" max-length" >>, ExpectedMaxLength2 }, {<<" overflow" >>, <<" reject-publish" >>}],
1372
+ Priority2 ,
1373
+ <<" quorum_queues" >>,
1374
+ <<" acting-user" >>
1375
+ ]),
1376
+
1377
+ % Wait for the policy to apply
1378
+ ? awaitMatch ({ok , {_ , #{config := #{max_length := ExpectedMaxLength2 }}}, _ },
1379
+ rpc :call (Server0 , ra , local_query , [RaName , QueryFun ]),
1380
+ ? DEFAULT_AWAIT ),
1381
+
1382
+ % Check the policy has been applied
1383
+ % Insert MaxLength2 + some messages but after consuming all messages only
1384
+ % MaxLength2 are retrieved.
1385
+ % Checking twice to ensure consistency.
1386
+ % + 1 because QQs let one pass
1387
+ publish_confirm_many (Ch , QQ , ExpectedMaxLength2 + 1 ),
1388
+ wait_for_messages_ready (Servers , RaName , ExpectedMaxLength2 + 1 ),
1389
+ fail = publish_confirm (Ch , QQ ),
1390
+ fail = publish_confirm (Ch , QQ ),
1391
+ consume_all (Ch , QQ ),
1392
+
1393
+ % Ensure the queue process is unavailable
1394
+ lists :foreach (fun (Srv ) -> ensure_qq_proc_dead (Config , Srv , RaName ) end , Servers ),
1395
+
1396
+ % Add policy with higher priority, allowing even more messages.
1397
+ ExpectedMaxLength3 = 30 ,
1398
+ Priority3 = 3 ,
1399
+ ok = rabbit_ct_broker_helpers :rpc (
1400
+ Config ,
1401
+ 0 ,
1402
+ rabbit_policy ,
1403
+ set ,
1404
+ [
1405
+ <<" /" >>,
1406
+ <<QQ /binary , " _3" >>,
1407
+ QQ ,
1408
+ [{<<" max-length" >>, ExpectedMaxLength3 }, {<<" overflow" >>, <<" reject-publish" >>}],
1409
+ Priority3 ,
1410
+ <<" quorum_queues" >>,
1411
+ <<" acting-user" >>
1412
+ ]),
1413
+
1414
+ % Restart the queue process.
1415
+ {ok , Queue } =
1416
+ rabbit_ct_broker_helpers :rpc (
1417
+ Config ,
1418
+ 0 ,
1419
+ rabbit_amqqueue ,
1420
+ lookup ,
1421
+ [{resource , <<" /" >>, queue , QQ }]),
1422
+ lists :foreach (
1423
+ fun (Srv ) ->
1435
1424
rabbit_ct_broker_helpers :rpc (
1436
1425
Config ,
1437
1426
Srv ,
1438
- erlang ,
1439
- whereis ,
1440
- [RaName ]))
1441
- end )
1442
- end ,
1443
- Servers ),
1444
-
1445
- % Wait for the policy to apply
1446
- ? awaitMatch ({ok , {_ , #{config := #{max_length := ExpectedMaxLength3 }}}, _ },
1447
- rpc :call (Server0 , ra , local_query , [RaName , QueryFun ]),
1448
- ? DEFAULT_AWAIT ),
1449
-
1450
- % Check the policy has been applied
1451
- % Insert MaxLength3 + some messages but after consuming all messages only
1452
- % MaxLength3 are retrieved.
1453
- % Checking twice to ensure consistency.
1454
- % + 1 because QQs let one pass
1455
- publish_confirm_many (Ch , QQ , ExpectedMaxLength3 + 1 ),
1456
- wait_for_messages_ready (Servers , RaName , ExpectedMaxLength3 + 1 ),
1457
- fail = publish_confirm (Ch , QQ ),
1458
- fail = publish_confirm (Ch , QQ ),
1459
- consume_all (Ch , QQ ).
1427
+ rabbit_quorum_queue ,
1428
+ recover ,
1429
+ [foo , [Queue ]]
1430
+ )
1431
+ end ,
1432
+ Servers ),
1433
+
1434
+ % Wait for the queue to be available again.
1435
+ lists :foreach (fun (Srv ) ->
1436
+ rabbit_ct_helpers :await_condition (
1437
+ fun () ->
1438
+ is_pid (
1439
+ rabbit_ct_broker_helpers :rpc (
1440
+ Config ,
1441
+ Srv ,
1442
+ erlang ,
1443
+ whereis ,
1444
+ [RaName ]))
1445
+ end )
1446
+ end ,
1447
+ Servers ),
1448
+
1449
+ % Wait for the policy to apply
1450
+ ? awaitMatch ({ok , {_ , #{config := #{max_length := ExpectedMaxLength3 }}}, _ },
1451
+ rpc :call (Server0 , ra , local_query , [RaName , QueryFun ]),
1452
+ ? DEFAULT_AWAIT ),
1453
+
1454
+ % Check the policy has been applied
1455
+ % Insert MaxLength3 + some messages but after consuming all messages only
1456
+ % MaxLength3 are retrieved.
1457
+ % Checking twice to ensure consistency.
1458
+ % + 1 because QQs let one pass
1459
+ publish_confirm_many (Ch , QQ , ExpectedMaxLength3 + 1 ),
1460
+ wait_for_messages_ready (Servers , RaName , ExpectedMaxLength3 + 1 ),
1461
+ fail = publish_confirm (Ch , QQ ),
1462
+ fail = publish_confirm (Ch , QQ ),
1463
+ consume_all (Ch , QQ )
1464
+ end .
1460
1465
1461
1466
1462
1467
gh_12635 (Config ) ->
0 commit comments