|
45 | 45 | import org.opensearch.common.settings.Settings;
|
46 | 46 | import org.opensearch.common.xcontent.json.JsonXContent;
|
47 | 47 | import org.opensearch.common.xcontent.support.XContentMapValues;
|
48 |
| -import org.opensearch.core.common.Strings; |
| 48 | +import org.opensearch.index.IndexSettings; |
49 | 49 | import org.opensearch.index.seqno.SeqNoStats;
|
50 | 50 | import org.opensearch.indices.replication.common.ReplicationType;
|
51 | 51 | import org.opensearch.core.rest.RestStatus;
|
@@ -465,6 +465,90 @@ public void testSyncedFlushTransition() throws Exception {
|
465 | 465 | }
|
466 | 466 | }
|
467 | 467 |
|
| 468 | + public void testReplicasUsePrimaryIndexingStrategy() throws Exception { |
| 469 | + Nodes nodes = buildNodeAndVersions(); |
| 470 | + logger.info("cluster discovered:\n {}", nodes.toString()); |
| 471 | + Settings.Builder settings = Settings.builder() |
| 472 | + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) |
| 473 | + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1m") |
| 474 | + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2); |
| 475 | + final String index = "test-index"; |
| 476 | + createIndex(index, settings.build()); |
| 477 | + ensureNoInitializingShards(); // wait for all other shard activity to finish |
| 478 | + ensureGreen(index); |
| 479 | + |
| 480 | + int docCount = 200; |
| 481 | + try (RestClient nodeClient = buildClient(restClientSettings(), |
| 482 | + nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { |
| 483 | + indexDocs(index, 0, docCount); |
| 484 | + |
| 485 | + Thread[] indexThreads = new Thread[5]; |
| 486 | + for (int i = 0; i < indexThreads.length; i++) { |
| 487 | + indexThreads[i] = new Thread(() -> { |
| 488 | + try { |
| 489 | + int idStart = randomInt(docCount / 2); |
| 490 | + indexDocs(index, idStart, idStart + docCount / 2); |
| 491 | + if (randomBoolean()) { |
| 492 | + // perform a refresh |
| 493 | + assertOK(client().performRequest(new Request("POST", index + "/_flush"))); |
| 494 | + } |
| 495 | + } catch (IOException e) { |
| 496 | + throw new AssertionError("failed while indexing [" + e.getMessage() + "]"); |
| 497 | + } |
| 498 | + }); |
| 499 | + indexThreads[i].start(); |
| 500 | + } |
| 501 | + for (Thread indexThread : indexThreads) { |
| 502 | + indexThread.join(); |
| 503 | + } |
| 504 | + if (randomBoolean()) { |
| 505 | + // perform a refresh |
| 506 | + assertOK(client().performRequest(new Request("POST", index + "/_flush"))); |
| 507 | + } |
| 508 | + // verify replica catch up with primary |
| 509 | + assertSeqNoOnShards(index, nodes, docCount, nodeClient); |
| 510 | + assertSourceEqualWithPrimary(index, docCount); |
| 511 | + } |
| 512 | + } |
| 513 | + |
| 514 | + private void assertSourceEqualWithPrimary(final String index, final int expectedCount) throws IOException { |
| 515 | + Request primaryRequest = new Request("GET", index + "/_search"); |
| 516 | + primaryRequest.addParameter("preference", "_primary"); |
| 517 | + primaryRequest.addParameter("size", String.valueOf(expectedCount+100)); |
| 518 | + final Response primaryResponse = client().performRequest(primaryRequest); |
| 519 | + |
| 520 | + Map<String, Object> primaryHits = ObjectPath.createFromResponse(primaryResponse).evaluate("hits"); |
| 521 | + Map<String, Object> totals = ObjectPath.evaluate(primaryHits, "total"); |
| 522 | + assertEquals(expectedCount, totals.get("values")); |
| 523 | + |
| 524 | + List<Object> primarySources = ObjectPath.evaluate(primaryHits, "hits"); |
| 525 | + assertEquals(expectedCount, primarySources.size()); |
| 526 | + |
| 527 | + Map<String, Object> primarys = new HashMap<>(expectedCount); |
| 528 | + for (int i = 0; i < primarySources.size(); i++) { |
| 529 | + primarys.put(ObjectPath.evaluate(primarySources.get(i), "_id"), primarySources.get(i)); |
| 530 | + } |
| 531 | + |
| 532 | + |
| 533 | + // replicas source |
| 534 | + Request replicaRequest = new Request("GET", index + "/_search"); |
| 535 | + replicaRequest.addParameter("preference", "_replica"); |
| 536 | + replicaRequest.addParameter("size", String.valueOf(expectedCount+100)); |
| 537 | + final Response replicaResponse = client().performRequest(replicaRequest); |
| 538 | + |
| 539 | + Map<String, Object> replicaHits = ObjectPath.createFromResponse(replicaResponse).evaluate("hits"); |
| 540 | + Map<String, Object> replicaTotals = ObjectPath.evaluate(primaryHits, "total"); |
| 541 | + assertEquals(expectedCount, replicaTotals.get("values")); |
| 542 | + |
| 543 | + List<Object> replicaSources = ObjectPath.evaluate(replicaHits, "hits"); |
| 544 | + assertEquals(expectedCount, replicaSources.size()); |
| 545 | + |
| 546 | + for (Object replicaSource : replicaSources) { |
| 547 | + String id = ObjectPath.evaluate(replicaSource, "_id").toString(); |
| 548 | + assertEquals(primarys.get(id), replicaSource); |
| 549 | + } |
| 550 | + } |
| 551 | + |
468 | 552 | private void assertCount(final String index, final String preference, final int expectedCount) throws IOException {
|
469 | 553 | Request request = new Request("GET", index + "/_count");
|
470 | 554 | request.addParameter("preference", preference);
|
|
0 commit comments