Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/security-knn-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ jobs:
run: |
opensearch_version=$(./gradlew properties | grep -E '^version:' | awk '{print $2}')
# we publish build artifacts to the below url
sec_plugin_url="https://central.sonatype.com/service/rest/repository/browse/maven-snapshots/org/opensearch/plugin/opensearch-security/"$opensearch_version"/"
sec_st=$(curl -s -o /dev/null -w "%{http_code}" $sec_plugin_url)
sec_plugin_url="https://central.sonatype.com/service/rest/repository/browse/maven-snapshots/org/opensearch/plugin/opensearch-security/${opensearch_version}/"
sec_st=$(curl -L -s -o /dev/null -w "%{http_code}" $sec_plugin_url)
if [ "$sec_st" = "200" ]; then
echo "isSecurityPluginAvailable=True" >> $GITHUB_OUTPUT
cat $GITHUB_OUTPUT
else
echo "isSecurityPluginAvailable=False" >> $GITHUB_OUTPUT
cat $GITHUB_OUTPUT
fi
knn_plugin_url="https://central.sonatype.com/service/rest/repository/browse/maven-snapshots/org/opensearch/plugin/opensearch-knn/"$opensearch_version"/"
knn_st=$(curl -s -o /dev/null -w "%{http_code}" $knn_plugin_url)
knn_plugin_url="https://central.sonatype.com/service/rest/repository/browse/maven-snapshots/org/opensearch/plugin/opensearch-knn/${opensearch_version}/"
knn_st=$(curl -L -s -o /dev/null -w "%{http_code}" $knn_plugin_url)
if [ "$knn_st" = "200" ]; then
echo "isKnnPluginAvailable=True" >> $GITHUB_OUTPUT
cat $GITHUB_OUTPUT
Expand Down
11 changes: 4 additions & 7 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ testClusters {
if(securityEnabled) {
plugin(provider(securityPluginFile))
}
testDistribution = "INTEG_TEST"
testDistribution = "ARCHIVE"
if(knnEnabled) {
plugin(provider(knnPluginFile))
testDistribution = "ARCHIVE"
Expand All @@ -400,7 +400,7 @@ testClusters {
if(_numNodes == 1) jvmArgs "${-> getDebugJvmArgs(debugPort++)}"
}
followCluster {
testDistribution = "INTEG_TEST"
testDistribution = "ARCHIVE"
plugin(project.tasks.bundlePlugin.archiveFile)
if(securityEnabled) {
plugin(provider(securityPluginFile))
Expand Down Expand Up @@ -701,11 +701,8 @@ clusters.each { name ->
}
// Currently fetching the ARCHIVE distribution fails on mac as it tries to fetch the Mac specific "DARWIN" distribution
// for Opensearch which is not publish yet. Changing this to INTEG_TEST to make it work on mac.
if (System.getProperty("os.name").startsWith("Mac")) {
testDistribution = "INTEG_TEST"
} else {
testDistribution = "ARCHIVE"
}

testDistribution = "ARCHIVE"

if (_numNodes != 3) numberOfNodes = 3
setting 'path.repo', repo.absolutePath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import org.junit.Assume
import java.nio.file.Files
import java.util.concurrent.TimeUnit
import org.opensearch.replication.ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS
import org.junit.BeforeClass
import org.junit.AfterClass

@MultiClusterAnnotations.ClusterConfigurations(
MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER),
Expand All @@ -57,6 +59,44 @@ class ResumeReplicationIT: MultiClusterRestTestCase() {
private val buildDir = System.getProperty("build.dir")
private val synonymsJson = "/analyzers/synonym_setting.json"

companion object {
private val allSynonymPaths = mutableListOf<java.nio.file.Path>()
private val buildDir = System.getProperty("build.dir")

@BeforeClass
@JvmStatic
fun setupSynonymFiles() {
val testClustersDir = PathUtils.get(buildDir, "testclusters")

Files.walk(testClustersDir, 1)
.filter { Files.isDirectory(it) && (it.fileName.toString().startsWith("leaderCluster") || it.fileName.toString().startsWith("followCluster")) }
.forEach { clusterDir ->
val configDir = clusterDir.resolve("config")
if (Files.exists(configDir)) {
// Copy all required synonym files
listOf("synonyms.txt", "synonyms_new.txt", "synonyms_follower.txt").forEach { filename ->
val targetPath = configDir.resolve(filename)
if (!Files.exists(targetPath)) {
ResumeReplicationIT::class.java.getResourceAsStream("/analyzers/synonyms.txt")?.use { synonymStream ->
Files.copy(synonymStream, targetPath)
}
}
allSynonymPaths.add(targetPath)
}
}
}
}

@AfterClass
@JvmStatic
fun cleanupSynonymFiles() {
allSynonymPaths.forEach { if (Files.exists(it)) Files.delete(it) }
allSynonymPaths.clear()
}
}



fun `test pause and resume replication in following state and empty index`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
Expand Down Expand Up @@ -168,133 +208,95 @@ class ResumeReplicationIT: MultiClusterRestTestCase() {

Assume.assumeFalse(ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS, checkifIntegTestRemote())

val synonyms = javaClass.getResourceAsStream("/analyzers/synonyms.txt")
val config = PathUtils.get(buildDir, leaderClusterPath, "config")
val synonymPath = config.resolve("synonyms.txt")
val leaderClient = getClientForCluster(LEADER)
val followerClient = getClientForCluster(FOLLOWER)

val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()
createConnectionBetweenClusters(FOLLOWER, LEADER)
val followerSynonymFilename = "synonyms_not_exists.txt"
val overriddenSettings: Settings = Settings.builder()
.put("index.analysis.filter.my_filter.type", "synonym")
.put("index.analysis.filter.my_filter.synonyms_path", followerSynonymFilename)
.build()
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, overriddenSettings), waitForRestore = true)
followerClient.pauseReplication(followerIndexName)
leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT);
val settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false)
.build()
try {
Files.copy(synonyms, synonymPath)
val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()
createConnectionBetweenClusters(FOLLOWER, LEADER)
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true)
followerClient.pauseReplication(followerIndexName)
leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT);
val settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false)
.build()
try {
leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT)
} catch (e: Exception) {
assumeNoException("Ignored test as analyzer setting could not be added", e)
}
leaderClient.indices().open(OpenIndexRequest(leaderIndexName), RequestOptions.DEFAULT);
assertThatThrownBy {
followerClient.resumeReplication(followerIndexName)
}.isInstanceOf(ResponseException::class.java).hasMessageContaining("resource_not_found_exception")
} finally {
if (Files.exists(synonymPath)) {
Files.delete(synonymPath)
}
leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT)
} catch (e: Exception) {
assumeNoException("Ignored test as analyzer setting could not be added", e)
}
leaderClient.indices().open(OpenIndexRequest(leaderIndexName), RequestOptions.DEFAULT);
assertThatThrownBy {
followerClient.resumeReplication(followerIndexName)
}.isInstanceOf(ResponseException::class.java).hasMessageContaining("resource_not_found_exception")
}

