diff --git a/.github/workflows/security-knn-tests.yml b/.github/workflows/security-knn-tests.yml index ee4d50a3..ea62312d 100644 --- a/.github/workflows/security-knn-tests.yml +++ b/.github/workflows/security-knn-tests.yml @@ -36,8 +36,8 @@ jobs: run: | opensearch_version=$(./gradlew properties | grep -E '^version:' | awk '{print $2}') # we publish build artifacts to the below url - sec_plugin_url="https://central.sonatype.com/service/rest/repository/browse/maven-snapshots/org/opensearch/plugin/opensearch-security/"$opensearch_version"/" - sec_st=$(curl -s -o /dev/null -w "%{http_code}" $sec_plugin_url) + sec_plugin_url="https://central.sonatype.com/service/rest/repository/browse/maven-snapshots/org/opensearch/plugin/opensearch-security/${opensearch_version}/" + sec_st=$(curl -L -s -o /dev/null -w "%{http_code}" $sec_plugin_url) if [ "$sec_st" = "200" ]; then echo "isSecurityPluginAvailable=True" >> $GITHUB_OUTPUT cat $GITHUB_OUTPUT @@ -45,8 +45,8 @@ jobs: echo "isSecurityPluginAvailable=False" >> $GITHUB_OUTPUT cat $GITHUB_OUTPUT fi - knn_plugin_url="https://central.sonatype.com/service/rest/repository/browse/maven-snapshots/org/opensearch/plugin/opensearch-knn/"$opensearch_version"/" - knn_st=$(curl -s -o /dev/null -w "%{http_code}" $knn_plugin_url) + knn_plugin_url="https://central.sonatype.com/service/rest/repository/browse/maven-snapshots/org/opensearch/plugin/opensearch-knn/${opensearch_version}/" + knn_st=$(curl -L -s -o /dev/null -w "%{http_code}" $knn_plugin_url) if [ "$knn_st" = "200" ]; then echo "isKnnPluginAvailable=True" >> $GITHUB_OUTPUT cat $GITHUB_OUTPUT diff --git a/build.gradle b/build.gradle index 88d23915..b740b43a 100644 --- a/build.gradle +++ b/build.gradle @@ -385,7 +385,7 @@ testClusters { if(securityEnabled) { plugin(provider(securityPluginFile)) } - testDistribution = "INTEG_TEST" + testDistribution = "ARCHIVE" if(knnEnabled) { plugin(provider(knnPluginFile)) testDistribution = "ARCHIVE" @@ -400,7 +400,7 @@ testClusters { if(_numNodes == 1) jvmArgs "${-> getDebugJvmArgs(debugPort++)}" } followCluster { - testDistribution = "INTEG_TEST" + testDistribution = "ARCHIVE" plugin(project.tasks.bundlePlugin.archiveFile) if(securityEnabled) { plugin(provider(securityPluginFile)) @@ -701,11 +701,8 @@ clusters.each { name -> } // Currently fetching the ARCHIVE distribution fails on mac as it tries to fetch the Mac specific "DARWIN" distribution // for Opensearch which is not publish yet. Changing this to INTEG_TEST to make it work on mac. - if (System.getProperty("os.name").startsWith("Mac")) { - testDistribution = "INTEG_TEST" - } else { - testDistribution = "ARCHIVE" - } + + testDistribution = "ARCHIVE" if (_numNodes != 3) numberOfNodes = 3 setting 'path.repo', repo.absolutePath diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/ResumeReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/ResumeReplicationIT.kt index deca986c..cbe6d9f7 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/ResumeReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/ResumeReplicationIT.kt @@ -44,6 +44,8 @@ import org.junit.Assume import java.nio.file.Files import java.util.concurrent.TimeUnit import org.opensearch.replication.ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS +import org.junit.BeforeClass +import org.junit.AfterClass @MultiClusterAnnotations.ClusterConfigurations( MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER), @@ -57,6 +59,44 @@ class ResumeReplicationIT: MultiClusterRestTestCase() { private val buildDir = System.getProperty("build.dir") private val synonymsJson = "/analyzers/synonym_setting.json" + companion object { + private val allSynonymPaths = mutableListOf() + private val buildDir = System.getProperty("build.dir") + + @BeforeClass + @JvmStatic + fun setupSynonymFiles() { + val testClustersDir = PathUtils.get(buildDir, "testclusters") + + Files.walk(testClustersDir, 1) + .filter { Files.isDirectory(it) && (it.fileName.toString().startsWith("leaderCluster") || it.fileName.toString().startsWith("followCluster")) } + .forEach { clusterDir -> + val configDir = clusterDir.resolve("config") + if (Files.exists(configDir)) { + // Copy all required synonym files + listOf("synonyms.txt", "synonyms_new.txt", "synonyms_follower.txt").forEach { filename -> + val targetPath = configDir.resolve(filename) + if (!Files.exists(targetPath)) { + ResumeReplicationIT::class.java.getResourceAsStream("/analyzers/synonyms.txt")?.use { synonymStream -> + Files.copy(synonymStream, targetPath) + } + } + allSynonymPaths.add(targetPath) + } + } + } + } + + @AfterClass + @JvmStatic + fun cleanupSynonymFiles() { + allSynonymPaths.forEach { if (Files.exists(it)) Files.delete(it) } + allSynonymPaths.clear() + } + } + + + fun `test pause and resume replication in following state and empty index`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) @@ -168,133 +208,95 @@ class ResumeReplicationIT: MultiClusterRestTestCase() { Assume.assumeFalse(ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS, checkifIntegTestRemote()) - val synonyms = javaClass.getResourceAsStream("/analyzers/synonyms.txt") - val config = PathUtils.get(buildDir, leaderClusterPath, "config") - val synonymPath = config.resolve("synonyms.txt") val leaderClient = getClientForCluster(LEADER) val followerClient = getClientForCluster(FOLLOWER) + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + createConnectionBetweenClusters(FOLLOWER, LEADER) + val followerSynonymFilename = "synonyms_not_exists.txt" + val overriddenSettings: Settings = Settings.builder() + .put("index.analysis.filter.my_filter.type", "synonym") + .put("index.analysis.filter.my_filter.synonyms_path", followerSynonymFilename) + .build() + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, overriddenSettings), waitForRestore = true) + followerClient.pauseReplication(followerIndexName) + leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT); + val settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false) + .build() try { - Files.copy(synonyms, synonymPath) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) - assertThat(createIndexResponse.isAcknowledged).isTrue() - createConnectionBetweenClusters(FOLLOWER, LEADER) - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) - followerClient.pauseReplication(followerIndexName) - leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT); - val settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false) - .build() - try { - leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) - } catch (e: Exception) { - assumeNoException("Ignored test as analyzer setting could not be added", e) - } - leaderClient.indices().open(OpenIndexRequest(leaderIndexName), RequestOptions.DEFAULT); - assertThatThrownBy { - followerClient.resumeReplication(followerIndexName) - }.isInstanceOf(ResponseException::class.java).hasMessageContaining("resource_not_found_exception") - } finally { - if (Files.exists(synonymPath)) { - Files.delete(synonymPath) - } + leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) + } catch (e: Exception) { + assumeNoException("Ignored test as analyzer setting could not be added", e) } + leaderClient.indices().open(OpenIndexRequest(leaderIndexName), RequestOptions.DEFAULT); + assertThatThrownBy { + followerClient.resumeReplication(followerIndexName) + }.isInstanceOf(ResponseException::class.java).hasMessageContaining("resource_not_found_exception") } fun `test that replication resumes when custom analyser is present in follower`() { Assume.assumeFalse(ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS, checkifIntegTestRemote()) - val synonyms = javaClass.getResourceAsStream("/analyzers/synonyms.txt") - val config = PathUtils.get(buildDir, leaderClusterPath, "config") - val synonymFilename = "synonyms.txt" - val synonymPath = config.resolve(synonymFilename) - val followerConfig = PathUtils.get(buildDir, followerClusterPath, "config") - val followerSynonymPath = followerConfig.resolve(synonymFilename) val leaderClient = getClientForCluster(LEADER) val followerClient = getClientForCluster(FOLLOWER) + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + createConnectionBetweenClusters(FOLLOWER, LEADER) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) + followerClient.pauseReplication(followerIndexName) + leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT); + val settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false) + .build() try { - Files.copy(synonyms, synonymPath) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) - assertThat(createIndexResponse.isAcknowledged).isTrue() - createConnectionBetweenClusters(FOLLOWER, LEADER) - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) - followerClient.pauseReplication(followerIndexName) - leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT); - Files.copy(synonyms, followerSynonymPath) - val settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false) - .build() - try { - leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) - } catch (e: Exception) { - assumeNoException("Ignored test as analyzer setting could not be added", e) - } - leaderClient.indices().open(OpenIndexRequest(leaderIndexName), RequestOptions.DEFAULT); - followerClient.resumeReplication(followerIndexName) - var statusResp = followerClient.replicationStatus(followerIndexName) - `validate status syncing response`(statusResp) - } finally { - if (Files.exists(synonymPath)) { - Files.delete(synonymPath) - } - if (Files.exists(followerSynonymPath)) { - Files.delete(followerSynonymPath) - } + leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) + } catch (e: Exception) { + assumeNoException("Ignored test as analyzer setting could not be added", e) } + leaderClient.indices().open(OpenIndexRequest(leaderIndexName), RequestOptions.DEFAULT); + followerClient.resumeReplication(followerIndexName) + var statusResp = followerClient.replicationStatus(followerIndexName) + `validate status syncing response`(statusResp) } fun `test that replication resumes when custom analyser is overridden and present in follower`() { Assume.assumeFalse(ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS, checkifIntegTestRemote()) - val synonyms = javaClass.getResourceAsStream("/analyzers/synonyms.txt") - val config = PathUtils.get(buildDir, leaderClusterPath, "config") - val synonymPath = config.resolve("synonyms.txt") - val newSynonymPath = config.resolve("synonyms_new.txt") - val followerConfig = PathUtils.get(buildDir, followerClusterPath, "config") val followerSynonymFilename = "synonyms_follower.txt" - val followerSynonymPath = followerConfig.resolve(followerSynonymFilename) val leaderClient = getClientForCluster(LEADER) val followerClient = getClientForCluster(FOLLOWER) + + var settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false) + .build() try { - Files.copy(synonyms, synonymPath) - Files.copy(synonyms, followerSynonymPath) - var settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false) - .build() - try { - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) - assertThat(createIndexResponse.isAcknowledged).isTrue() - } catch (e: Exception) { - assumeNoException("Ignored test as analyzer setting could not be added", e) - } - createConnectionBetweenClusters(FOLLOWER, LEADER) - val overriddenSettings: Settings = Settings.builder() - .put("index.analysis.filter.my_filter.synonyms_path", followerSynonymFilename) - .build() - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, overriddenSettings), waitForRestore = true) - followerClient.pauseReplication(followerIndexName) - leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT); - Files.copy(synonyms, newSynonymPath) - settings = Settings.builder() - .put("index.analysis.filter.my_filter.synonyms_path", "synonyms_new.txt") - .build() - try { - leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) - } catch (e: Exception) { - assumeNoException("Ignored test as analyzer setting could not be added", e) - } - leaderClient.indices().open(OpenIndexRequest(leaderIndexName), RequestOptions.DEFAULT); - followerClient.resumeReplication(followerIndexName) - var statusResp = followerClient.replicationStatus(followerIndexName) - `validate status syncing response`(statusResp) - } finally { - if (Files.exists(synonymPath)) { - Files.delete(synonymPath) - } - if (Files.exists(followerSynonymPath)) { - Files.delete(followerSynonymPath) - } - if (Files.exists(newSynonymPath)) { - Files.delete(newSynonymPath) - } + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + } catch (e: Exception) { + assumeNoException("Ignored test as analyzer setting could not be added", e) + } + createConnectionBetweenClusters(FOLLOWER, LEADER) + val overriddenSettings: Settings = Settings.builder() + .put("index.analysis.filter.my_filter.type", "synonym") + .put("index.analysis.filter.my_filter.synonyms_path", followerSynonymFilename) + .build() + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, overriddenSettings), waitForRestore = true) + followerClient.pauseReplication(followerIndexName) + leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT); + settings = Settings.builder() + .put("index.analysis.filter.my_filter.type", "synonym") + .put("index.analysis.filter.my_filter.synonyms_path", "synonyms_new.txt") + .build() + try { + leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) + } catch (e: Exception) { + assumeNoException("Ignored test as analyzer setting could not be added", e) } + leaderClient.indices().open(OpenIndexRequest(leaderIndexName), RequestOptions.DEFAULT); + followerClient.resumeReplication(followerIndexName) + var statusResp = followerClient.replicationStatus(followerIndexName) + `validate status syncing response`(statusResp) } } diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt index 685eddc6..52e17452 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt @@ -83,6 +83,8 @@ import org.opensearch.cluster.service.ClusterService import org.opensearch.index.mapper.Mapping import org.opensearch.indices.replication.common.ReplicationType import org.opensearch.replication.util.ValidationUtil +import org.junit.BeforeClass +import org.junit.AfterClass @MultiClusterAnnotations.ClusterConfigurations( @@ -102,6 +104,41 @@ class StartReplicationIT: MultiClusterRestTestCase() { // 3x of SLEEP_TIME_BETWEEN_POLL_MS val SLEEP_TIME_BETWEEN_SYNC = 15L + companion object { + private val allSynonymPaths = mutableListOf() + private val buildDir = System.getProperty("build.dir") + + @BeforeClass + @JvmStatic + fun setupSynonymFiles() { + val testClustersDir = PathUtils.get(buildDir, "testclusters") + + Files.walk(testClustersDir, 1) + .filter { Files.isDirectory(it) && (it.fileName.toString().startsWith("leaderCluster") || it.fileName.toString().startsWith("followCluster")) } + .forEach { clusterDir -> + val configDir = clusterDir.resolve("config") + if (Files.exists(configDir)) { + listOf("synonyms.txt", "synonyms_new.txt", "synonyms_follower.txt").forEach { filename -> + val targetPath = configDir.resolve(filename) + if (!Files.exists(targetPath)) { + StartReplicationIT::class.java.getResourceAsStream("/analyzers/synonyms.txt")?.use { synonymStream -> + Files.copy(synonymStream, targetPath) + } + } + allSynonymPaths.add(targetPath) + } + } + } + } + + @AfterClass + @JvmStatic + fun cleanupSynonymFiles() { + allSynonymPaths.forEach { if (Files.exists(it)) Files.delete(it) } + allSynonymPaths.clear() + } + } + fun `test start replication in following state and empty index`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) @@ -592,72 +629,50 @@ class StartReplicationIT: MultiClusterRestTestCase() { Assume.assumeFalse(ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS, checkifIntegTestRemote()) - val synonyms = javaClass.getResourceAsStream("/analyzers/synonyms.txt") - val config = PathUtils.get(buildDir, leaderClusterPath, "config") - val synonymPath = config.resolve("synonyms.txt") + val settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .build() + val leaderClient = getClientForCluster(LEADER) + val followerClient = getClientForCluster(FOLLOWER) try { - Files.copy(synonyms, synonymPath) - - val settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) - .build() - val leaderClient = getClientForCluster(LEADER) - val followerClient = getClientForCluster(FOLLOWER) - try { - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName) - .settings(settings).mapping(synonymMapping, XContentType.JSON), RequestOptions.DEFAULT) - assertThat(createIndexResponse.isAcknowledged).isTrue() - } catch (e: Exception) { - assumeNoException("Ignored test as analyzer setting could not be added", e) - } - createConnectionBetweenClusters(FOLLOWER, LEADER) - assertThatThrownBy { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) - }.isInstanceOf(ResponseException::class.java).hasMessageContaining("resource_not_found_exception") - } finally { - if (Files.exists(synonymPath)) { - Files.delete(synonymPath) - } + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName) + .settings(settings).mapping(synonymMapping, XContentType.JSON), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + } catch (e: Exception) { + assumeNoException("Ignored test as analyzer setting could not be added", e) } + createConnectionBetweenClusters(FOLLOWER, LEADER) + // Use non-existent synonym file to trigger the error + val overriddenSettings: Settings = Settings.builder() + .put("index.analysis.filter.my_filter.type", "synonym") + .put("index.analysis.filter.my_filter.synonyms_path", "synonyms_not_exists.txt") + .build() + assertThatThrownBy { + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, overriddenSettings)) + }.isInstanceOf(ResponseException::class.java).hasMessageContaining("resource_not_found_exception") } fun `test that replication starts successfully when custom analyser is present in follower`() { Assume.assumeFalse(ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS, checkifIntegTestRemote()) - val synonyms = javaClass.getResourceAsStream("/analyzers/synonyms.txt") - val leaderConfig = PathUtils.get(buildDir, leaderClusterPath, "config") - val leaderSynonymPath = leaderConfig.resolve("synonyms.txt") - val followerConfig = PathUtils.get(buildDir, followerClusterPath, "config") - val followerSynonymPath = followerConfig.resolve("synonyms.txt") + val settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .build() + val leaderClient = getClientForCluster(LEADER) + val followerClient = getClientForCluster(FOLLOWER) try { - Files.copy(synonyms, leaderSynonymPath) - Files.copy(synonyms, followerSynonymPath) - val settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) - .build() - val leaderClient = getClientForCluster(LEADER) - val followerClient = getClientForCluster(FOLLOWER) - try { - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName) - .settings(settings).mapping(synonymMapping, XContentType.JSON), RequestOptions.DEFAULT) - assertThat(createIndexResponse.isAcknowledged).isTrue() - } catch (e: Exception) { - assumeNoException("Ignored test as analyzer setting could not be added", e) - } - createConnectionBetweenClusters(FOLLOWER, LEADER) - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), - waitForRestore = true) - assertBusy { - assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) - } - } finally { - if (Files.exists(leaderSynonymPath)) { - Files.delete(leaderSynonymPath) - } - if (Files.exists(followerSynonymPath)) { - Files.delete(followerSynonymPath) - } + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName) + .settings(settings).mapping(synonymMapping, XContentType.JSON), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + } catch (e: Exception) { + assumeNoException("Ignored test as analyzer setting could not be added", e) + } + createConnectionBetweenClusters(FOLLOWER, LEADER) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) + assertBusy { + assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) } } @@ -665,43 +680,28 @@ class StartReplicationIT: MultiClusterRestTestCase() { Assume.assumeFalse(ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS, checkifIntegTestRemote()) - val synonyms = javaClass.getResourceAsStream("/analyzers/synonyms.txt") - val leaderConfig = PathUtils.get(buildDir, leaderClusterPath, "config") - val leaderSynonymPath = leaderConfig.resolve("synonyms.txt") - val followerConfig = PathUtils.get(buildDir, followerClusterPath, "config") val synonymFollowerFilename = "synonyms_follower.txt" - val followerSynonymPath = followerConfig.resolve(synonymFollowerFilename) + val settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .build() + val leaderClient = getClientForCluster(LEADER) + val followerClient = getClientForCluster(FOLLOWER) try { - Files.copy(synonyms, leaderSynonymPath) - Files.copy(synonyms, followerSynonymPath) - val settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) - .build() - val leaderClient = getClientForCluster(LEADER) - val followerClient = getClientForCluster(FOLLOWER) - try { - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName) - .settings(settings).mapping(synonymMapping, XContentType.JSON), RequestOptions.DEFAULT) - assertThat(createIndexResponse.isAcknowledged).isTrue() - } catch (e: Exception) { - assumeNoException("Ignored test as analyzer setting could not be added", e) - } - createConnectionBetweenClusters(FOLLOWER, LEADER) - val overriddenSettings: Settings = Settings.builder() - .put("index.analysis.filter.my_filter.synonyms_path", synonymFollowerFilename) - .build() - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, overriddenSettings), - waitForRestore = true) - assertBusy { - assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) - } - } finally { - if (Files.exists(leaderSynonymPath)) { - Files.delete(leaderSynonymPath) - } - if (Files.exists(followerSynonymPath)) { - Files.delete(followerSynonymPath) - } + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName) + .settings(settings).mapping(synonymMapping, XContentType.JSON), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + } catch (e: Exception) { + assumeNoException("Ignored test as analyzer setting could not be added", e) + } + createConnectionBetweenClusters(FOLLOWER, LEADER) + val overriddenSettings: Settings = Settings.builder() + .put("index.analysis.filter.my_filter.type", "synonym") + .put("index.analysis.filter.my_filter.synonyms_path", synonymFollowerFilename) + .build() + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, overriddenSettings), + waitForRestore = true) + assertBusy { + assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) } }