Skip to content

Commit 07f8774

Browse files
authored
[ISSUE apache#4690] Fix Kafka Connector failed to start (apache#4691)
* Fix StringDeserializer & (session.timeout.ms)/3 > heartbeat.interval.ms https://stackoverflow.com/questions/38406298/why-cant-i-increase-session-timeout-ms * customize timeout config * add new versions and remove redundant line-breaks * Increase master version * Fix an operator log typo * Revert "Increase master version" This reverts commit b112290. * Revert 'add new versions for issue template'
1 parent 40770dd commit 07f8774

File tree

7 files changed

+11
-14
lines changed

7 files changed

+11
-14
lines changed

.github/PULL_REQUEST_TEMPLATE.md

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,10 @@ Fixes #<XXXX>.
3030
*Explain the content here.*
3131
*Explain why you want to make the changes and what problem you're trying to solve.*
3232

33-
34-
3533
### Modifications
3634

3735
*Describe the modifications you've done.*
3836

39-
40-
4137
### Documentation
4238

4339
- Does this pull request introduce a new feature? (yes / no)

eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/config/SourceConnectorConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ public class SourceConnectorConfig {
2626
private String topic = "TopicTest";
2727
private String bootstrapServers = "127.0.0.1:9092";
2828
private String groupID = "kafkaSource";
29-
private String keyConverter = "org.apache.kafka.common.serialization.StringSerializer";
30-
private String valueConverter = "org.apache.kafka.common.serialization.StringSerializer";
29+
private String keyConverter = "org.apache.kafka.common.serialization.StringDeserializer";
30+
private String valueConverter = "org.apache.kafka.common.serialization.StringDeserializer";
3131
private String autoCommitIntervalMS = "1000";
3232
private String enableAutoCommit = "false";
33-
private String sessionTimeoutMS = "3000";
33+
private String sessionTimeoutMS = "10000";
3434
private String maxPollRecords = "1000";
3535
private int pollTimeOut = 100;
3636
}

eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ private void doInit() {
7676
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, sourceConfig.getConnectorConfig().getAutoCommitIntervalMS());
7777
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sourceConfig.getConnectorConfig().getSessionTimeoutMS());
7878
this.pollTimeOut = sourceConfig.getConnectorConfig().getPollTimeOut();
79-
this.kafkaConsumer = new KafkaConsumer<String, String>(props);
79+
this.kafkaConsumer = new KafkaConsumer<>(props);
8080
}
8181

8282
@Override

eventmesh-connectors/eventmesh-connector-kafka/src/main/resources/sink-config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pubSubConfig:
2626
passWord: kafkaPassWord
2727
connectorConfig:
2828
connectorName: kafkaSink
29-
bootstrapServers:
29+
bootstrapServers: 127.0.0.1:9092
3030
topic: TopicTest
3131
keyConverter: org.apache.kafka.common.serialization.StringSerializer
3232
valueConverter: org.apache.kafka.common.serialization.StringSerializer

eventmesh-connectors/eventmesh-connector-kafka/src/main/resources/source-config.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ pubSubConfig:
2626
passWord: kafkaPassWord
2727
connectorConfig:
2828
connectorName: kafkaSource
29-
bootstrapServers: 127.0.0.1:9090
29+
bootstrapServers: 127.0.0.1:9092
3030
topic: TopicTest
3131
groupID: kafkaSource
32+
sessionTimeoutMS: 10000
3233
maxPollRecords: 1000

eventmesh-operator/controllers/eventmesh_connectors/connectors_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func (r ConnectorsReconciler) Reconcile(ctx context.Context, req reconcile.Reque
157157
return reconcile.Result{}, err
158158
}
159159
podNames := getConnectorPodNames(podList.Items)
160-
r.Logger.Info(fmt.Sprintf("Stutas.Nodes = %s", connector.Status.Nodes))
160+
r.Logger.Info(fmt.Sprintf("Status.Nodes = %s", connector.Status.Nodes))
161161
r.Logger.Info(fmt.Sprintf("podNames = %s", podNames))
162162
// Ensure every pod is in running phase
163163
for _, pod := range podList.Items {
@@ -168,7 +168,7 @@ func (r ConnectorsReconciler) Reconcile(ctx context.Context, req reconcile.Reque
168168

169169
if podNames != nil {
170170
connector.Status.Nodes = podNames
171-
r.Logger.Info(fmt.Sprintf("eventMeshRuntime.Stutas.Nodes = %s", connector.Status.Nodes))
171+
r.Logger.Info(fmt.Sprintf("Connector.Status.Nodes = %s", connector.Status.Nodes))
172172
// Update status.Size if needed
173173
if connector.Spec.Size != connector.Status.Size {
174174
r.Logger.Info("Connector.Status.Size = " + strconv.Itoa(connector.Status.Size))

eventmesh-operator/controllers/eventmesh_runtime/runtime_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ func (r *RuntimeReconciler) Reconcile(ctx context.Context, req reconcile.Request
197197
}
198198

199199
podNames := getRuntimePodNames(podList.Items)
200-
r.Logger.Info(fmt.Sprintf("Stutas.Nodes = %s", eventMeshRuntime.Status.Nodes))
200+
r.Logger.Info(fmt.Sprintf("Status.Nodes = %s", eventMeshRuntime.Status.Nodes))
201201
r.Logger.Info(fmt.Sprintf("podNames = %s", podNames))
202202
// Ensure every pod is in running phase
203203
for _, pod := range podList.Items {
@@ -208,7 +208,7 @@ func (r *RuntimeReconciler) Reconcile(ctx context.Context, req reconcile.Request
208208

209209
if podNames != nil {
210210
eventMeshRuntime.Status.Nodes = podNames
211-
r.Logger.Info(fmt.Sprintf("eventMeshRuntime.Stutas.Nodes = %s", eventMeshRuntime.Status.Nodes))
211+
r.Logger.Info(fmt.Sprintf("eventMeshRuntime.Status.Nodes = %s", eventMeshRuntime.Status.Nodes))
212212
// Update status.Size if needed
213213
if eventMeshRuntime.Spec.Size != eventMeshRuntime.Status.Size {
214214
r.Logger.Info("eventMeshRuntime.Status.Size = " + strconv.Itoa(eventMeshRuntime.Status.Size))

0 commit comments

Comments
 (0)