fun `test that replication resumes when custom analyser is present in follower`() {

Assume.assumeFalse(ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS, checkifIntegTestRemote())

val synonyms = javaClass.getResourceAsStream("/analyzers/synonyms.txt")
val config = PathUtils.get(buildDir, leaderClusterPath, "config")
val synonymFilename = "synonyms.txt"
val synonymPath = config.resolve(synonymFilename)
val followerConfig = PathUtils.get(buildDir, followerClusterPath, "config")
val followerSynonymPath = followerConfig.resolve(synonymFilename)
val leaderClient = getClientForCluster(LEADER)
val followerClient = getClientForCluster(FOLLOWER)

val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()
createConnectionBetweenClusters(FOLLOWER, LEADER)
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true)
followerClient.pauseReplication(followerIndexName)
leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT);
val settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false)
.build()
try {
Files.copy(synonyms, synonymPath)
val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()
createConnectionBetweenClusters(FOLLOWER, LEADER)
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true)
followerClient.pauseReplication(followerIndexName)
leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT);
Files.copy(synonyms, followerSynonymPath)
val settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false)
.build()
try {
leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT)
} catch (e: Exception) {
assumeNoException("Ignored test as analyzer setting could not be added", e)
}
leaderClient.indices().open(OpenIndexRequest(leaderIndexName), RequestOptions.DEFAULT);
followerClient.resumeReplication(followerIndexName)
var statusResp = followerClient.replicationStatus(followerIndexName)
`validate status syncing response`(statusResp)
} finally {
if (Files.exists(synonymPath)) {
Files.delete(synonymPath)
}
if (Files.exists(followerSynonymPath)) {
Files.delete(followerSynonymPath)
}
leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT)
} catch (e: Exception) {
assumeNoException("Ignored test as analyzer setting could not be added", e)
}
leaderClient.indices().open(OpenIndexRequest(leaderIndexName), RequestOptions.DEFAULT);
followerClient.resumeReplication(followerIndexName)
var statusResp = followerClient.replicationStatus(followerIndexName)
`validate status syncing response`(statusResp)
}

