Skip to content

[couchbase] Add support for scopes and collections. #4805

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@

package org.testcontainers.couchbase;

import java.util.ArrayList;
import java.util.List;

/**
* Allows to configure the properties of a bucket that should be created.
*/
public class BucketDefinition {

private final String name;
private final List<ScopeDefinition> scopes = new ArrayList<>();

private boolean flushEnabled = false;
private boolean queryPrimaryIndex = true;
private int quota = 100;
Expand Down Expand Up @@ -66,6 +71,17 @@ public BucketDefinition withPrimaryIndex(final boolean create) {
return this;
}

/**
* Adds a scope (with its collections) to this bucket - only available with 7.0 and later.
*
* @param scope the scope with its collections.
* @return this {@link BucketDefinition} for chaining purposes.
*/
public BucketDefinition withScope(final ScopeDefinition scope) {
this.scopes.add(scope);
return this;
}

public String getName() {
return name;
}
Expand All @@ -82,4 +98,8 @@ public int getQuota() {
return quota;
}

public List<ScopeDefinition> getScopes() {
return scopes;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2021 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.testcontainers.couchbase;

public class CollectionDefinition {

private final String name;
private boolean queryPrimaryIndex = true;

public CollectionDefinition(final String name) {
this.name = name;
}

/**
* Allows to disable creating a primary index for this collection (enabled by default).
*
* @param create if false, a primary index will not be created.
* @return this {@link CollectionDefinition} for chaining purposes.
*/
public CollectionDefinition withPrimaryIndex(final boolean create) {
this.queryPrimaryIndex = create;
return this;
}

public String getName() {
return name;
}

public boolean hasPrimaryIndex() {
return queryPrimaryIndex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -100,6 +102,11 @@ public class CouchbaseContainer extends GenericContainer<CouchbaseContainer> {
CouchbaseService.INDEX
);

/**
* Holds the custom service quotas if configured by the user.
*/
private final Map<CouchbaseService, Integer> customServiceQuotas = new HashMap<>();

private final List<BucketDefinition> buckets = new ArrayList<>();

private boolean isEnterprise = false;
Expand Down Expand Up @@ -158,6 +165,26 @@ public CouchbaseContainer withEnabledServices(final CouchbaseService... enabled)
return this;
}

/**
* Configures a custom memory quota for a given service.
*
* @param service the service to configure the quota for.
* @param quotaMb the memory quota in MB.
* @return this {@link CouchbaseContainer} for chaining purposes.
*/
public CouchbaseContainer withServiceQuota(final CouchbaseService service, final int quotaMb) {
checkNotRunning();
if (!service.hasQuota()) {
throw new IllegalArgumentException("The provided service (" + service + ") has no quota to configure");
}
if (quotaMb < service.getMinimumQuotaMb()) {
throw new IllegalArgumentException("The custom quota (" + quotaMb + ") must not be smaller than the " +
"minimum quota for the service (" + service.getMinimumQuotaMb() + ")");
}
this.customServiceQuotas.put(service, quotaMb);
return this;
}

/**
* Enables the analytics service which is not enabled by default.
*
Expand Down Expand Up @@ -262,6 +289,7 @@ protected void containerIsStarting(final InspectContainerResponse containerInfo)
timePhase("initializeIsEnterprise", this::initializeIsEnterprise);
timePhase("renameNode", this::renameNode);
timePhase("initializeServices", this::initializeServices);
timePhase("setMemoryQuotas", this::setMemoryQuotas);
timePhase("configureAdminUser", this::configureAdminUser);
timePhase("configureExternalPorts", this::configureExternalPorts);

Expand Down Expand Up @@ -341,6 +369,35 @@ private void initializeServices() {
checkSuccessfulResponse(response, "Could not enable couchbase services");
}

/**
* Sets the memory quotas for each enabled service.
* <p>
* If there is no explicit custom quota defined, the default minimum quota will be used.
*/
private void setMemoryQuotas() {
logger().debug("Custom service memory quotas: {}", customServiceQuotas);

final FormBody.Builder quotaBuilder = new FormBody.Builder();
for (CouchbaseService service : enabledServices) {
if (!service.hasQuota()) {
continue;
}

int quota = customServiceQuotas.getOrDefault(service, service.getMinimumQuotaMb());
if (CouchbaseService.KV.equals(service)) {
quotaBuilder.add("memoryQuota", Integer.toString(quota));
} else {
quotaBuilder.add(service.getIdentifier() + "MemoryQuota", Integer.toString(quota));
}
}

@Cleanup Response response = doHttpRequest(
MGMT_PORT, "/pools/default", "POST", quotaBuilder.build(), false
);

checkSuccessfulResponse(response, "Could not configure service memory quotas");
}

/**
* Configures the admin user on the couchbase node.
* <p>
Expand Down Expand Up @@ -447,63 +504,164 @@ private void createBuckets() {
.waitUntilReady(this)
);

timePhase("createBucket:" + bucket.getName() + ":createScopes", () -> createScopes(bucket));

if (enabledServices.contains(CouchbaseService.QUERY)) {
// If the query service is enabled, make sure that we only proceed if the query engine also
// knows about the bucket in its metadata configuration.
timePhase(
"createBucket:" + bucket.getName() + ":queryKeyspacePresent",
() -> Unreliables.retryUntilTrue(1, TimeUnit.MINUTES, () -> {
@Cleanup Response queryResponse = doHttpRequest(QUERY_PORT, "/query/service", "POST", new FormBody.Builder()
.add("statement", "SELECT COUNT(*) > 0 as present FROM system:keyspaces WHERE name = \"" + bucket.getName() + "\"")
.build(), true);

String body = queryResponse.body() != null ? queryResponse.body().string() : null;
checkSuccessfulResponse(queryResponse, "Could not poll query service state for bucket: " + bucket.getName());

return Optional.of(MAPPER.readTree(body))
.map(n -> n.at("/results/0/present"))
.map(JsonNode::asBoolean)
.orElse(false);
}));
() -> verifyKeyspacePresent(bucket.getName(), null, null)
);
}

if (bucket.hasPrimaryIndex()) {
if (enabledServices.contains(CouchbaseService.QUERY)) {
@Cleanup Response queryResponse = doHttpRequest(QUERY_PORT, "/query/service", "POST", new FormBody.Builder()
.add("statement", "CREATE PRIMARY INDEX on `" + bucket.getName() + "`")
.build(), true);

try {
checkSuccessfulResponse(queryResponse, "Could not create primary index for bucket " + bucket.getName());
} catch (IllegalStateException ex) {
// potentially ignore the error, the index will be eventually built.
if (!ex.getMessage().contains("Index creation will be retried in background")) {
throw ex;
}
}

timePhase(
"createBucket:" + bucket.getName() + ":primaryIndexOnline",
() -> Unreliables.retryUntilTrue(1, TimeUnit.MINUTES, () -> {
@Cleanup Response stateResponse = doHttpRequest(QUERY_PORT, "/query/service", "POST", new FormBody.Builder()
.add("statement", "SELECT count(*) > 0 AS online FROM system:indexes where keyspace_id = \"" + bucket.getName() + "\" and is_primary = true and state = \"online\"")
.build(), true);

String body = stateResponse.body() != null ? stateResponse.body().string() : null;
checkSuccessfulResponse(stateResponse, "Could not poll primary index state for bucket: " + bucket.getName());

return Optional.of(MAPPER.readTree(body))
.map(n -> n.at("/results/0/online"))
.map(JsonNode::asBoolean)
.orElse(false);
}));
"createBucket:" + bucket.getName() + ":createPrimaryIndex",
() -> createPrimaryIndex(bucket.getName(), null, null)
);
} else {
logger().info("Primary index creation for bucket {} ignored, since QUERY service is not present.", bucket.getName());
}
}
}
}

/**
* Creates scopes and collections for a given bucket - only available with server 7.0 and later.
*
* @param bucket the bucket information.
*/
private void createScopes(final BucketDefinition bucket) {
logger().debug("Creating scopes " + bucket.getScopes() + " for bucket " + bucket.getName());

for (ScopeDefinition scope : bucket.getScopes()) {
// We don't need to create the _default scope, it already exists. In fact, if we tried the
// server wouldn't let us. But we can still create collections for it which we do below.
if (!scope.getName().equals("_default")) {
@Cleanup Response scopeResponse = doHttpRequest(
MGMT_PORT,
"/pools/default/buckets/" + bucket.getName() + "/scopes",
"POST",
new FormBody.Builder().add("name", scope.getName()).build(),
true
);

if (scopeResponse.code() == 404) {
throw new IllegalStateException("Scopes and collections are not supported with this cluster version");
}
checkSuccessfulResponse(scopeResponse, "Could not create scope " + scope.getName()
+ " for bucket " + bucket.getName());
}

for (CollectionDefinition collection : scope.getCollections()) {
@Cleanup Response collectionResponse = doHttpRequest(
MGMT_PORT,
"/pools/default/buckets/" + bucket.getName() + "/scopes/" + scope.getName() + "/collections",
"POST",
new FormBody.Builder().add("name", collection.getName()).build(),
true
);

if (collectionResponse.code() == 404) {
throw new IllegalStateException("Scopes and collections are not supported with this cluster version");
}
checkSuccessfulResponse(collectionResponse, "Could not create collection " + collection
+ " for bucket " + bucket.getName());

if (enabledServices.contains(CouchbaseService.QUERY) && collection.hasPrimaryIndex()) {
timePhase(
"createCollection:" + collection.getName() + ":queryKeyspacePresent",
() -> verifyKeyspacePresent(bucket.getName(), scope.getName(), collection.getName())
);
timePhase(
"createCollection:" + collection.getName() + ":createPrimaryIndex",
() -> createPrimaryIndex(bucket.getName(), scope.getName(), collection.getName())
);
}
}
}
}

/**
* Helper method to check if the keyspace is present in the query engine to avoid eventual consistency failures.
*
* @param bucket the name of the bucket.
* @param scope the name of the scope - can be null.
* @param collection the name of the collection - can be null.
*/
private void verifyKeyspacePresent(final String bucket, final String scope, final String collection) {
Unreliables.retryUntilTrue(1, TimeUnit.MINUTES, () -> {
String query = "SELECT COUNT(*) > 0 as present FROM system:keyspaces WHERE name = \"" + bucket + "\"";
String keyspace = bucket;
if (scope != null && collection != null) {
query = "SELECT COUNT(*) > 0 as present FROM system:keyspaces WHERE " +
"name = \"" + collection + "\" AND `bucket` = \"" + bucket + "\" AND `scope` = \"" + scope + "\"";
keyspace = keyspace + "." + scope + "." + collection;
}

@Cleanup Response queryResponse = doHttpRequest(QUERY_PORT, "/query/service", "POST", new FormBody.Builder()
.add("statement", query)
.build(), true);

String body = queryResponse.body() != null ? queryResponse.body().string() : null;
checkSuccessfulResponse(queryResponse, "Could not poll query service state for keyspace: " + keyspace);

return Optional.of(MAPPER.readTree(body))
.map(n -> n.at("/results/0/present"))
.map(JsonNode::asBoolean)
.orElse(false);
});
}

/**
* Helper method to create a primary index (for a bucket or a collection).
*
* @param bucket the name of the bucket.
* @param scope the name of the scope - can be null.
* @param collection the name of the collection - can be null.
*/
private void createPrimaryIndex(final String bucket, final String scope, final String collection) {
final String keyspace = scope != null && collection != null
? "`" + bucket + "`.`" + scope + "`.`" + collection + "`"
: "`" + bucket +"`";

@Cleanup Response queryResponse = doHttpRequest(QUERY_PORT, "/query/service", "POST", new FormBody.Builder()
.add("statement", "CREATE PRIMARY INDEX on " + keyspace + "")
.build(), true);

try {
checkSuccessfulResponse(queryResponse, "Could not create primary index for keyspace " + keyspace);
} catch (IllegalStateException ex) {
// potentially ignore the error, the index will be eventually built.
if (!ex.getMessage().contains("Index creation will be retried in background")) {
throw ex;
}
}

timePhase(
"createPrimaryIndex:" + keyspace + ":primaryIndexOnline",
() -> Unreliables.retryUntilTrue(1, TimeUnit.MINUTES, () -> {
String whereClause = scope != null && collection != null
? "keyspace_id = \"" + collection + "\" and scope_id = \"" + scope + "\" and bucket_id = \"" + bucket + "\""
: "keyspace_id = \"" + bucket + "\"";

@Cleanup Response stateResponse = doHttpRequest(QUERY_PORT, "/query/service", "POST", new FormBody.Builder()
.add("statement", "SELECT count(*) > 0 AS online FROM system:indexes where " + whereClause + " and is_primary = true and state = \"online\"")
.build(), true);


String body = stateResponse.body() != null ? stateResponse.body().string() : null;
checkSuccessfulResponse(stateResponse, "Could not poll primary index state for keyspace: " + keyspace);

return Optional.of(MAPPER.readTree(body))
.map(n -> n.at("/results/0/online"))
.map(JsonNode::asBoolean)
.orElse(false);
}));
}

/**
* Helper method to extract the internal IP address based on the network configuration.
*/
Expand Down
Loading