Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.excel.InputFileType;
import org.apache.nifi.excel.ProtectionType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
Expand All @@ -36,6 +37,10 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.poi.hssf.record.crypto.Biff8EncryptionKey;
import org.apache.poi.hssf.usermodel.HSSFRow;
import org.apache.poi.hssf.usermodel.HSSFSheet;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.CellCopyContext;
import org.apache.poi.ss.usermodel.CellCopyPolicy;
Expand All @@ -48,9 +53,12 @@
import org.apache.poi.xssf.streaming.SXSSFSheet;
import org.apache.poi.xssf.streaming.SXSSFWorkbook;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -66,10 +74,10 @@
@Tags({"split", "text"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("This processor splits a multi sheet Microsoft Excel spreadsheet into multiple Microsoft Excel spreadsheets where each sheet from the original" +
" file is converted to an individual spreadsheet in its own flow file. Currently this processor is only capable of processing .xlsx" +
" (XSSF 2007 OOXML file format) Excel documents and not older .xls (HSSF '97(-2007) file format) documents." +
" Please note all original cell styles are dropped and formulas are removed leaving only the calculated values." +
" Even a single sheet Microsoft Excel spreadsheet is converted to its own flow file with all the original cell styles dropped and formulas removed."
" file is converted to an individual spreadsheet in its own flow file. This processor is capable of processing both password and non password protected" +
" modern XLSX and older XLS Excel spreadsheets." +
" Please note all original cell styles are copied and formulas are removed leaving only the calculated values." +
" Even a single sheet Microsoft Excel spreadsheet is converted to its own flow file with all the original cell styles copied and formulas removed."
)
@WritesAttributes({
@WritesAttribute(attribute = "fragment.identifier", description = "All split Excel FlowFiles produced from the same parent Excel FlowFile will have the same randomly generated UUID added" +
Expand Down Expand Up @@ -100,6 +108,14 @@ public class SplitExcel extends AbstractProcessor {
.dependsOn(PROTECTION_TYPE, ProtectionType.PASSWORD)
.build();

/**
 * Required property selecting the format of the incoming Excel file.
 * Allowable values come from {@link InputFileType}; the default is {@link InputFileType#XLSX}.
 * The processor reads this value in onTrigger to choose between the XLSX and XLS split paths.
 */
public static final PropertyDescriptor INPUT_FILE_TYPE = new PropertyDescriptor.Builder()
.name("Input File Type")
.description("Specifies type of Excel input file.")
.required(true)
.allowableValues(InputFileType.class)
.defaultValue(InputFileType.XLSX)
.build();

public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship")
Expand All @@ -117,7 +133,8 @@ public class SplitExcel extends AbstractProcessor {

private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
PROTECTION_TYPE,
PASSWORD
PASSWORD,
INPUT_FILE_TYPE
);

private static final Set<Relationship> RELATIONSHIPS = Set.of(
Expand All @@ -126,17 +143,6 @@ public class SplitExcel extends AbstractProcessor {
REL_SPLIT
);

// Policy handed to CellUtil.copyCell when copying cells into a split workbook.
// Formulas, hyperlinks and merged regions are switched off here (see the notes on each line);
// every other setting uses the POI default constant.
private static final CellCopyPolicy CELL_COPY_POLICY = new CellCopyPolicy.Builder()
.cellFormula(false) // NOTE: setting to false allows for copying the evaluated formula value.
.cellStyle(CellCopyPolicy.DEFAULT_COPY_CELL_STYLE_POLICY)
.cellValue(CellCopyPolicy.DEFAULT_COPY_CELL_VALUE_POLICY)
.condenseRows(CellCopyPolicy.DEFAULT_CONDENSE_ROWS_POLICY)
.copyHyperlink(false) // NOTE: the hyperlinks appear at end of sheet, so we need to iterate them separately at the end.
.mergeHyperlink(CellCopyPolicy.DEFAULT_MERGE_HYPERLINK_POLICY)
.mergedRegions(false) // NOTE: set to false because of the explicit merge region handling in the copyRows method.
.rowHeight(CellCopyPolicy.DEFAULT_COPY_ROW_HEIGHT_POLICY)
.build();

@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
Expand All @@ -156,35 +162,17 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro

final ProtectionType protectionType = context.getProperty(PROTECTION_TYPE).asAllowableValue(ProtectionType.class);
final String password = protectionType == ProtectionType.PASSWORD ? context.getProperty(PASSWORD).getValue() : null;
final InputFileType inputFileType = context.getProperty(INPUT_FILE_TYPE).asAllowableValue(InputFileType.class);
final CellCopyPolicy cellCopyPolicy = createCellCopyPolicy(inputFileType);
final List<WorkbookSplit> workbookSplits = new ArrayList<>();

try {
session.read(originalFlowFile, in -> {

final Workbook originalWorkbook = StreamingReader.builder()
.rowCacheSize(100)
.bufferSize(4096)
.password(password)
.setReadHyperlinks(true) // NOTE: Needed for copying rows.
.setReadSharedFormulas(true) // NOTE: If not set to true, then data with shared formulas fail.
.open(in);

int index = 0;
for (final Sheet originalSheet : originalWorkbook) {
final String originalSheetName = originalSheet.getSheetName();

try (final SXSSFWorkbook newWorkbook = new SXSSFWorkbook(null, SXSSFWorkbook.DEFAULT_WINDOW_SIZE, false, true)) {
final SXSSFSheet newSheet = newWorkbook.createSheet(originalSheetName);
final int numberOfCopiedRows = copyRows(originalSheet, newSheet);

final FlowFile newFlowFile = session.create(originalFlowFile);
try (final OutputStream out = session.write(newFlowFile)) {
newWorkbook.write(out);
workbookSplits.add(new WorkbookSplit(index, newFlowFile, originalSheetName, numberOfCopiedRows));
}
}

index++;
if (inputFileType == InputFileType.XLSX) {
handleXSSF(session, originalFlowFile, in, password, workbookSplits, cellCopyPolicy);
} else {
handleHSSF(session, originalFlowFile, in, password, workbookSplits, cellCopyPolicy);
}
});
} catch (ExcelRuntimeException | IllegalStateException | ProcessException e) {
Expand Down Expand Up @@ -229,7 +217,56 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
session.transfer(flowFileSplits, REL_SPLIT);
}

private int copyRows(final Sheet originalSheet, final SXSSFSheet destinationSheet) {
/**
 * Builds the cell copy policy used when copying cells into a split workbook.
 * <p>
 * Formulas are never copied, so only the evaluated values end up in the split. For XLSX input,
 * hyperlink and merged region copying are disabled because the copyRows method handles them
 * itself; for any other input type the POI defaults are used for both.
 *
 * @param inputFileType the configured type of the incoming Excel file
 * @return the policy matching the given input file type
 */
private CellCopyPolicy createCellCopyPolicy(InputFileType inputFileType) {
    final boolean xlsxInput = inputFileType == InputFileType.XLSX;

    // NOTE: for XLSX the hyperlinks appear at end of sheet, so they are iterated separately at the end.
    final boolean copyHyperlinks = xlsxInput ? false : CellCopyPolicy.DEFAULT_COPY_HYPERLINK_POLICY;
    // NOTE: for XLSX this is false because of the explicit merge region handling in the copyRows method.
    final boolean copyMergedRegions = xlsxInput ? false : CellCopyPolicy.DEFAULT_COPY_MERGED_REGIONS_POLICY;

    return new CellCopyPolicy.Builder()
            .cellFormula(false) // NOTE: setting to false allows for copying the evaluated formula value.
            .cellStyle(CellCopyPolicy.DEFAULT_COPY_CELL_STYLE_POLICY)
            .cellValue(CellCopyPolicy.DEFAULT_COPY_CELL_VALUE_POLICY)
            .condenseRows(CellCopyPolicy.DEFAULT_CONDENSE_ROWS_POLICY)
            .mergeHyperlink(CellCopyPolicy.DEFAULT_MERGE_HYPERLINK_POLICY)
            .rowHeight(CellCopyPolicy.DEFAULT_COPY_ROW_HEIGHT_POLICY)
            .copyHyperlink(copyHyperlinks)
            .mergedRegions(copyMergedRegions)
            .build();
}

/**
 * Splits an XLSX workbook into one new workbook per sheet, writing each to its own child FlowFile.
 * <p>
 * The source is opened with the streaming reader and each sheet is copied into a fresh
 * {@link SXSSFWorkbook}. A {@link WorkbookSplit} describing every written sheet is appended to
 * {@code workbookSplits} in sheet order.
 *
 * @param session          the session used to create and write the child FlowFiles
 * @param originalFlowFile the parent FlowFile the children are created from
 * @param inputStream      the content stream of the original FlowFile
 * @param password         the password passed to the streaming reader, or {@code null} when none is configured
 * @param workbookSplits   output list that receives one entry per sheet written
 * @param cellCopyPolicy   the policy applied when copying each cell
 * @throws IOException if writing a split workbook or closing a workbook fails
 */
private void handleXSSF(ProcessSession session, FlowFile originalFlowFile, InputStream inputStream, String password,
        List<WorkbookSplit> workbookSplits, CellCopyPolicy cellCopyPolicy) throws IOException {
    // try-with-resources so the streaming workbook is closed even when copying a sheet fails part way through.
    try (final Workbook originalWorkbook = StreamingReader.builder()
            .rowCacheSize(100)
            .bufferSize(4096)
            .password(password)
            .setReadHyperlinks(true) // NOTE: Needed for copying rows.
            .setReadSharedFormulas(true) // NOTE: If not set to true, then data with shared formulas fail.
            .open(inputStream)) {

        int index = 0;
        for (final Sheet originalSheet : originalWorkbook) {
            final String originalSheetName = originalSheet.getSheetName();

            try (final SXSSFWorkbook newWorkbook = new SXSSFWorkbook(null, SXSSFWorkbook.DEFAULT_WINDOW_SIZE, false, true)) {
                final SXSSFSheet newSheet = newWorkbook.createSheet(originalSheetName);
                final int numberOfCopiedRows = copyRows(originalSheet, newSheet, cellCopyPolicy);

                final FlowFile newFlowFile = session.create(originalFlowFile);
                try (final OutputStream out = session.write(newFlowFile)) {
                    newWorkbook.write(out);
                    workbookSplits.add(new WorkbookSplit(index, newFlowFile, originalSheetName, numberOfCopiedRows));
                }
            }

            index++;
        }
    }
}

private int copyRows(final Sheet originalSheet, final SXSSFSheet destinationSheet, CellCopyPolicy cellCopyPolicy) {
final CellCopyContext cellCopyContext = new CellCopyContext();
int rowCount = 0;

Expand All @@ -239,7 +276,7 @@ private int copyRows(final Sheet originalSheet, final SXSSFSheet destinationShee

for (final Cell sourceCell : sourceRow) {
final Cell destCell = destinationRow.createCell(sourceCell.getColumnIndex());
CellUtil.copyCell(sourceCell, destCell, CELL_COPY_POLICY, cellCopyContext);
CellUtil.copyCell(sourceCell, destCell, cellCopyPolicy, cellCopyContext);
}

rowCount++;
Expand All @@ -256,5 +293,50 @@ private int copyRows(final Sheet originalSheet, final SXSSFSheet destinationShee
return rowCount;
}

private record WorkbookSplit(int index, FlowFile content, String sheetName, int numRows) { }
/**
 * Splits an XLS workbook into one new {@link HSSFWorkbook} per sheet, writing each to its own child FlowFile.
 * <p>
 * A {@link WorkbookSplit} describing every written sheet is appended to {@code workbookSplits}
 * in sheet order; its row count is the physical row count of the original sheet.
 *
 * @param session          the session used to create and write the child FlowFiles
 * @param originalFlowFile the parent FlowFile the children are created from
 * @param inputStream      the content stream of the original FlowFile
 * @param password         the password of the original workbook, or {@code null} when none is configured
 * @param workbookSplits   output list that receives one entry per sheet written
 * @param cellCopyPolicy   the policy applied when copying each row
 * @throws ProcessException if reading the original workbook or writing a split fails with an I/O error
 */
private void handleHSSF(ProcessSession session, FlowFile originalFlowFile, InputStream inputStream, String password, List<WorkbookSplit> workbookSplits, CellCopyPolicy cellCopyPolicy) {
    // Providing the password to the HSSFWorkbook is done by setting a thread variable managed by
    // Biff8EncryptionKey. It is reset in the finally block so it never outlives this call.
    Biff8EncryptionKey.setCurrentUserPassword(password);

    // try-with-resources so the source workbook is closed even when copying a sheet fails.
    try (HSSFWorkbook originalWorkbook = new HSSFWorkbook(inputStream)) {
        int index = 0;
        for (final Sheet sheet : originalWorkbook) {
            final HSSFSheet originalSheet = (HSSFSheet) sheet;
            final String originalSheetName = originalSheet.getSheetName();
            //NOTE: Per the POI Javadocs, iterating a sheet returns the physical rows,
            // hence the original number of rows should reflect this.
            final int originalNumRows = originalSheet.getPhysicalNumberOfRows();

            try (HSSFWorkbook newWorkbook = new HSSFWorkbook()) {
                // One copy context per destination workbook: the context remembers the destination
                // styles it has created, and those must not be reused in a different workbook.
                // copyRows likewise creates a fresh context for each destination sheet.
                final CellCopyContext cellCopyContext = new CellCopyContext();
                final HSSFSheet newSheet = newWorkbook.createSheet(originalSheetName);

                for (final Row row : originalSheet) {
                    final HSSFRow originalRow = (HSSFRow) row;
                    final HSSFRow newRow = newSheet.createRow(originalRow.getRowNum());
                    newRow.copyRowFrom(originalRow, cellCopyPolicy, cellCopyContext);
                }

                final FlowFile newFlowFile = session.create(originalFlowFile);

                try (final OutputStream out = session.write(newFlowFile)) {
                    newWorkbook.write(out);
                    workbookSplits.add(new WorkbookSplit(index, newFlowFile, originalSheetName, originalNumRows));
                }
            }
            index++;
        }
    } catch (final IOException e) {
        throw new ProcessException("Failed to split XLS file", e);
    } finally {
        Biff8EncryptionKey.setCurrentUserPassword(null);
    }
}

/**
 * Describes one sheet that was split out of the original workbook.
 *
 * @param index     zero-based position of the sheet in the original workbook
 * @param content   the FlowFile the split workbook was written to
 * @param sheetName the name of the original sheet, also used for the sheet in the split workbook
 * @param numRows   the row count recorded for the sheet (rows copied for XLSX, physical rows for XLS)
 */
private record WorkbookSplit(int index, FlowFile content, String sheetName, int numRows) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,26 @@
*/
package org.apache.nifi.processors.excel;

import org.apache.nifi.excel.InputFileType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.CellStyle;
import org.apache.poi.ss.usermodel.CellType;
import org.apache.poi.ss.usermodel.CreationHelper;
import org.apache.poi.ss.usermodel.DateUtil;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.xssf.usermodel.XSSFHyperlink;
import org.apache.poi.xssf.usermodel.XSSFSheet;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -198,8 +202,9 @@ void testDataWithSharedFormula() throws IOException {
}
}

@Test
void testCopyDateTime() throws Exception {
@ParameterizedTest
@EnumSource(InputFileType.class)
void testCopyDateTime(InputFileType inputFileType) throws Exception {
final LocalDateTime localDateTime = LocalDateTime.of(2023, 1, 1, 0, 0, 0);
final LocalDateTime nonValidExcelDate = LocalDateTime.of(1899, 12, 31, 0, 0, 0);

Expand All @@ -210,14 +215,15 @@ void testCopyDateTime() throws Exception {
};

final ByteArrayOutputStream workbookOutputStream = new ByteArrayOutputStream();
try (XSSFWorkbook workbook = new XSSFWorkbook()) {
final XSSFSheet sheet = workbook.createSheet("SomeSheetName");
try (Workbook workbook = InputFileType.XLSX == inputFileType ? new XSSFWorkbook() : new HSSFWorkbook()) {
final Sheet sheet = workbook.createSheet("SomeSheetName");
populateSheet(sheet, data);
setCellStyles(sheet, workbook);
workbook.write(workbookOutputStream);
}

final ByteArrayInputStream input = new ByteArrayInputStream(workbookOutputStream.toByteArray());
runner.setProperty(SplitExcel.INPUT_FILE_TYPE, inputFileType.getValue());
runner.enqueue(input);
runner.run();

Expand All @@ -226,7 +232,7 @@ void testCopyDateTime() throws Exception {
runner.assertTransferCount(SplitExcel.REL_FAILURE, 0);

final MockFlowFile flowFile = runner.getFlowFilesForRelationship(SplitExcel.REL_SPLIT).getFirst();
try (XSSFWorkbook workbook = new XSSFWorkbook(flowFile.getContentStream())) {
try (Workbook workbook = InputFileType.XLSX == inputFileType ? new XSSFWorkbook(flowFile.getContentStream()) : new HSSFWorkbook(flowFile.getContentStream())) {
final Sheet firstSheet = workbook.sheetIterator().next();

List<List<Cell>> dateCells = Stream.iterate(firstSheet.getFirstRowNum() + 1, rowIndex -> rowIndex + 1)
Expand Down Expand Up @@ -273,7 +279,7 @@ void testHyperlinks() throws IOException {
}
}

private static void populateSheet(XSSFSheet sheet, Object[][] data) {
private static void populateSheet(Sheet sheet, Object[][] data) {
int rowCount = 0;
for (Object[] dataRow : data) {
Row row = sheet.createRow(rowCount++);
Expand All @@ -293,7 +299,7 @@ private static void populateSheet(XSSFSheet sheet, Object[][] data) {
}
}

void setCellStyles(XSSFSheet sheet, XSSFWorkbook workbook) {
void setCellStyles(Sheet sheet, Workbook workbook) {
CreationHelper creationHelper = workbook.getCreationHelper();
CellStyle dayMonthYearCellStyle = workbook.createCellStyle();
dayMonthYearCellStyle.setDataFormat(creationHelper.createDataFormat().getFormat("dd/mm/yyyy"));
Expand Down
Loading