Skip to content

Commit 8d8930a

Browse files
authored
Merge pull request #692 from Prathamesh9284/fix/csv-header-validation
fix: improve CSV header validation and error messages
2 parents cc43e69 + 455a05a commit 8d8930a

1 file changed

Lines changed: 44 additions & 2 deletions

File tree

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,10 @@ private Record parseLine(final String s) {
126126
final String[] tokens = CsvRowConverter.parseLine(s, this.separator);
127127
if (tokens.length != fieldTypes.size())
128128
throw new IllegalStateException(
129-
String.format("Error while parsing CSV file %s at line %s, using separator %s", sourcePath, s, separator));
129+
String.format(
130+
"CSV file '%s': data row has %d columns but expected %d "
131+
+ "(separator '%s'). Line: '%s'.",
132+
sourcePath, tokens.length, fieldTypes.size(), separator, s));
130133
// now tokens.length == fieldtypes.size
131134

132135
final Object[] objects = new Object[tokens.length];
@@ -168,12 +171,51 @@ private static Stream<String> streamLines(final String path) {
168171
() -> new IllegalStateException(String.format("No file system found for %s", path)));
169172
try {
170173
final Iterator<String> lineIterator = createLineIterator(fileSystem, path);
171-
lineIterator.next(); // skip header row
174+
if (!lineIterator.hasNext()) {
175+
throw new IllegalStateException(String.format("CSV file '%s' is empty. Expected a header row (e.g., 'id:int,name:string').",path));
176+
}
177+
String headerLine = lineIterator.next(); // read and skip header line
178+
validateHeaderLine(path, headerLine);
172179
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(lineIterator, 0), false);
173180
} catch (final IOException e) {
174181
throw new WayangException(String.format("%s failed to read %s.", FileUtils.class, path), e);
175182
}
183+
}
176184

185+
/**
186+
* Validates the CSV header for Calcite compatibility.
187+
* Checks that each column follows the 'name:type' format
188+
* (e.g., 'id:int,name:string,email:string') and that commas
189+
* are used as the header separator.
190+
*
191+
* @param path the filesystem path to the CSV file
192+
* @param headerLine the first line of the CSV file
193+
*/
194+
private static void validateHeaderLine(final String path, final String headerLine) {
195+
final String[] headerColumns = headerLine.split(","); // split header row into columns
196+
197+
int colonCount = 0;
198+
for (int i = 0; i < headerLine.length(); i++) {
199+
if (headerLine.charAt(i) == ':') {
200+
colonCount++;
201+
}
202+
}
203+
204+
for (final String column : headerColumns) {
205+
if (!column.trim().contains(":")) {
206+
throw new IllegalStateException(String.format(
207+
"CSV file '%s': header column '%s' missing required type. "
208+
+ "Expected 'name:type' format (e.g., 'id:int'). Header: '%s'.",
209+
path, column.trim(), headerLine));
210+
}
211+
}
212+
213+
if (headerColumns.length != colonCount) {
214+
throw new IllegalStateException(String.format(
215+
"CSV file '%s': column count mismatch. Expected %d comma-separated 'name:type' columns "
216+
+ "but found %d. Header: '%s'.",
217+
path, colonCount, headerColumns.length, headerLine));
218+
}
177219
}
178220

179221
/**

0 commit comments

Comments
 (0)