diff --git a/chaos-common/src/main/java/io/openchaos/common/utils/SshUtil.java b/chaos-common/src/main/java/io/openchaos/common/utils/SshUtil.java index 3ea46ae..594aef8 100644 --- a/chaos-common/src/main/java/io/openchaos/common/utils/SshUtil.java +++ b/chaos-common/src/main/java/io/openchaos/common/utils/SshUtil.java @@ -43,7 +43,7 @@ public static void init(String username, String password, List nodes) th SshUtil.username = username; SshUtil.password = password; client = SshClient.setUpDefaultClient(); - if (password != null && !Objects.equals(password, "")){ + if (password != null && !Objects.equals(password, "")) { client.addPasswordIdentity(password); } client.start(); diff --git a/chaos-common/src/test/java/io/openchaos/common/utils/SshUtilTest.java b/chaos-common/src/test/java/io/openchaos/common/utils/SshUtilTest.java index 13ea574..2638d27 100644 --- a/chaos-common/src/test/java/io/openchaos/common/utils/SshUtilTest.java +++ b/chaos-common/src/test/java/io/openchaos/common/utils/SshUtilTest.java @@ -1,11 +1,13 @@ package io.openchaos.common.utils; +import org.junit.Ignore; import org.junit.Test; import java.util.ArrayList; import java.util.List; import static org.junit.Assert.*; +@Ignore("Requires external SSH host; not part of automated test suite") public class SshUtilTest { static String user = "root"; static String password = "Yigeyy00"; diff --git a/docker/node/Dockerfile b/docker/node/Dockerfile index 08fb9d8..b43d3c9 100644 --- a/docker/node/Dockerfile +++ b/docker/node/Dockerfile @@ -1,5 +1,8 @@ # Based on the deprecated `https://github.com/tutumcloud/tutum-debian` -FROM debian:stretch +FROM debian:sid + +# Set noninteractive to avoid tz/locale prompts +ENV DEBIAN_FRONTEND=noninteractive # Install packages RUN apt-get update && \ @@ -8,10 +11,20 @@ RUN apt-get update && \ openssh-server \ openjdk-8-jdk \ pwgen \ + locales \ && \ -mkdir -p /var/run/sshd && \ -sed -i "s/UsePrivilegeSeparation.*/UsePrivilegeSeparation no/g" /etc/ssh/sshd_config && \ -sed -i "s/PermitRootLogin without-password/PermitRootLogin yes/g" /etc/ssh/sshd_config + mkdir -p /var/run/sshd && \ + sed -i "s/UsePrivilegeSeparation.*/UsePrivilegeSeparation no/g" /etc/ssh/sshd_config && \ + sed -i "s/PermitRootLogin without-password/PermitRootLogin yes/g" /etc/ssh/sshd_config && \ + # Configure UTF-8 locale + sed -i 's/# en_US.UTF-8 UTF-8/en_US.UTF-8 UTF-8/' /etc/locale.gen && \ + locale-gen en_US.UTF-8 && \ + update-locale LANG=en_US.UTF-8 + +# Make sure locale is exported in all shells +ENV LANG=en_US.UTF-8 +ENV LANGUAGE=en_US:en +ENV LC_ALL=en_US.UTF-8 ENV AUTHORIZED_KEYS **None** @@ -27,8 +40,8 @@ RUN apt-get update && apt-get install -y maven RUN apt-get install -y \ sudo net-tools wget \ curl vim man faketime unzip less \ - iptables iputils-ping logrotate && \ - apt-get remove -y --purge --auto-remove systemd + iptables iputils-ping logrotate + # apt-get remove -y --purge --auto-remove systemd EXPOSE 22 CMD ["/run.sh"] diff --git a/driver-kafka/src/main/java/io/openchaos/driver/kafka/KafkaChaosNode.java b/driver-kafka/src/main/java/io/openchaos/driver/kafka/KafkaChaosNode.java index 29fa4d0..a747d3e 100644 --- a/driver-kafka/src/main/java/io/openchaos/driver/kafka/KafkaChaosNode.java +++ b/driver-kafka/src/main/java/io/openchaos/driver/kafka/KafkaChaosNode.java @@ -90,7 +90,7 @@ public void start() { try { //Start broker log.info("Node {} start broker...", node); - SshUtil.execCommandInDir(node, installDir, String.format("nohup sh bin/kafka-server-start.sh '%s' > broker.log 2>&1 &" + SshUtil.execCommandInDir(node, installDir, String.format("nohup bash bin/kafka-server-start.sh '%s' > broker.log 2>&1 &" , configureFilePath)); } catch (Exception e) { log.error("Node {} start kafka node failed", node, e); diff --git a/driver-kafka/src/main/java/io/openchaos/driver/kafka/KafkaChaosZKNode.java b/driver-kafka/src/main/java/io/openchaos/driver/kafka/KafkaChaosZKNode.java index 8f018a4..92f7fbc 100644 --- a/driver-kafka/src/main/java/io/openchaos/driver/kafka/KafkaChaosZKNode.java +++ b/driver-kafka/src/main/java/io/openchaos/driver/kafka/KafkaChaosZKNode.java @@ -83,7 +83,7 @@ public void start() { try { //Start zookeeper log.info("Node {} start zookeeper...", node); - SshUtil.execCommandInDir(node, installDir, String.format("nohup sh bin/zookeeper-server-start.sh '%s' > zookeeper.log 2>&1 &" + SshUtil.execCommandInDir(node, installDir, String.format("nohup bash bin/zookeeper-server-start.sh '%s' > zookeeper.log 2>&1 &" , configureFilePath)); } catch (Exception e) { log.error("Node {} start zookeeper node failed", node, e); diff --git a/driver-rabbitmq/rabbitmq.yaml b/driver-rabbitmq/rabbitmq.yaml index 5667a8b..9ae7a65 100644 --- a/driver-rabbitmq/rabbitmq.yaml +++ b/driver-rabbitmq/rabbitmq.yaml @@ -19,19 +19,23 @@ endToEndLatencyCheck: true # Nodes for broker nodes: # replace with ip or domain name, such as 192.168.0.2 -- acloud +- n1 +- n2 +- n3 +- n4 +- n5 # RabbitMQ configuration -rabbitmqVersion: 3.8.35 -installDir: /usr/local/rabbitmq-server-3.8.35 # you could set existent location for RabbitMQ -configureFilePath: /usr/local/rabbitmq-server-3.8.35/etc +rabbitmqVersion: 4.2.3 +installDir: /usr/local/rabbitmq-server-4.2.3 # you could set existent location for RabbitMQ +configureFilePath: /usr/local/rabbitmq-server-4.2.3/etc # RabbitMQ client configuration. Please add user root with password root user: root password: root # RabbitMQ broker configuration -haMode : classic +haMode : classic # classic or quorum haParms : - totalNodes : 3 - nodes : 3 \ No newline at end of file + totalNodes : 5 + nodes : 5 \ No newline at end of file diff --git a/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/RabbitMQChaosNode.java b/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/RabbitMQChaosNode.java index 166f2a9..e52e542 100644 --- a/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/RabbitMQChaosNode.java +++ b/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/RabbitMQChaosNode.java @@ -33,52 +33,40 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; - public class RabbitMQChaosNode implements QueueNode { - private static final String BROKER_PROCESS_NAME = "beam.smp"; private static final Logger log = LoggerFactory.getLogger(RabbitMQChaosNode.class); - private String node; - private List nodes; - private RabbitMQBrokerConfig rmqBrokerConfig; - private String installDir = "rabbitmq-chaos-test"; - private String rabbitmqVersion = "3.8.35"; - private String configureFilePath = "broker-chaos-test.conf"; - private Sync sync; - - public RabbitMQChaosNode(String node, List nodes, RabbitMQConfig rmqConfig, - RabbitMQBrokerConfig rmqBrokerConfig, Sync sync) { + private static final String PROCESS = "beam.smp"; + + private final String node; + private final List nodes; + private final Sync sync; + private final String rmqHome; + private String rabbitmqVersion = "4.2.3"; + + public RabbitMQChaosNode(String node, List nodes, RabbitMQConfig config, RabbitMQBrokerConfig brokerConfig, Sync sync) { this.node = node; this.nodes = nodes; - this.rmqBrokerConfig = rmqBrokerConfig; - if (rmqConfig.installDir != null && !rmqConfig.installDir.isEmpty()) { - this.installDir = rmqConfig.installDir; - } - if (rmqConfig.rabbitmqVersion != null && !rmqConfig.rabbitmqVersion.isEmpty()) { - this.rabbitmqVersion = rmqConfig.rabbitmqVersion; - } - if (rmqConfig.configureFilePath != null && !rmqConfig.configureFilePath.isEmpty()) { - this.configureFilePath = rmqConfig.configureFilePath; - } this.sync = sync; + if (StringUtils.isNotBlank(config.rabbitmqVersion)) this.rabbitmqVersion = config.rabbitmqVersion; + this.rmqHome = "/usr/local/rabbitmq-server-" + rabbitmqVersion; } @Override public void setup() { - if (sync.status == Sync.State.START || sync.status == Sync.State.FINISH) { - return; - } + if (sync.status == Sync.State.START || sync.status == Sync.State.FINISH) return; + sync.status = Sync.State.START; - CountDownLatch latch = new CountDownLatch(nodes.size()); Executor executor = new ForkJoinPool(nodes.size()); - for (String no : nodes) { - executor.execute(() -> { - try { - setup(no); - } finally { - latch.countDown(); - } - }); - } + CountDownLatch latch = new CountDownLatch(nodes.size()); + + nodes.forEach(no -> executor.execute(() -> { + try { + setupNode(no); + } finally { + latch.countDown(); + } + })); + try { latch.await(20, TimeUnit.MINUTES); sync.addUser("root", "root"); @@ -89,112 +77,109 @@ public void setup() { } } - - public void setup(String no) { + private void setupNode(String no) { try { - // install erlang and rabbitmq installErlang(no); installRabbitmq(no); + sync.barrier.await(14, TimeUnit.MINUTES); - // sync cookie sync.resetBarrier(); sync.syncCookie(no); sync.barrier.await(5, TimeUnit.MINUTES); - try { - SshUtil.execCommand(no, "rabbitmq-server -detached"); - } catch (Exception e) { - log.error(e.getMessage()); - } - // join cluster - sync.resetBarrier(); - if (!Objects.equals(no, sync.getLeader())) { - try { - SshUtil.execCommand(no, "rabbitmqctl stop_app"); - SshUtil.execCommand(no, "rabbitmqctl reset"); - SshUtil.execCommand(no, "rabbitmqctl join_cluster rabbit@" + sync.getLeader()); - SshUtil.execCommand(no, "rabbitmqctl start_app"); - log.info(no + " join cluster rabbit@" + sync.getLeader() + " finished"); - } catch (Exception e) { - log.error(e.getMessage()); - } - } - sync.barrier.await(5, TimeUnit.MINUTES); - ClusterStatus clusterStatus = null; + + // Clean start + SshUtil.execCommand(no, "killall -q " + PROCESS + " || true"); + SshUtil.execCommand(no, "rm -rf " + rmqHome + "/var/lib/rabbitmq/mnesia/*"); + Thread.sleep(2500); + SshUtil.execCommand(no, "rabbitmq-server -detached"); + Thread.sleep(5000); + sync.resetBarrier(); - while (clusterStatus == null || clusterStatus.getRunning_nodes().size() != nodes.size()) { - String cmd = "rabbitmqctl cluster_status --formatter json"; - String res = SshUtil.execCommandWithArgsReturnStr(no, cmd); - ObjectMapper objectMapper = new ObjectMapper(); - clusterStatus = objectMapper.readValue(res, ClusterStatus.class); + boolean isLeader = Objects.equals(no, sync.getLeader()); + + // App Reset Logic + log.info("Resetting RabbitMQ on node: {}", no); + SshUtil.execCommand(no, "rabbitmqctl stop_app"); + Thread.sleep(10000); + SshUtil.execCommand(no, "rabbitmqctl reset"); + + if (!isLeader) { + Thread.sleep(15000); // Wait for leader + SshUtil.execCommand(no, "rabbitmqctl join_cluster rabbit@" + sync.getLeader()); } - sync.barrier.await(5, TimeUnit.MINUTES); + + SshUtil.execCommand(no, "rabbitmqctl start_app"); + Thread.sleep(15000); + + waitForCluster(no); } catch (Exception e) { - log.error("Node {} setup rabbitmq node failed", no, e); + log.error("Setup failed for node {}", no, e); throw new RuntimeException(e); } } + private void waitForCluster(String no) throws Exception { + sync.barrier.await(5, TimeUnit.MINUTES); + sync.resetBarrier(); + ObjectMapper mapper = new ObjectMapper(); + while (true) { + String res = SshUtil.execCommandWithArgsReturnStr(no, "rabbitmqctl cluster_status --formatter json"); + ClusterStatus status = mapper.readValue(res, ClusterStatus.class); + if (status != null && status.getRunningNodes().size() == nodes.size()) break; + Thread.sleep(5000); + } + sync.barrier.await(5, TimeUnit.MINUTES); + } + private void installErlang(String no) throws Exception { - try { - String erl = SshUtil.execCommandWithArgsReturnStr(no, "which erl"); - if (!StringUtils.isEmpty(erl)) { - return; - } - } catch (Exception ignored) { + if (StringUtils.isNotBlank(safeExec(no, "which erl"))) { + SshUtil.execCommand(no, "pgrep epmd || sudo epmd -daemon"); + return; } - SshUtil.execCommand(no, "rm -rf /opt/erlang"); - SshUtil.execCommand(no, "mkdir /opt/erlang"); - SshUtil.execCommand(no, "yum -y install vim make libtool libtool-ltdl-devel libevent-devel lua-devel openssl-devel flex mysql-devel gcc.x86_64 gcc-c++.x86_64 ncurses-devel wget lrzsz"); - SshUtil.execCommandInDir(no, "/opt/erlang", "wget https://github.com/rabbitmq/erlang-rpm/releases/download/v23.2.6/erlang-23.2.6-1.el7.x86_64.rpm"); - SshUtil.execCommandInDir(no, "/opt/erlang", "rpm -ivh erlang-23.2.6-1.el7.x86_64.rpm"); + SshUtil.execCommand(no, "apt update && apt install -y erlang vim make libtool libevent-dev lua5.3 libssl-dev flex gcc g++ ncurses-dev wget lrzsz xz-utils"); + SshUtil.execCommand(no, "pgrep epmd || sudo epmd -daemon"); } private void installRabbitmq(String no) throws Exception { - try { - String rab = SshUtil.execCommandWithArgsReturnStr(no, "which rabbitmq-server"); - if (!StringUtils.isEmpty(rab)) { - return; - } - } catch (Exception ignored) { - } - String ls; - try { - ls = SshUtil.execCommandWithArgsReturnStr(no, "ls | grep rabbitmq-server-generic-unix-3.8.35.tar"); - } catch (Exception e) { - ls = e.getLocalizedMessage(); - } - if (!StringUtils.equals(ls, "rabbitmq-server-generic-unix-3.8.35.tar\n")) { - log.info(no + " downloading rabbitmq 3.8.35"); - SshUtil.execCommand(no, "wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.35/rabbitmq-server-generic-unix-3.8.35.tar.xz"); - SshUtil.execCommand(no, "xz -d rabbitmq-server-generic-unix-3.8.35.tar.xz"); + if (StringUtils.isNotBlank(safeExec(no, "which rabbitmq-server"))) return; + + String tar = "rabbitmq-server-generic-unix-" + rabbitmqVersion + ".tar"; + String xz = tar + ".xz"; + + if (!StringUtils.contains(safeExec(no, "ls"), tar)) { + SshUtil.execCommand(no, "wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v" + rabbitmqVersion + "/" + xz); + SshUtil.execCommand(no, "xz -d " + xz); } - SshUtil.execCommand(no, "tar -xvf rabbitmq-server-generic-unix-3.8.35.tar"); - SshUtil.execCommand(no, "rm -rf /usr/local/rabbitmq-server-3.8.35"); - SshUtil.execCommand(no, "mv rabbitmq_server-3.8.35 /usr/local/rabbitmq-server-3.8.35"); - SshUtil.execCommand(no, "echo 'export PATH=$PATH::/usr/local/rabbitmq-server-3.8.35/sbin' >> /etc/profile"); - SshUtil.execCommand(no, "echo 'export PATH=$PATH::/usr/local/rabbitmq-server-3.8.35/sbin' >> ~/.bashrc"); - SshUtil.execCommand(no, "source /etc/profile"); - SshUtil.execCommand(no, "source ~/.bashrc"); + + SshUtil.execCommand(no, String.format("tar -xvf %s && rm -rf %s && mv rabbitmq_server-%s %s", tar, rmqHome, rabbitmqVersion, rmqHome)); + + String pathCmd = "export PATH=$PATH:" + rmqHome + "/sbin"; + SshUtil.execCommand(no, String.format("echo '%s' >> /etc/profile && echo '%s' >> ~/.bashrc", pathCmd, pathCmd)); SshUtil.execCommand(no, "rabbitmq-plugins enable rabbitmq_management"); } - @Override - public void teardown() { - stop(); + private String safeExec(String no, String cmd) { + try { + return SshUtil.execCommandWithArgsReturnStr(no, cmd); + } catch (Exception e) { + return ""; + } } @Override public void start() { try { - // start broker - log.info("Node {} start broker...", node); - SshUtil.execCommandInDir(node, installDir, "sbin/rabbitmq-server -detached"); + SshUtil.execCommand(node, rmqHome + "/sbin/rabbitmq-server -detached"); } catch (Exception e) { - log.error("Node {} start rabbitmq node failed", node, e); throw new RuntimeException(e); } } + @Override + public void teardown() { + stop(); + } + @Override public void stop() { try { @@ -208,9 +193,8 @@ public void stop() { @Override public void kill() { try { - KillProcessUtil.forceKillInErl(node, BROKER_PROCESS_NAME); + KillProcessUtil.forceKillInErl(node, PROCESS); } catch (Exception e) { - log.error("Node {} kill rabbitmq processes failed", node, e); throw new RuntimeException(e); } } @@ -218,9 +202,8 @@ public void kill() { @Override public void pause() { try { - PauseProcessUtil.suspend(node, BROKER_PROCESS_NAME); + PauseProcessUtil.suspend(node, PROCESS); } catch (Exception e) { - log.error("Node {} pause rabbitmq processes failed", node, e); throw new RuntimeException(e); } } @@ -228,10 +211,9 @@ public void pause() { @Override public void resume() { try { - PauseProcessUtil.resumeInErl(node, BROKER_PROCESS_NAME); + PauseProcessUtil.resumeInErl(node, PROCESS); } catch (Exception e) { - log.error("Node {} resume rabbitmq processes failed", node, e); throw new RuntimeException(e); } } -} +} \ No newline at end of file diff --git a/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/RabbitMQChaosPushConsumer.java b/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/RabbitMQChaosPushConsumer.java index c54602f..81f56a1 100644 --- a/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/RabbitMQChaosPushConsumer.java +++ b/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/RabbitMQChaosPushConsumer.java @@ -34,11 +34,12 @@ public class RabbitMQChaosPushConsumer implements QueuePushConsumer { private String queueName; private String consumeGroup; private ConsumerCallback consumerCallBack; + private boolean durableQueue; public RabbitMQChaosPushConsumer(DefaultRabbitMQPushConsumer consumer, ConnectionFactory factory, String queueName, String consumeGroup, - ConsumerCallback consumerCallback, ObjectPool channelPool, Connection connection) { + ConsumerCallback consumerCallback, ObjectPool channelPool, Connection connection, boolean durableQueue) { this.consumer = consumer; this.factory = factory; this.queueName = queueName; @@ -46,13 +47,14 @@ public RabbitMQChaosPushConsumer(DefaultRabbitMQPushConsumer consumer, this.consumerCallBack = consumerCallback; this.connection = connection; this.channelPool = channelPool; + this.durableQueue = durableQueue; } @Override public void start() { try { if (consumer == null) { - consumer = new DefaultRabbitMQPushConsumer(factory, queueName, consumerCallBack, consumeGroup, channelPool, connection); + consumer = new DefaultRabbitMQPushConsumer(factory, queueName, consumerCallBack, consumeGroup, channelPool, connection, durableQueue); } } catch (Exception e) { throw new RuntimeException(e); diff --git a/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/RabbitMQChaosState.java b/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/RabbitMQChaosState.java index c1ddb2e..feca232 100644 --- a/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/RabbitMQChaosState.java +++ b/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/RabbitMQChaosState.java @@ -14,7 +14,9 @@ package io.openchaos.driver.rabbitmq; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; import io.openchaos.common.utils.SshUtil; import io.openchaos.driver.queue.QueueState; import io.openchaos.driver.rabbitmq.core.HaMode; @@ -28,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -55,12 +58,27 @@ public void initialize(String metaName, String metaNode) { public Set getLeader() { Set leaderAddr = new HashSet<>(); if (haMode == HaMode.quorum) { + String res; try { - String res = SshUtil.execCommandWithArgsReturnStr(leader, "rabbitmq-queues quorum_status \"openchaos_client_1\" | grep leader "); - String[] s = res.split(" "); - leaderAddr.add(getHost(s[1])); + res = SshUtil.execCommandWithArgsReturnStr(leader, "rabbitmq-queues quorum_status \"openchaos_client_1\" --formatter json"); } catch (Exception e) { - log.warn("Get leader failed!"); + log.warn("SSH command failed", e); + return Collections.emptySet(); + } + + try { + ObjectMapper mapper = new ObjectMapper(); + ArrayNode nodesJson = (ArrayNode) mapper.readTree(res); + for (JsonNode node : nodesJson) { + String state = node.get("Raft State").asText(); + if ("leader".equalsIgnoreCase(state)) { + String leaderNode = node.get("Node Name").asText(); + leaderAddr.add(getHost(leaderNode)); + break; + } + } + } catch (Exception e) { + log.warn("JSON parsing failed", e); } } else if (haMode == HaMode.classic) { String url = "http://" + leader + ":15672/api/queues/%2f/openchaos_client_1"; @@ -81,7 +99,8 @@ public void close() { } private String getHost(String nodeName) { - return nodeName.split("@")[1]; + int i = nodeName.indexOf('@'); + return i >= 0 ? nodeName.substring(i + 1) : nodeName; } private String sendGet(String url) { diff --git a/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/RabbitMQDriver.java b/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/RabbitMQDriver.java index 49f3413..6844ea1 100644 --- a/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/RabbitMQDriver.java +++ b/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/RabbitMQDriver.java @@ -34,10 +34,11 @@ import org.apache.commons.pool2.ObjectPool; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; - import java.io.File; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeoutException; public class RabbitMQDriver implements QueueDriver { @@ -109,11 +110,18 @@ public void initialState() { tmpFac.setHost(nodes.get(0)); tmpFac.setUsername(user); tmpFac.setPassword(password); + tmpFac.setRequestedHeartbeat(2); // heartbeat every 2 seconds try { Connection tmpCon = tmpFac.newConnection("tmp"); Channel tmpChan = tmpCon.createChannel(); tmpChan.queueDelete(queueName); - tmpChan.queueDeclare(queueName, false, false, false, null); + if (rmqBrokerConfig.haMode.name().equals("quorum")) { + Map argsMap = new HashMap<>(); + argsMap.put("x-queue-type", "quorum"); + tmpChan.queueDeclare(queueName, true, false, false, argsMap); + } else if (rmqBrokerConfig.haMode.name().equals("classic")) { + tmpChan.queueDeclare(queueName, false, false, false, null); + } tmpChan.close(); tmpCon.close(); } catch (IOException | TimeoutException e) { @@ -124,6 +132,7 @@ public void initialState() { factory.setHost(state.getLeader().iterator().next()); factory.setUsername(user); factory.setPassword(password); + factory.setRequestedHeartbeat(2); // heartbeat every 2 seconds try { GenericObjectPoolConfig config = new GenericObjectPoolConfig<>(); config.setMaxTotal(200); @@ -202,9 +211,9 @@ public QueuePushConsumer createPushConsumer(String topic, String subscriptionNam } RabbitMQChaosPushConsumer rabbitMQChaosPushConsumer; try { - DefaultRabbitMQPushConsumer pushConsumer = new DefaultRabbitMQPushConsumer(factory, queueName, consumerCallback, subscriptionName, consumerChannelPool, consumerConnection); - rabbitMQChaosPushConsumer = new RabbitMQChaosPushConsumer(pushConsumer, - factory, queueName, subscriptionName, consumerCallback, consumerChannelPool, consumerConnection); + boolean durable = rmqBrokerConfig.haMode.name().equals("quorum"); + DefaultRabbitMQPushConsumer pushConsumer = new DefaultRabbitMQPushConsumer(factory, queueName, consumerCallback, subscriptionName, consumerChannelPool, consumerConnection, durable); + rabbitMQChaosPushConsumer = new RabbitMQChaosPushConsumer(pushConsumer, factory, queueName, subscriptionName, consumerCallback, consumerChannelPool, consumerConnection, durable); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/core/ClusterStatus.java b/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/core/ClusterStatus.java index 7ce230e..6e5383b 100644 --- a/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/core/ClusterStatus.java +++ b/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/core/ClusterStatus.java @@ -14,18 +14,20 @@ package io.openchaos.driver.rabbitmq.core; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; @JsonIgnoreProperties(ignoreUnknown = true) public class ClusterStatus { - private List running_nodes; + @JsonProperty("running_nodes") + private List runningNodes; - public List getRunning_nodes() { - return running_nodes; + public List getRunningNodes() { + return runningNodes; } - public void setRunning_nodes(List running_nodes) { - this.running_nodes = running_nodes; + public void setRunningNodes(List runningNodes) { + this.runningNodes = runningNodes; } } diff --git a/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/core/DefaultRabbitMQProducer.java b/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/core/DefaultRabbitMQProducer.java index e46f4da..d5ae4d2 100644 --- a/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/core/DefaultRabbitMQProducer.java +++ b/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/core/DefaultRabbitMQProducer.java @@ -32,7 +32,6 @@ public class DefaultRabbitMQProducer { private Channel channel; public DefaultRabbitMQProducer() { - } public DefaultRabbitMQProducer(ConnectionFactory factory, Connection connection, ObjectPool channelPool) { @@ -50,22 +49,21 @@ public void init() { } public void sendMessage(String queueName, byte[] message) throws Exception { - if (channel == null || !channel.isOpen()) { - channel = channelPool.borrowObject(); - } + Channel ch = null; try { - channel.basicPublish("", queueName, null, message); - } catch (ShutdownSignalException sse) { - // possibly check if channel was closed - // by the time we started action and reasons for - // closing it - log.warn("connection or channel is shutdown"); + ch = channelPool.borrowObject(); + ch.confirmSelect(); + ch.basicPublish("", queueName, null, message); + ch.waitForConfirmsOrDie(); + } catch (IOException | ShutdownSignalException e) { + log.warn("publish failed: {}", e.getMessage()); getNewConnection(); - } catch (IOException ioe) { - // check why connection was closed - log.warn("IO was blocked"); + throw e; + } finally { + if (ch != null) { + channelPool.returnObject(ch); + } } - } public void shutdown() { @@ -97,5 +95,4 @@ public Connection getNewConnection() { public Connection getConnection() { return connection; } -} - +} \ No newline at end of file diff --git a/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/core/DefaultRabbitMQPushConsumer.java b/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/core/DefaultRabbitMQPushConsumer.java index 9a10464..8093881 100644 --- a/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/core/DefaultRabbitMQPushConsumer.java +++ b/driver-rabbitmq/src/main/java/io/openchaos/driver/rabbitmq/core/DefaultRabbitMQPushConsumer.java @@ -26,28 +26,37 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeoutException; public class DefaultRabbitMQPushConsumer { private static final Logger log = LoggerFactory.getLogger(DefaultRabbitMQPushConsumer.class); - private Connection connection; private final String queueName; - private ObjectPool channelPool; - private ConsumerCallback consumerCallback; + private Connection connection; + private final ObjectPool channelPool; + private final ConsumerCallback consumerCallback; private Channel channel; - private ConnectionFactory factory; - private String consumerGroup; + private final ConnectionFactory factory; + private final String consumerGroup; + private final boolean durableQueue; - public DefaultRabbitMQPushConsumer(ConnectionFactory factory, String queueName, - ConsumerCallback consumerCallback, - String consumerGroup, ObjectPool channelPool, Connection connection) { + public DefaultRabbitMQPushConsumer(ConnectionFactory factory, String queueName, ConsumerCallback consumerCallback, String consumerGroup, ObjectPool channelPool, Connection connection, boolean durableQueue) { this.connection = connection; this.channelPool = channelPool; this.factory = factory; this.queueName = queueName; + this.durableQueue = durableQueue; + try { this.channel = channelPool.borrowObject(); - channel.queueDeclare(queueName, false, false, false, null); + if (durableQueue) { + Map argsMap = new HashMap<>(); + argsMap.put("x-queue-type", "quorum"); + channel.queueDeclare(queueName, durableQueue, false, false, argsMap); + } else { + channel.queueDeclare(queueName, durableQueue, false, false, null); + } } catch (Exception e) { throw new RuntimeException(e); } @@ -62,24 +71,20 @@ public void createNewChannel() { channel = channelPool.borrowObject(); } channel.basicQos(64); - channel.basicConsume(queueName, false, "openchaos_client", - new DefaultConsumer(channel) { - @Override - public void handleDelivery(String consumerTag, - Envelope envelope, - AMQP.BasicProperties properties, - byte[] body) { - try { - consumerCallback.messageReceived(new Message(body)); - if (channel == null || !channel.isOpen()) { - channel = channelPool.borrowObject(); - } - channel.basicAck(envelope.getDeliveryTag(), false); - } catch (Exception e) { - log.warn("Create channel failed"); - } + channel.basicConsume(queueName, false, "openchaos_client", new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { + try { + consumerCallback.messageReceived(new Message(body)); + if (channel == null || !channel.isOpen()) { + channel = channelPool.borrowObject(); } - }); + channel.basicAck(envelope.getDeliveryTag(), false); + } catch (Exception e) { + log.warn("Create channel failed"); + } + } + }); } catch (Exception e) { log.warn("Connection occured error! Try to create new connection."); if (!connection.isOpen()) { diff --git a/driver-rabbitmq/src/test/java/io/openchaos/driver/rabbitmq/RabbitMQChaosPushConsumerTest.java b/driver-rabbitmq/src/test/java/io/openchaos/driver/rabbitmq/RabbitMQChaosPushConsumerTest.java index d5cf986..abeab29 100644 --- a/driver-rabbitmq/src/test/java/io/openchaos/driver/rabbitmq/RabbitMQChaosPushConsumerTest.java +++ b/driver-rabbitmq/src/test/java/io/openchaos/driver/rabbitmq/RabbitMQChaosPushConsumerTest.java @@ -30,7 +30,7 @@ public class RabbitMQChaosPushConsumerTest { throw new RuntimeException(e); } consumer = new RabbitMQChaosPushConsumer(pushConsumer, factory, "queuename", - "group", callback, channelPool, conn); + "group", callback, channelPool, conn, false); } diff --git a/driver-rabbitmq/src/test/java/io/openchaos/driver/rabbitmq/core/DefaultRabbitMQPushConsumerTest.java b/driver-rabbitmq/src/test/java/io/openchaos/driver/rabbitmq/core/DefaultRabbitMQPushConsumerTest.java index 7698e44..19363f3 100644 --- a/driver-rabbitmq/src/test/java/io/openchaos/driver/rabbitmq/core/DefaultRabbitMQPushConsumerTest.java +++ b/driver-rabbitmq/src/test/java/io/openchaos/driver/rabbitmq/core/DefaultRabbitMQPushConsumerTest.java @@ -26,7 +26,7 @@ public class DefaultRabbitMQPushConsumerTest { } catch (Exception e) { throw new RuntimeException(e); } - consumer = new DefaultRabbitMQPushConsumer(factory, queueName, callback, "group", channelPool, conn); + consumer = new DefaultRabbitMQPushConsumer(factory, queueName, callback, "group", channelPool, conn, false); } @Test