Open
Description
Search before asking
- I had searched in the issues and found no similar issues.
Question
Sending cloudevents format messages with grpc is slow (I'm not sure if the producers are all slow), and sending 101 pieces of data takes over 60 seconds.
code like this
String gPort = ConfigUtil.getProperty("em.grpc.port");
String host = ConfigUtil.getProperty("em.host");
String pwd = ConfigUtil.getProperty("em.pwd");
String user = ConfigUtil.getProperty("em.user");
EventMeshGrpcClientConfig config = EventMeshGrpcClientConfig.builder()
.serverAddr("172.16.15.136")
.serverPort(51112)
.consumerGroup(PRODUCER_GROUP)
.password(pwd)
.userName(user)
.env(ENV)
.idc(IDC)
.sys(SYSID)
.build();
EventMeshGrpcProducer producer = new EventMeshGrpcProducer(config);
....
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSubject("tc_event_device") //topic
.withSource(URI.create("/"))
.withDataContentType(CLOUDEVENT_CONTENT_TYPE)
.withType(CLOUD_EVENTS_PROTOCOL_NAME)
.withData(eventJson.toJSONString().getBytes(StandardCharsets.UTF_8))
.withExtension("ttl", String.valueOf(4 * 1000))
.build();
list.add(event);
// send data
if(list.size() > 100){
sendMsg2EventMesh(producer);
}
......
private void sendMsg2EventMesh(EventMeshGrpcProducer producer){
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
logger.info("发送事件个数:" + list.size());
long start = System.currentTimeMillis();
System.out.println("处理完数据了:" + start + " 用时:" + (start - begin));
CloudEvent cloudEvent = list.get(list.size() - 1);
String cs = JSON.parseObject(new String(cloudEvent.getData().toBytes())).getString("customCode");
producer.publish(list);
System.out.println("发送成功!" + cs + " 用时:" + (System.currentTimeMillis() - start) + " 发送条数:" + list.size());
list.clear();
begin = System.currentTimeMillis();
}
The server is configured as follows:
eventmesh.properties
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
###############################EVNETMESH-runtime ENV#################################
eventMesh.server.idc=idc
eventMesh.server.env=env
eventMesh.server.provide.protocols=HTTP,TCP,GRPC
eventMesh.server.cluster=COMMON
eventMesh.server.name=EVENTMESH-runtime
eventMesh.sysid=1234
eventMesh.server.http.port=51113
eventMesh.server.grpc.port=51112
########################## eventMesh tcp configuration ############################
eventMesh.server.tcp.enabled=true
eventMesh.server.tcp.port=51111
eventMesh.server.tcp.readerIdleSeconds=120
eventMesh.server.tcp.writerIdleSeconds=120
eventMesh.server.tcp.allIdleSeconds=120
eventMesh.server.tcp.clientMaxNum=10000
# client isolation time if the message send failure
eventMesh.server.tcp.pushFailIsolateTimeInMills=30000
# rebalance internal
eventMesh.server.tcp.RebalanceIntervalInMills=30000
# session expire time about client
eventMesh.server.session.expiredInMills=60000
# flow control, include the global level and session level
eventMesh.server.tcp.msgReqnumPerSecond=15000
eventMesh.server.http.msgReqnumPerSecond=15000
eventMesh.server.session.upstreamBufferSize=20
# for single event publish, maximum size allowed per event
eventMesh.server.maxEventSize=1048576
# for batch event publish, maximum number of events allowed in one batch
eventMesh.server.maxEventBatchSize=1000
# thread number about global scheduler
eventMesh.server.global.scheduler=5
eventMesh.server.tcp.taskHandleExecutorPoolSize=8
#retry
eventMesh.server.retry.async.pushRetryTimes=3
eventMesh.server.retry.sync.pushRetryTimes=3
eventMesh.server.retry.async.pushRetryDelayInMills=500
eventMesh.server.retry.sync.pushRetryDelayInMills=500
eventMesh.server.retry.pushRetryQueueSize=10000
#admin
eventMesh.server.admin.http.port=10106
#registry
eventMesh.server.registry.registerIntervalInMills=10000
eventMesh.server.registry.fetchRegistryAddrIntervalInMills=20000
#auto-ack
#eventMesh.server.defibus.client.comsumeTimeoutInMin=5
#sleep interval between closing client of different group in server graceful shutdown
eventMesh.server.gracefulShutdown.sleepIntervalInMills=1000
eventMesh.server.rebalanceRedirect.sleepIntervalInMills=200
#ip address blacklist
eventMesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32
eventMesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8
#connector plugin
eventMesh.connector.plugin.type=rocketmq
#storage plugin
eventMesh.storage.plugin.type=rocketmq
#security plugin
#eventMesh.server.security.enabled=false
#eventMesh.security.plugin.type=security
#eventMesh.security.validation.type.token=false
#eventMesh.security.publickey=
#registry plugin
eventMesh.registry.plugin.enabled=false
eventMesh.registry.plugin.type=nacos
eventMesh.registry.plugin.server-addr=127.0.0.1:8848
eventMesh.registry.plugin.username=nacos
eventMesh.registry.plugin.password=nacos
# The TLS configuration of registry plugin consul
# keyStoreInstanceType's value can refer to com.ecwid.consul.transport.TLSConfig.KeyStoreInstanceType
#eventMesh.registry.consul.tls.keyStoreInstanceType=
#eventMesh.registry.consul.tls.certificatePath=
#eventMesh.registry.consul.tls.certificatePassword=
#eventMesh.registry.consul.tls.keyStorePath=
#eventMesh.registry.consul.tls.keyStorePassword=
# metrics plugin, if you have multiple plugin, you can use ',' to split
#eventMesh.metrics.plugin=prometheus
# trace plugin
eventMesh.server.trace.enabled=false
eventMesh.trace.plugin=zipkin
# webhook
# Start webhook admin service
eventMesh.webHook.admin.start=true
# Webhook event configuration storage mode. Currently, only file and Nacos are supported
eventMesh.webHook.operationMode=file
# The file storage path of the file storage mode. If #{eventmeshhome} is written, it is in the eventmesh root directory
eventMesh.webHook.fileMode.filePath= #{eventMeshHome}/webhook
# Nacos storage mode, and the configuration naming rule is eventmesh webHook. nacosMode. {nacos native configuration key} please see the specific configuration [nacos github api](https://github.com/alibaba/nacos/blob/develop/api/src/main/java/com/alibaba/nacos/api/SystemPropertyKeyConst.java)
## Address of Nacos
eventMesh.webHook.nacosMode.serverAddr=0.0.0.0:8848
# Webhook eventcloud sending mode. And eventmesh connector. plugin. The type configuration is the same
eventMesh.webHook.producer.storage=standalone
eventMesh.server.flushDiskType=ASYNC_FLUSH
server.env
APP_START_JVM_OPTION:::-server -Xms1g -Xmx16g -Xmn4g -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:ConcGCThreads=8 -XX:ParallelGCThreads=8 -XX:MaxDirectMemorySize=16G -Dio.netty.eventLoopThreads=32 -Dio.netty.maxDirectMemory=16G -XX:SurvivorRatio=4 -Duser.language=zh
Where I do wrong?
I don't think producers should so slowly!