diff --git a/eventmesh-meta/eventmesh-meta-raft/build.gradle b/eventmesh-meta/eventmesh-meta-raft/build.gradle new file mode 100644 index 0000000000..386b5c6e72 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/build.gradle @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { + id 'com.google.protobuf' version '0.8.17' +} + +def grpcVersion = '1.50.2' // CURRENT_GRPC_VERSION +def protobufVersion = '3.21.5' +def protocVersion = protobufVersion + +dependencies { + implementation ("io.grpc:grpc-protobuf:${grpcVersion}") { + exclude group: "com.google.protobuf", module: "protobuf-java" + } + implementation("com.google.protobuf:protobuf-java:${protobufVersion}") + implementation "io.grpc:grpc-stub:${grpcVersion}" + implementation "com.google.protobuf:protobuf-java-util:${protobufVersion}" + implementation "javax.annotation:javax.annotation-api:1.3.2" + compileOnly 'org.projectlombok:lombok' + annotationProcessor 'org.projectlombok:lombok' + + implementation project(":eventmesh-meta:eventmesh-meta-api") + implementation project(":eventmesh-common") + implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.0.1' + implementation "com.alipay.sofa:jraft-core:1.3.14" + implementation "com.alipay.sofa:rpc-grpc-impl:1.3.14" + testImplementation 'org.junit.jupiter:junit-jupiter:5.6.0' +} + +protobuf { + protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" } + plugins { + grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" } + } + generateProtoTasks { + all()*.plugins { + grpc {} + } + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/gradle.properties b/eventmesh-meta/eventmesh-meta-raft/gradle.properties new file mode 100644 index 0000000000..0010b2d014 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/gradle.properties @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pluginType=metaStorage +pluginName=raft \ No newline at end of file diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventClosure.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventClosure.java new file mode 100644 index 0000000000..a568a801dc --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventClosure.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft; + +import org.apache.eventmesh.meta.raft.consts.MetaRaftConstants; +import org.apache.eventmesh.meta.raft.rpc.RequestResponse; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import com.alipay.sofa.jraft.Closure; +import com.alipay.sofa.jraft.Status; + + +public abstract class EventClosure implements Closure { + + private CompletableFuture future; + + private RequestResponse requestResponse; + + private EventOperation eventOperation; + + public static EventClosure createDefaultEventClosure() { + return new EventClosure() { + + @Override + public void run(Status status) { + + } + }; + } + + public void setFuture(CompletableFuture future) { + this.future = future; + } + + public void setRequestResponse(RequestResponse requestResponse) { + this.requestResponse = requestResponse; + if (future != null) { + future.complete(getRequestResponse()); + } + } + + public RequestResponse getRequestResponse() { + return requestResponse; + } + + public EventOperation getEventOperation() { + return eventOperation; + } + + protected void failure(final String errorMsg, final String redirect) { + final RequestResponse response = RequestResponse.newBuilder().setSuccess(false).setErrorMsg(errorMsg) + .setRedirect(redirect).build(); + setRequestResponse(response); + } + + public void setEventOperation(EventOperation opreation) { + this.eventOperation = opreation; + } + + protected void success(final Map map) { + + final RequestResponse response = RequestResponse.newBuilder().setValue(MetaRaftConstants.RESPONSE) + .setSuccess(true).putAllInfo(map).build(); + setRequestResponse(response); + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventOperation.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventOperation.java new file mode 100644 index 0000000000..b8c7a6cd55 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventOperation.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft; + +import org.apache.eventmesh.meta.raft.consts.MetaRaftConstants; +import org.apache.eventmesh.meta.raft.rpc.RequestResponse; + +import java.io.Serializable; +import java.util.Map; + + +public class EventOperation implements Serializable { + + private static final long serialVersionUID = -6597003954824547294L; + + public static final byte PUT = 0x01; + + public static final byte GET = 0x02; + + public static final byte DELETE = 0x03; + + private byte op; + private Map data; + + public static EventOperation createOpreation(RequestResponse response) { + if (response.getValue() == MetaRaftConstants.PUT) { + return new EventOperation(PUT, response.getInfoMap()); + } else if (response.getValue() == MetaRaftConstants.GET) { + return new EventOperation(GET, response.getInfoMap()); + } else if (response.getValue() == MetaRaftConstants.DELETE) { + return new EventOperation(DELETE, response.getInfoMap()); + + } + return null; + } + + public EventOperation(byte op, Map data) { + this.op = op; + this.data = data; + } + + public byte getOp() { + return op; + } + + public Map getData() { + return data; + } + + public boolean isReadOp() { + return GET == this.op; + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftMetaService.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftMetaService.java new file mode 100644 index 0000000000..daf636ccd4 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftMetaService.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft; + + +import org.apache.eventmesh.meta.raft.rpc.RequestResponse; + +/** + * MetaService. + */ +public interface JraftMetaService { + + void handle(RequestResponse request, EventClosure closure); +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftMetaServiceImpl.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftMetaServiceImpl.java new file mode 100644 index 0000000000..1af6d5c963 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftMetaServiceImpl.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft; + +import org.apache.eventmesh.meta.raft.rpc.RequestResponse; + +import org.apache.commons.lang.StringUtils; + +import java.nio.ByteBuffer; + +import com.alipay.remoting.exception.CodecException; +import com.alipay.remoting.serialization.SerializerManager; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.entity.Task; +import com.alipay.sofa.jraft.error.RaftError; + +public class JraftMetaServiceImpl implements JraftMetaService { + + JraftServer server; + + + public JraftMetaServiceImpl(JraftServer server) { + this.server = server; + } + + @Override + public void handle(RequestResponse request, EventClosure closure) { + applyOperation(EventOperation.createOpreation(request), closure); + } + + public void applyOperation(EventOperation opreation, EventClosure closure) { + if (!isLeader()) { + handlerNotLeaderError(closure); + return; + } + try { + closure.setEventOperation(opreation); + final Task task = new Task(); + task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(opreation))); + task.setDone(closure); + this.server.getNode().apply(task); + } catch (CodecException e) { + String errorMsg = "Fail to encode EventOperation"; + closure.failure(errorMsg, StringUtils.EMPTY); + closure.run(new Status(RaftError.EINTERNAL, errorMsg)); + } + } + + + private String getRedirect() { + return this.server.redirect().getRedirect(); + } + + private boolean isLeader() { + return this.server.getFsm().isLeader(); + } + + + private void handlerNotLeaderError(final EventClosure closure) { + closure.failure("Not leader.", getRedirect()); + closure.run(new Status(RaftError.EPERM, "Not leader")); + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftServer.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftServer.java new file mode 100644 index 0000000000..ad8cb14d96 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftServer.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft; + +import org.apache.eventmesh.meta.raft.rpc.MetaServerHelper; +import org.apache.eventmesh.meta.raft.rpc.RequestProcessor; +import org.apache.eventmesh.meta.raft.rpc.RequestResponse; + +import org.apache.commons.io.FileUtils; + +import java.io.File; +import java.io.IOException; + +import com.alipay.sofa.jraft.Node; +import com.alipay.sofa.jraft.RaftGroupService; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; +import com.alipay.sofa.jraft.rpc.RpcServer; + + +public class JraftServer { + + private RaftGroupService raftGroupService; + + private Node node; + + private MetaStateMachine fsm = new MetaStateMachine(); + + public MetaStateMachine getFsm() { + return fsm; + } + + private JraftMetaServiceImpl metaImpl; + + public JraftServer(final String dataPath, final String groupId, final PeerId serverId, + final NodeOptions nodeOptions) throws IOException { + // init raft data path, it contains log,meta,snapshot + FileUtils.forceMkdir(new File(dataPath)); + // here use same RPC server for raft and business. It also can be seperated generally + final RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint()); + MetaServerHelper.initGRpc(); + MetaServerHelper.setRpcServer(rpcServer); + // register business processor + metaImpl = new JraftMetaServiceImpl(this); + rpcServer.registerProcessor(new RequestProcessor(metaImpl)); + nodeOptions.setFsm(this.fsm); + // set storage path (log,meta,snapshot) + // log, must + nodeOptions.setLogUri(dataPath + File.separator + "log"); + // meta, must + nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta"); + // snapshot, optional, generally recommended + nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot"); + // init raft group service framework + this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer); + // start raft node + this.node = this.raftGroupService.start(); + + } + + public RequestResponse redirect() { + final RequestResponse.Builder builder = RequestResponse.newBuilder().setSuccess(false); + if (this.node != null) { + final PeerId leader = this.node.getLeaderId(); + if (leader != null) { + builder.setRedirect(leader.toString()); + } + } + return builder.build(); + } + + public JraftMetaServiceImpl getMetaImpl() { + return metaImpl; + } + + public Node getNode() { + return node; + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java new file mode 100644 index 0000000000..a0607f5ab4 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft; + +import static org.apache.eventmesh.meta.raft.EventOperation.DELETE; +import static org.apache.eventmesh.meta.raft.EventOperation.GET; +import static org.apache.eventmesh.meta.raft.EventOperation.PUT; + +import org.apache.eventmesh.meta.raft.snapshot.MetaSnapshotFile; + +import org.apache.commons.lang.StringUtils; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; + +import com.alipay.remoting.exception.CodecException; +import com.alipay.remoting.serialization.SerializerManager; +import com.alipay.sofa.jraft.Closure; +import com.alipay.sofa.jraft.Iterator; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.core.StateMachineAdapter; +import com.alipay.sofa.jraft.error.RaftError; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; +import com.fasterxml.jackson.databind.ObjectMapper; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MetaStateMachine extends StateMachineAdapter { + + private final AtomicLong leaderTerm = new AtomicLong(-1); + + private static ObjectMapper objectMapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + + private Map contentTable = new ConcurrentHashMap<>(); + + public boolean isLeader() { + return this.leaderTerm.get() > 0; + } + + @Override + public boolean onSnapshotLoad(final SnapshotReader reader) { + if (isLeader()) { + log.warn("Leader is not supposed to load snapshot"); + return false; + } + if (reader.getFileMeta("data") == null) { + log.error("Fail to find data file in {}", reader.getPath()); + return false; + } + final MetaSnapshotFile snapshot = new MetaSnapshotFile(reader.getPath() + File.separator + "data"); + try { + Map snapshotLoaded = objectMapper.readValue(snapshot.load(), Map.class); + contentTable.clear(); + contentTable.putAll(snapshotLoaded); + return true; + } catch (final IOException e) { + log.error("Fail to load snapshot from {}", snapshot.getPath()); + return false; + } + + } + + @Override + public void onSnapshotSave(SnapshotWriter writer, Closure done) { + executor.submit(() -> { + final MetaSnapshotFile snapshot = new MetaSnapshotFile(writer.getPath() + File.separator + "data"); + try { + if (snapshot.save(objectMapper.writeValueAsString(contentTable))) { + if (writer.addFile("data")) { + done.run(Status.OK()); + } else { + done.run(new Status(RaftError.EIO, "Fail to add file to writer")); + } + } else { + done.run(new Status(RaftError.EIO, "Fail to save snapshot %s", snapshot.getPath())); + } + } catch (IOException e) { + done.run(new Status(RaftError.EIO, "Fail to deserialize snapshot %s", snapshot.getPath())); + } + }); + } + + @Override + public void onApply(Iterator iter) { + while (iter.hasNext()) { + Exception e1 = null; + EventOperation eventOperation = null; + EventClosure closure = null; + if (iter.done() != null) { + // This task is applied by this node, get value from closure to avoid additional parsing. + closure = (EventClosure) iter.done(); + eventOperation = closure.getEventOperation(); + } else { + // Have to parse FetchAddRequest from this user log. + final ByteBuffer data = iter.getData(); + try { + eventOperation = SerializerManager.getSerializer(SerializerManager.Hessian2) + .deserialize(data.array(), EventOperation.class.getName()); + } catch (final CodecException e) { + e.printStackTrace(System.err); + e1 = e; + + } + // follower ignore read operation + if (eventOperation != null && eventOperation.isReadOp()) { + iter.next(); + continue; + } + } + if (eventOperation != null) { + switch (eventOperation.getOp()) { + case GET: + break; + case PUT: + Map tempTable = eventOperation.getData(); + contentTable.putAll(tempTable); + log.info("update MetaStateMachine successfully {}", contentTable); + break; + case DELETE: + Map tempTable2 = eventOperation.getData(); + tempTable2.forEach((key, value) -> { + String remove = contentTable.remove(key); + if (Objects.isNull(remove)) { + log.warn("delete MetaStateMachine key: {} fail.", remove); + } else { + log.info("delete MetaStateMachine key: {} successfully.", remove); + } + + }); + + break; + default: + break; + } + + if (closure != null) { + if (e1 != null) { + closure.failure(e1.getMessage(), StringUtils.EMPTY); + } else { + if (eventOperation.getOp() == PUT) { + closure.success(Collections.EMPTY_MAP); + } else { + closure.success(Collections.unmodifiableMap(contentTable)); + } + + } + closure.run(Status.OK()); + } + } + iter.next(); + } + } + + @Override + public void onLeaderStart(final long term) { + this.leaderTerm.set(term); + super.onLeaderStart(term); + + } + + @Override + public void onLeaderStop(final Status status) { + this.leaderTerm.set(-1); + super.onLeaderStop(status); + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java new file mode 100644 index 0000000000..d9cf371630 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft; + +import org.apache.eventmesh.api.exception.MetaException; +import org.apache.eventmesh.api.meta.MetaService; +import org.apache.eventmesh.api.meta.MetaServiceListener; +import org.apache.eventmesh.api.meta.config.EventMeshMetaConfig; +import org.apache.eventmesh.api.meta.dto.EventMeshDataInfo; +import org.apache.eventmesh.api.meta.dto.EventMeshRegisterInfo; +import org.apache.eventmesh.api.meta.dto.EventMeshUnRegisterInfo; +import org.apache.eventmesh.common.config.ConfigService; +import org.apache.eventmesh.common.utils.ConfigurationContextUtil; +import org.apache.eventmesh.meta.raft.config.RaftMetaStorageConfiguration; +import org.apache.eventmesh.meta.raft.consts.MetaRaftConstants; +import org.apache.eventmesh.meta.raft.rpc.MetaServerHelper; +import org.apache.eventmesh.meta.raft.rpc.RequestResponse; + +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.alipay.sofa.jraft.CliService; +import com.alipay.sofa.jraft.RaftServiceFactory; +import com.alipay.sofa.jraft.RouteTable; +import com.alipay.sofa.jraft.conf.Configuration; +import com.alipay.sofa.jraft.core.CliServiceImpl; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.error.RemotingException; +import com.alipay.sofa.jraft.option.CliOptions; +import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl; +import com.fasterxml.jackson.databind.ObjectMapper; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class RaftMetaService implements MetaService { + + private static ObjectMapper objectMapper = new ObjectMapper(); + + private ConcurrentMap eventMeshRegisterInfoMap; + + private final AtomicBoolean initStatus = new AtomicBoolean(false); + + private final AtomicBoolean startStatus = new AtomicBoolean(false); + + RaftMetaStorageConfiguration configuration; + + private JraftServer jraftServer; + + private CliService cliService; + + private CliClientServiceImpl cliClientService; + + private PeerId leader; + + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + + + @Override + public void init() throws MetaException { + if (!initStatus.compareAndSet(false, true)) { + return; + } + eventMeshRegisterInfoMap = new ConcurrentHashMap<>(ConfigurationContextUtil.KEYS.size()); + ConfigService configService = ConfigService.getInstance(); + configuration = configService.buildConfigInstance(RaftMetaStorageConfiguration.class); + } + + @Override + public void start() throws MetaException { + final String dataPath = configuration.getDataPath(); + final String groupId = MetaRaftConstants.GROUP; + final String serverIdStr = configuration.getSelfIpAndPort(); + final String initConfStr = configuration.getMembersIpAndPort(); + final NodeOptions nodeOptions = new NodeOptions(); + nodeOptions.setElectionTimeoutMs(configuration.getElectionTimeoutMs() * 1000); + nodeOptions.setDisableCli(false); + nodeOptions.setSnapshotIntervalSecs(configuration.getSnapshotIntervalSecs()); + final PeerId serverId = new PeerId(); + if (!serverId.parse(serverIdStr)) { + throw new MetaException("Fail to parse serverId:" + serverIdStr); + } + final Configuration initConf = new Configuration(); + if (!initConf.parse(initConfStr)) { + throw new MetaException("Fail to parse initConf:" + initConfStr); + } + initConf.addPeer(serverId); + nodeOptions.setInitialConf(initConf); + try { + jraftServer = new JraftServer(dataPath, groupId, serverId, nodeOptions); + } catch (IOException e) { + throw new MetaException("fail to start jraft server", e); + } + log.info("Started jraft server at port: {}", jraftServer.getNode().getNodeId().getPeerId().getPort()); + + final Configuration conf = new Configuration(); + if (!conf.parse(serverIdStr)) { + throw new IllegalArgumentException("Fail to parse conf:" + serverIdStr); + } + RouteTable.getInstance().updateConfiguration(MetaRaftConstants.GROUP, conf); + cliService = RaftServiceFactory.createAndInitCliService(new CliOptions()); + cliClientService = (CliClientServiceImpl) ((CliServiceImpl) this.cliService).getCliClientService(); + while (true) { + try { + refreshleader(); + if (this.leader != null) { + break; + } + } catch (Exception e) { + log.warn("fail to get leader node"); + try { + Thread.sleep(3000L); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + } + scheduledExecutorService.scheduleAtFixedRate(() -> { + try { + RaftMetaService.this.refreshleader(); + } catch (Exception e) { + log.error("fail to Refresh Leader", e); + } + }, configuration.getRefreshLeaderInterval(), configuration.getRefreshLeaderInterval(), TimeUnit.SECONDS); + + startStatus.compareAndSet(false, true); + } + + private void refreshleader() throws InterruptedException, TimeoutException { + if (!RouteTable.getInstance().refreshLeader(cliClientService, MetaRaftConstants.GROUP, 3000).isOk()) { + throw new IllegalStateException("Refresh leader failed"); + } + this.leader = RouteTable.getInstance().selectLeader(MetaRaftConstants.GROUP); + log.info("raft Leader is {}", leader); + } + + @Override + public void shutdown() throws MetaException { + if (!startStatus.compareAndSet(true, false)) { + return; + } + scheduledExecutorService.shutdown(); + MetaServerHelper.shutDown(); + if (cliService != null) { + cliService.shutdown(); + } + if (cliClientService != null) { + cliClientService.shutdown(); + } + } + + @Override + public List findEventMeshInfoByCluster(String clusterName) throws MetaException { + List listEventMeshDataInfo = new ArrayList<>(); + RequestResponse req = RequestResponse.newBuilder().setValue(MetaRaftConstants.GET).build(); + boolean result = false; + try { + CompletableFuture future = commit(req, EventClosure.createDefaultEventClosure()); + RequestResponse requestResponse = future.get(3000, TimeUnit.MILLISECONDS); + if (requestResponse != null) { + result = requestResponse.getSuccess(); + if (result) { + Map infoMap = requestResponse.getInfoMap(); + for (Entry entry : infoMap.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (key.startsWith("eventMeshInfo@@")) { + if (Objects.isNull(clusterName)) { + if (!key.endsWith("@@" + clusterName)) { + continue; + } + } + EventMeshDataInfo eventMeshDataInfo = objectMapper.readValue(value, EventMeshDataInfo.class); + listEventMeshDataInfo.add(eventMeshDataInfo); + } + } + } + } + } catch (Exception e) { + throw new MetaException("fail to get meta data ", e); + } + return listEventMeshDataInfo; + } + + @Override + public List findAllEventMeshInfo() throws MetaException { + return findEventMeshInfoByCluster(null); + } + + @Override + public void registerMetadata(Map metadataMap) { + for (Map.Entry eventMeshRegisterInfo : eventMeshRegisterInfoMap.entrySet()) { + EventMeshRegisterInfo registerInfo = eventMeshRegisterInfo.getValue(); + registerInfo.setMetadata(metadataMap); + this.register(registerInfo); + } + } + + @Override + public Map getMetaData(String key, boolean fuzzyEnabled) { + Map resultMap = new HashMap<>(); + RequestResponse req = RequestResponse.newBuilder().setValue(MetaRaftConstants.GET).build(); + boolean result = false; + try { + CompletableFuture future = commit(req, EventClosure.createDefaultEventClosure()); + RequestResponse requestResponse = future.get(3000, TimeUnit.MILLISECONDS); + if (requestResponse != null) { + result = requestResponse.getSuccess(); + if (result) { + Map infoMap = requestResponse.getInfoMap(); + resultMap.putAll(infoMap); + } + } + } catch (Exception e) { + throw new MetaException("fail to get meta data ", e); + } + if (fuzzyEnabled) { + // todo + } else { + Map finalResult = new HashMap<>(); + finalResult.put(key, resultMap.get(key)); + return finalResult; + } + + return resultMap; + } + + @Override + public void getMetaDataWithListener(MetaServiceListener metaServiceListener, String key) { + //todo + } + + @Override + public void updateMetaData(Map metadataMap) { + String protocol = metadataMap.get(EventMeshMetaConfig.EVENT_MESH_PROTO); + String reftDataId = "Raft" + "@@" + protocol; + boolean result = false; + try { + RequestResponse req = + RequestResponse.newBuilder().setValue(MetaRaftConstants.PUT).putInfo(reftDataId, objectMapper.writeValueAsString(metadataMap)) + .build(); + CompletableFuture future = commit(req, EventClosure.createDefaultEventClosure()); + RequestResponse requestResponse = future.get(3000, TimeUnit.MILLISECONDS); + if (requestResponse != null) { + result = requestResponse.getSuccess(); + } + } catch (Exception e) { + throw new MetaException("fail to serialize ", e); + } + if (!result) { + throw new MetaException("fail to updateMetaData "); + } + + } + + @Override + public void removeMetaData(String key) { + RequestResponse req = RequestResponse.newBuilder().setValue(MetaRaftConstants.DELETE).putInfo(key, StringUtils.EMPTY).build(); + + try { + CompletableFuture future = commit(req, EventClosure.createDefaultEventClosure()); + RequestResponse requestResponse = future.get(3000, TimeUnit.MILLISECONDS); + if (requestResponse != null) { + boolean result = requestResponse.getSuccess(); + if (result) { + throw new MetaException("fail to remove MetaData"); + } + } + } catch (Exception e) { + throw new MetaException("fail to remove MetaData", e); + } + + } + + @Override + public boolean register(EventMeshRegisterInfo eventMeshRegisterInfo) throws MetaException { + //key= eventMeshInfo@@eventMeshName@@IP@@PORT@@protocolType@@CLUSTER_NAME + String eventMeshName = eventMeshRegisterInfo.getEventMeshName(); + String protocolType = eventMeshRegisterInfo.getProtocolType(); + String[] ipAndPort = eventMeshRegisterInfo.getEndPoint().split(":"); + String clusterName = eventMeshRegisterInfo.getEventMeshClusterName(); + String key = "eventMeshInfo" + "@@" + eventMeshName + "@@" + ipAndPort[0] + "@@" + ipAndPort[1] + "@@" + protocolType + "@@" + clusterName; + InfoInner infoInner = new InfoInner(eventMeshRegisterInfo); + String registerInfo = null; + boolean result = false; + try { + registerInfo = objectMapper.writeValueAsString(infoInner); + RequestResponse req = RequestResponse.newBuilder().setValue(MetaRaftConstants.PUT).putInfo(key, registerInfo).build(); + CompletableFuture future = commit(req, EventClosure.createDefaultEventClosure()); + RequestResponse requestResponse = future.get(3000, TimeUnit.MILLISECONDS); + if (requestResponse != null) { + result = requestResponse.getSuccess(); + } + } catch (Exception e) { + throw new MetaException("fail to serialize ", e); + } + if (result) { + eventMeshRegisterInfoMap.put(eventMeshName, eventMeshRegisterInfo); + } + return result; + } + + @Override + public boolean unRegister(EventMeshUnRegisterInfo eventMeshUnRegisterInfo) throws MetaException { + //key= eventMeshInfo@@eventMeshName@@IP@@PORT@@protocolType@@CLUSTER_NAME + String eventMeshName = eventMeshUnRegisterInfo.getEventMeshName(); + String protocolType = eventMeshUnRegisterInfo.getProtocolType(); + String[] ipAndPort = eventMeshUnRegisterInfo.getEndPoint().split(":"); + String clusterName = eventMeshUnRegisterInfo.getEventMeshClusterName(); + String key = "eventMeshInfo" + "@@" + eventMeshName + "@@" + ipAndPort[0] + "@@" + ipAndPort[1] + "@@" + protocolType + "@@" + clusterName; + RequestResponse req = RequestResponse.newBuilder().setValue(MetaRaftConstants.DELETE).putInfo(key, StringUtils.EMPTY).build(); + boolean result = false; + try { + CompletableFuture future = commit(req, EventClosure.createDefaultEventClosure()); + RequestResponse requestResponse = future.get(3000, TimeUnit.MILLISECONDS); + if (requestResponse != null) { + result = requestResponse.getSuccess(); + } + } catch (Exception e) { + throw new MetaException(e.getMessage(), e); + } + if (result) { + eventMeshRegisterInfoMap.remove(eventMeshName); + } + return result; + } + + @Data + class InfoInner implements Serializable { + + EventMeshRegisterInfo eventMeshRegisterInfo; + + public InfoInner(EventMeshRegisterInfo eventMeshRegisterInfo) { + this.eventMeshRegisterInfo = eventMeshRegisterInfo; + } + } + + public CompletableFuture commit(RequestResponse requestResponse, EventClosure eventClosure) + throws RemotingException, InterruptedException { + CompletableFuture future = new CompletableFuture<>(); + eventClosure.setFuture(future); + if (isLeader()) { + this.jraftServer.getMetaImpl().handle(requestResponse, eventClosure); + } else { + invokeToLeader(requestResponse, future); + } + return future; + } + + private void invokeToLeader(RequestResponse requestResponse, CompletableFuture future) + throws RemotingException, InterruptedException { + cliClientService.getRpcClient().invokeAsync(leader.getEndpoint(), requestResponse, (result, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + future.complete((RequestResponse) result); + }, 3000); + } + + + private boolean isLeader() { + return this.jraftServer.getFsm().isLeader(); + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/config/RaftMetaStorageConfiguration.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/config/RaftMetaStorageConfiguration.java new file mode 100644 index 0000000000..d5f4a80191 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/config/RaftMetaStorageConfiguration.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft.config; + +import org.apache.eventmesh.common.config.Config; +import org.apache.eventmesh.common.config.ConfigField; +import org.apache.eventmesh.meta.raft.consts.MetaRaftConstants; + +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@Config(prefix = "eventMesh.metaStorage.raft") +public class RaftMetaStorageConfiguration { + + @ConfigField(field = MetaRaftConstants.DATAPATH) + private String dataPath; + + @ConfigField(field = MetaRaftConstants.SELF) + private String selfIpAndPort; + + @ConfigField(field = MetaRaftConstants.MEMBERS) + private String membersIpAndPort; + + @ConfigField(field = MetaRaftConstants.ELECTIONTIMEOUT) + private Integer electionTimeoutMs; + + @ConfigField(field = MetaRaftConstants.SNAPSHOTINTERVAL) + private Integer snapshotIntervalSecs; + + @ConfigField(field = MetaRaftConstants.REFRESHLEADERINTERVAL) + private Integer refreshLeaderInterval; + +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/consts/MetaRaftConstants.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/consts/MetaRaftConstants.java new file mode 100644 index 0000000000..6be76855e9 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/consts/MetaRaftConstants.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft.consts; + +/** + * MetaRaftConstants. + */ + +public interface MetaRaftConstants { + + String GROUP = "EM_META"; + + String SELF = "self"; + + String MEMBERS = "members"; + + String DATAPATH = "dataPath"; + + String ELECTIONTIMEOUT = "electionTimeout"; + + String SNAPSHOTINTERVAL = "snapshotInterval"; + + String REFRESHLEADERINTERVAL = "refreshLeaderInterval"; + + int PUT = 1; + + int GET = 2; + + int DELETE = 3; + + int RESPONSE = 4; +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/rpc/MetaServerHelper.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/rpc/MetaServerHelper.java new file mode 100644 index 0000000000..abaeb58d8d --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/rpc/MetaServerHelper.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft.rpc; + +import java.lang.reflect.Method; +import java.util.concurrent.TimeUnit; + +import com.alipay.sofa.jraft.rpc.RpcServer; +import com.alipay.sofa.jraft.util.RpcFactoryHelper; +import com.google.protobuf.Message; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MetaServerHelper { + + public static RpcServer rpcServer; + + + public static void setRpcServer(RpcServer rpcServer) { + MetaServerHelper.rpcServer = rpcServer; + } + + + public static void initGRpc() { + if ("com.alipay.sofa.jraft.rpc.impl.GrpcRaftRpcFactory".equals( + RpcFactoryHelper.rpcFactory().getClass().getName())) { + RpcFactoryHelper.rpcFactory() + .registerProtobufSerializer(RequestResponse.class.getName(), + RequestResponse.getDefaultInstance()); + try { + Class clazz = Class.forName("com.alipay.sofa.jraft.rpc.impl.MarshallerHelper"); + Method registerRespInstance = clazz.getMethod("registerRespInstance", String.class, Message.class); + registerRespInstance.invoke(null, RequestResponse.class.getName(), + RequestResponse.getDefaultInstance()); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + } + + public static void shutDown() { + if (rpcServer == null) { + return; + } + rpcServer.shutdown(); + } + + public static void blockUntilShutdown() { + if (rpcServer == null) { + return; + } + if ("com.alipay.sofa.jraft.rpc.impl.GrpcRaftRpcFactory".equals( + RpcFactoryHelper.rpcFactory().getClass().getName())) { + try { + Method getServer = rpcServer.getClass().getMethod("getServer"); + Object grpcServer = getServer.invoke(rpcServer); + + Method shutdown = grpcServer.getClass().getMethod("shutdown"); + Method awaitTerminationLimit = grpcServer.getClass() + .getMethod("awaitTermination", long.class, TimeUnit.class); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + shutdown.invoke(grpcServer); + awaitTerminationLimit.invoke(grpcServer, 30, TimeUnit.SECONDS); + } catch (Exception e) { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + log.error(e.getMessage(), e); + } + } + }); + Method awaitTermination = grpcServer.getClass().getMethod("awaitTermination"); + awaitTermination.invoke(grpcServer); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/rpc/RequestProcessor.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/rpc/RequestProcessor.java new file mode 100644 index 0000000000..2fd77ec5b2 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/rpc/RequestProcessor.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft.rpc; + +import org.apache.eventmesh.meta.raft.EventClosure; +import org.apache.eventmesh.meta.raft.JraftMetaService; + +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.rpc.RpcContext; +import com.alipay.sofa.jraft.rpc.RpcProcessor; + + +public class RequestProcessor implements RpcProcessor { + + private final JraftMetaService metaService; + + public RequestProcessor(JraftMetaService metaService) { + super(); + this.metaService = metaService; + } + + @Override + public void handleRequest(final RpcContext rpcCtx, final RequestResponse request) { + final EventClosure closure = new EventClosure() { + @Override + public void run(Status status) { + rpcCtx.sendResponse(getRequestResponse()); + } + }; + this.metaService.handle(request, closure); + } + + @Override + public String interest() { + return RequestResponse.class.getName(); + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/snapshot/MetaSnapshotFile.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/snapshot/MetaSnapshotFile.java new file mode 100644 index 0000000000..b185829a81 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/snapshot/MetaSnapshotFile.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft.snapshot; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MetaSnapshotFile { + + private String path; + + public MetaSnapshotFile(String path) { + super(); + this.path = path; + } + + public String getPath() { + return this.path; + } + + /** + * Save value to snapshot file. + */ + public boolean save(final String str) { + try { + FileUtils.writeStringToFile(new File(path), str, Charset.forName("UTF-8")); + return true; + } catch (IOException e) { + log.error("Fail to save snapshot", e); + return false; + } + } + + public String load() throws IOException { + final String s = FileUtils.readFileToString(new File(path), Charset.forName("UTF-8")); + if (!StringUtils.isBlank(s)) { + return s; + } + throw new IOException("Fail to load snapshot from " + path + ",content: " + s); + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/proto/request.proto b/eventmesh-meta/eventmesh-meta-raft/src/main/proto/request.proto new file mode 100644 index 0000000000..6865495858 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/proto/request.proto @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +option java_multiple_files = true; + +option java_package = "org.apache.eventmesh.meta.raft.rpc"; + +message RequestResponse { + int64 value = 1; + bool success = 2; + string redirect = 3; + string errorMsg = 4; + map info = 5; +} + + diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.meta.MetaService b/eventmesh-meta/eventmesh-meta-raft/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.meta.MetaService new file mode 100644 index 0000000000..4771d7fc1c --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.meta.MetaService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +raft=org.apache.eventmesh.meta.raft.RaftMetaService \ No newline at end of file diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index 91a422fa4c..4984181ca8 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -85,6 +85,20 @@ eventMesh.metaStorage.plugin.server-addr=127.0.0.1:8848 eventMesh.metaStorage.plugin.username=nacos eventMesh.metaStorage.plugin.password=nacos +# metaStorage plugin: raft +# Path to local snapshot storage for raft data +#eventMesh.metaStorage.raft.dataPath=/tmp/eventmesh-meta-raft +# Raft local ip and port +#eventMesh.metaStorage.raft.self=127.0.0.1:9091 +# Members ip and port +#eventMesh.metaStorage.raft.members=192.168.1.2:9091,192.168.1.3:9091 +# Raft leader election timeout second +#eventMesh.metaStorage.raft.electionTimeout=5 +# Raft snapshot interval second +#eventMesh.metaStorage.raft.snapshotInterval=30 +# Raft refresh leader interval second +#eventMesh.metaStorage.raft.refreshLeaderInterval=3 + # metaStorage plugin: nacos #eventMesh.metaStorage.nacos.endpoint= #eventMesh.metaStorage.nacos.accessKey= diff --git a/eventmesh-runtime/conf/log4j2.xml b/eventmesh-runtime/conf/log4j2.xml index 3ce5ac985e..6341a0e629 100644 --- a/eventmesh-runtime/conf/log4j2.xml +++ b/eventmesh-runtime/conf/log4j2.xml @@ -95,5 +95,14 @@ + + + + + + + + + \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 6a8e27bf98..7e223d9a3c 100644 --- a/settings.gradle +++ b/settings.gradle @@ -98,6 +98,7 @@ include 'eventmesh-meta:eventmesh-meta-nacos' include 'eventmesh-meta:eventmesh-meta-etcd' include 'eventmesh-meta:eventmesh-meta-consul' include 'eventmesh-meta:eventmesh-meta-zookeeper' +include 'eventmesh-meta:eventmesh-meta-raft' include 'eventmesh-protocol-plugin' include 'eventmesh-protocol-plugin:eventmesh-protocol-api' @@ -127,3 +128,6 @@ include 'eventmesh-webhook:eventmesh-webhook-receive' include 'eventmesh-retry' include 'eventmesh-retry:eventmesh-retry-api' include 'eventmesh-retry:eventmesh-retry-rocketmq' + + +