fun `test that replication resumes when custom analyser is overridden and present in follower`() {

Assume.assumeFalse(ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS, checkifIntegTestRemote())

val synonyms = javaClass.getResourceAsStream("/analyzers/synonyms.txt")
val config = PathUtils.get(buildDir, leaderClusterPath, "config")
val synonymPath = config.resolve("synonyms.txt")
val newSynonymPath = config.resolve("synonyms_new.txt")
val followerConfig = PathUtils.get(buildDir, followerClusterPath, "config")
val followerSynonymFilename = "synonyms_follower.txt"
val followerSynonymPath = followerConfig.resolve(followerSynonymFilename)
val leaderClient = getClientForCluster(LEADER)
val followerClient = getClientForCluster(FOLLOWER)

var settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false)
.build()
try {
Files.copy(synonyms, synonymPath)
Files.copy(synonyms, followerSynonymPath)
var settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false)
.build()
try {
val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()
} catch (e: Exception) {
assumeNoException("Ignored test as analyzer setting could not be added", e)
}
createConnectionBetweenClusters(FOLLOWER, LEADER)
val overriddenSettings: Settings = Settings.builder()
.put("index.analysis.filter.my_filter.synonyms_path", followerSynonymFilename)
.build()
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, overriddenSettings), waitForRestore = true)
followerClient.pauseReplication(followerIndexName)
leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT);
Files.copy(synonyms, newSynonymPath)
settings = Settings.builder()
.put("index.analysis.filter.my_filter.synonyms_path", "synonyms_new.txt")
.build()
try {
leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT)
} catch (e: Exception) {
assumeNoException("Ignored test as analyzer setting could not be added", e)
}
leaderClient.indices().open(OpenIndexRequest(leaderIndexName), RequestOptions.DEFAULT);
followerClient.resumeReplication(followerIndexName)
var statusResp = followerClient.replicationStatus(followerIndexName)
`validate status syncing response`(statusResp)
} finally {
if (Files.exists(synonymPath)) {
Files.delete(synonymPath)
}
if (Files.exists(followerSynonymPath)) {
Files.delete(followerSynonymPath)
}
if (Files.exists(newSynonymPath)) {
Files.delete(newSynonymPath)
}
val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()
} catch (e: Exception) {
assumeNoException("Ignored test as analyzer setting could not be added", e)
}
createConnectionBetweenClusters(FOLLOWER, LEADER)
val overriddenSettings: Settings = Settings.builder()
.put("index.analysis.filter.my_filter.type", "synonym")
.put("index.analysis.filter.my_filter.synonyms_path", followerSynonymFilename)
.build()
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, overriddenSettings), waitForRestore = true)
followerClient.pauseReplication(followerIndexName)
leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT);
settings = Settings.builder()
.put("index.analysis.filter.my_filter.type", "synonym")
.put("index.analysis.filter.my_filter.synonyms_path", "synonyms_new.txt")
.build()
try {
leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT)
} catch (e: Exception) {
assumeNoException("Ignored test as analyzer setting could not be added", e)
}
leaderClient.indices().open(OpenIndexRequest(leaderIndexName), RequestOptions.DEFAULT);
followerClient.resumeReplication(followerIndexName)
var statusResp = followerClient.replicationStatus(followerIndexName)
`validate status syncing response`(statusResp)
}
}
Loading
Loading