Skip to content

Commit e6d877b

Browse files
[protogen] Fix infinite recursion in RecordsFieldFinder (#1282)
1 parent 03313a9 commit e6d877b

File tree

5 files changed

+221
-4
lines changed

5 files changed

+221
-4
lines changed

fluss-protogen/fluss-protogen-generator/src/main/java/org/apache/fluss/protogen/generator/generator/ProtobufMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ private void generateIsLazilyParsed(PrintWriter w) {
172172
w.format(" public boolean isLazilyParsed() {\n");
173173
w.format(
174174
" return %s;\n",
175-
RecordsFieldFinder.hasRecordsField(message) ? "true" : "false");
175+
new RecordsFieldFinder().hasRecordsField(message) ? "true" : "false");
176176
w.format(" }\n");
177177
}
178178

fluss-protogen/fluss-protogen-generator/src/main/java/org/apache/fluss/protogen/generator/generator/RecordsFieldFinder.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,26 @@
2121
import io.protostuff.parser.Message;
2222
import io.protostuff.parser.MessageField;
2323

24+
import java.util.HashSet;
25+
import java.util.Set;
26+
2427
/** Finder to find <code>byte[] records</code> protobuf fields. */
2528
public class RecordsFieldFinder {
2629

2730
public static final String RECORDS_FIELD_NAME = "records";
2831

29-
public static boolean hasRecordsField(Message message) {
30-
return message.getFields().stream().anyMatch(RecordsFieldFinder::hasRecordsField);
32+
private Set<Message> visited = new HashSet<>();
33+
34+
public boolean hasRecordsField(Message message) {
35+
visited.add(message);
36+
return message.getFields().stream().anyMatch(this::hasRecordsField);
3137
}
3238

33-
private static boolean hasRecordsField(Field<?> field) {
39+
private boolean hasRecordsField(Field<?> field) {
3440
if (field instanceof MessageField) {
41+
if (visited.contains(((MessageField) field).getMessage())) {
42+
return false;
43+
}
3544
return hasRecordsField(((MessageField) field).getMessage());
3645
} else if (field instanceof Field.Bytes) {
3746
return !field.isRepeated() && field.getName().equals(RECORDS_FIELD_NAME);

fluss-protogen/fluss-protogen-tests/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@
5959
<artifactId>fluss-test-utils</artifactId>
6060
</dependency>
6161

62+
<dependency>
63+
<groupId>org.apache.fluss</groupId>
64+
<artifactId>fluss-protogen-generator</artifactId>
65+
<version>${project.version}</version>
66+
<scope>test</scope>
67+
</dependency>
68+
6269
<!-- Tests depends on ByteBufChannel -->
6370
<dependency>
6471
<groupId>org.apache.fluss</groupId>
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
syntax = "proto2";
18+
package org.apache.fluss.protogen.tests;
19+
20+
// Message with circular reference - A references B, B references A
21+
message CircularA {
22+
optional string name = 1;
23+
optional CircularB b_ref = 2;
24+
}
25+
26+
message CircularB {
27+
optional int32 id = 1;
28+
optional CircularA a_ref = 2;
29+
}
30+
31+
// Message with records field but no circular reference
32+
message SimpleRecordsMessage {
33+
optional bytes records = 1;
34+
optional string metadata = 2;
35+
}
36+
37+
// Message with circular reference that contains records field
38+
message CircularRecordsA {
39+
optional bytes records = 1;
40+
optional CircularRecordsB nested = 2;
41+
}
42+
43+
message CircularRecordsB {
44+
optional string info = 1;
45+
optional CircularRecordsA parent = 2;
46+
}
47+
48+
// Complex circular reference with multiple levels
49+
message ComplexA {
50+
optional string value = 1;
51+
optional ComplexB b = 2;
52+
optional ComplexC c = 3;
53+
}
54+
55+
message ComplexB {
56+
optional int32 number = 1;
57+
optional ComplexA a = 2;
58+
optional bytes records = 3;
59+
}
60+
61+
message ComplexC {
62+
optional bool flag = 1;
63+
optional ComplexA a = 2;
64+
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.protogen.tests;
19+
20+
import org.apache.fluss.protogen.generator.generator.RecordsFieldFinder;
21+
22+
import io.protostuff.parser.Message;
23+
import io.protostuff.parser.Proto;
24+
import io.protostuff.parser.ProtoUtil;
25+
import org.junit.jupiter.api.Test;
26+
27+
import java.io.File;
28+
import java.io.IOException;
29+
30+
import static org.assertj.core.api.Assertions.assertThat;
31+
32+
/** Tests for {@link RecordsFieldFinder}. */
33+
public class RecordsFieldFinderTest {
34+
35+
@Test
36+
public void testHasRecordsFieldForSimpleMessage() throws Exception {
37+
Proto proto = parseProtoFile("bytes.proto");
38+
Message rdMessage = findMessage(proto, "RD");
39+
40+
RecordsFieldFinder finder = new RecordsFieldFinder();
41+
assertThat(finder.hasRecordsField(rdMessage))
42+
.as("RD message should have records field")
43+
.isTrue();
44+
}
45+
46+
@Test
47+
public void testHasRecordsFieldForNestedMessage() throws Exception {
48+
Proto proto = parseProtoFile("bytes.proto");
49+
Message ordMessage = findMessage(proto, "ORD");
50+
51+
RecordsFieldFinder finder = new RecordsFieldFinder();
52+
assertThat(finder.hasRecordsField(ordMessage))
53+
.as("ORD message should have records field through nested RD")
54+
.isTrue();
55+
}
56+
57+
@Test
58+
public void testHasRecordsFieldForMessageWithoutRecords() throws Exception {
59+
Proto proto = parseProtoFile("messages.proto");
60+
Message xMessage = findMessage(proto, "X");
61+
62+
RecordsFieldFinder finder = new RecordsFieldFinder();
63+
assertThat(finder.hasRecordsField(xMessage))
64+
.as("X message should not have records field")
65+
.isFalse();
66+
}
67+
68+
@Test
69+
public void testCircularReferenceWithoutRecords() throws Exception {
70+
Proto proto = parseProtoFile("circular_reference.proto");
71+
Message circularAMessage = findMessage(proto, "CircularA");
72+
73+
RecordsFieldFinder finder = new RecordsFieldFinder();
74+
assertThat(finder.hasRecordsField(circularAMessage))
75+
.as("CircularA message should not have records field")
76+
.isFalse();
77+
}
78+
79+
@Test
80+
public void testCircularReferenceWithRecords() throws Exception {
81+
Proto proto = parseProtoFile("circular_reference.proto");
82+
Message circularRecordsAMessage = findMessage(proto, "CircularRecordsA");
83+
84+
RecordsFieldFinder finder = new RecordsFieldFinder();
85+
assertThat(finder.hasRecordsField(circularRecordsAMessage))
86+
.as("CircularRecordsA message should have records field")
87+
.isTrue();
88+
}
89+
90+
@Test
91+
public void testSimpleRecordsMessage() throws Exception {
92+
Proto proto = parseProtoFile("circular_reference.proto");
93+
Message simpleRecordsMessage = findMessage(proto, "SimpleRecordsMessage");
94+
95+
RecordsFieldFinder finder = new RecordsFieldFinder();
96+
assertThat(finder.hasRecordsField(simpleRecordsMessage))
97+
.as("SimpleRecordsMessage should have records field")
98+
.isTrue();
99+
}
100+
101+
@Test
102+
public void testNewFinderInstanceForEachCall() throws Exception {
103+
Proto proto = parseProtoFile("circular_reference.proto");
104+
Message circularRecordsAMessage = findMessage(proto, "CircularRecordsA");
105+
106+
// Test with first finder instance
107+
RecordsFieldFinder finder1 = new RecordsFieldFinder();
108+
assertThat(finder1.hasRecordsField(circularRecordsAMessage))
109+
.as("CircularRecordsA should have records field (finder1)")
110+
.isTrue();
111+
112+
// Test with second finder instance (should work independently)
113+
RecordsFieldFinder finder2 = new RecordsFieldFinder();
114+
assertThat(finder2.hasRecordsField(circularRecordsAMessage))
115+
.as("CircularRecordsA should have records field (finder2)")
116+
.isTrue();
117+
}
118+
119+
private Proto parseProtoFile(String filename) throws Exception {
120+
File protoFile = new File("src/main/proto/" + filename);
121+
if (!protoFile.exists()) {
122+
throw new IOException("Cannot find proto file: " + filename);
123+
}
124+
125+
Proto proto = new Proto();
126+
ProtoUtil.loadFrom(protoFile, proto);
127+
return proto;
128+
}
129+
130+
private Message findMessage(Proto proto, String messageName) {
131+
return proto.getMessages().stream()
132+
.filter(message -> message.getName().equals(messageName))
133+
.findFirst()
134+
.orElseThrow(
135+
() -> new IllegalArgumentException("Message not found: " + messageName));
136+
}
137+
}

0 commit comments

Comments
 (0)