Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ Make sure to allocate enough memory to allow this.
Use a transform like Get File Names to obtain file names.
Any supported file location is fine.

|Metadata filename
|If you specify a filename here, you can leave the fields section empty and Hop will automatically determine
the output fields. It prevents you from having to define all the fields when this metadata is already in
a parquet file schema.

|Fields
|In this table you can specify all the fields you want to obtain from the parquet files as well as their desired Hop output type.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.commons.vfs2.FileObject;
import org.apache.hop.core.RowMetaAndData;
import org.apache.hop.core.exception.HopException;
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.core.row.IValueMeta;
import org.apache.hop.core.row.RowDataUtil;
import org.apache.hop.core.vfs.HopVfs;
import org.apache.hop.pipeline.Pipeline;
Expand All @@ -43,7 +47,6 @@ public ParquetInput(

@Override
public boolean processRow() throws HopException {

Object[] row = getRow();
if (row == null) {
// No more files, we're done.
Expand All @@ -54,14 +57,15 @@ public boolean processRow() throws HopException {

if (first) {
first = false;
data.outputRowMeta = getInputRowMeta().clone();
meta.getFields(data.outputRowMeta, getTransformName(), null, null, this, metadataProvider);

data.filenameFieldIndex = getInputRowMeta().indexOfValue(resolve(meta.getFilenameField()));
if (data.filenameFieldIndex < 0) {
throw new HopException(
"Unable to find filename field " + meta.getFilenameField() + " in the input");
}

data.outputRowMeta = getInputRowMeta().clone();
meta.getFields(data.outputRowMeta, getTransformName(), null, null, this, metadataProvider);
}

// Skip null values for file names
Expand All @@ -74,6 +78,26 @@ public boolean processRow() throws HopException {
FileObject fileObject = HopVfs.getFileObject(filename, variables);

try {
List<ParquetField> fields = new ArrayList<>(meta.getFields());

// If we don't have any fields specified, we read them all.
//
if (fields.isEmpty()) {
//
IRowMeta parquetRowMeta = ParquetInputMeta.extractRowMeta(this, filename);
for (int i = 0; i < parquetRowMeta.size(); i++) {
IValueMeta parquetValueMeta = parquetRowMeta.getValueMeta(i);
fields.add(
new ParquetField(
parquetValueMeta.getName(),
parquetValueMeta.getName(),
parquetValueMeta.getTypeDesc(),
parquetValueMeta.getFormatMask(),
Integer.toString(parquetValueMeta.getLength()),
Integer.toString(parquetValueMeta.getPrecision())));
}
}

long size = fileObject.getContent().getSize();
data.inputStream = HopVfs.getInputStream(fileObject);

Expand All @@ -83,7 +107,7 @@ public boolean processRow() throws HopException {
IOUtils.copy(data.inputStream, outputStream);
ParquetStream inputFile = new ParquetStream(outputStream.toByteArray(), filename);

ParquetReadSupport readSupport = new ParquetReadSupport(meta.getFields());
ParquetReadSupport readSupport = new ParquetReadSupport(fields);
data.reader = new ParquetReaderBuilder<>(readSupport, inputFile).build();

RowMetaAndData r = data.reader.read();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,19 @@

package org.apache.hop.parquet.transforms.input;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.commons.vfs2.FileObject;
import org.apache.hop.core.Const;
import org.apache.hop.core.RowMetaAndData;
import org.apache.hop.core.logging.LogChannel;
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.core.row.IValueMeta;
import org.apache.hop.core.row.RowMeta;
import org.apache.hop.core.row.value.ValueMetaFactory;
import org.apache.hop.core.variables.IVariables;
import org.apache.hop.core.vfs.HopVfs;
import org.apache.hop.i18n.BaseMessages;
import org.apache.hop.pipeline.PipelineMeta;
import org.apache.hop.ui.core.PropsUi;
import org.apache.hop.ui.core.dialog.BaseDialog;
import org.apache.hop.ui.core.gui.WindowProperty;
import org.apache.hop.ui.core.widget.ColumnInfo;
import org.apache.hop.ui.core.widget.TableView;
import org.apache.hop.ui.core.widget.TextVar;
import org.apache.hop.ui.pipeline.transform.BaseTransformDialog;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.eclipse.swt.SWT;
import org.eclipse.swt.layout.FormAttachment;
import org.eclipse.swt.layout.FormData;
Expand All @@ -63,6 +49,7 @@ public class ParquetInputDialog extends BaseTransformDialog {
protected ParquetInputMeta input;

private Combo wFilenameField;
private TextVar wMetaFilename;
private TableView wFields;

private String returnValue;
Expand Down Expand Up @@ -143,6 +130,23 @@ public String open() {
wFilenameField.setLayoutData(fdFilenameField);
lastControl = wFilenameField;

Label wlMetaFilename = new Label(shell, SWT.RIGHT);
wlMetaFilename.setText(BaseMessages.getString(PKG, "ParquetInputDialog.MetaFilename.Label"));
PropsUi.setLook(wlMetaFilename);
FormData fdlMetaFilename = new FormData();
fdlMetaFilename.left = new FormAttachment(0, 0);
fdlMetaFilename.right = new FormAttachment(middle, -margin);
fdlMetaFilename.top = new FormAttachment(lastControl, margin);
wlMetaFilename.setLayoutData(fdlMetaFilename);
wMetaFilename = new TextVar(variables, shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER);
PropsUi.setLook(wMetaFilename);
FormData fdMetaFilename = new FormData();
fdMetaFilename.left = new FormAttachment(middle, 0);
fdMetaFilename.top = new FormAttachment(wlMetaFilename, 0, SWT.CENTER);
fdMetaFilename.right = new FormAttachment(100, 0);
wMetaFilename.setLayoutData(fdMetaFilename);
lastControl = wMetaFilename;

Label wlFields = new Label(shell, SWT.LEFT);
wlFields.setText(BaseMessages.getString(PKG, "ParquetInputDialog.Fields.Label"));
PropsUi.setLook(wlFields);
Expand Down Expand Up @@ -211,63 +215,7 @@ private void getFields() {
new String[] {"Parquet files", "All files"},
true);
if (filename != null) {
FileObject fileObject = HopVfs.getFileObject(variables.resolve(filename), variables);

long size = fileObject.getContent().getSize();
InputStream inputStream = HopVfs.getInputStream(fileObject);

// Reads the whole file into memory...
//
ByteArrayOutputStream outputStream = new ByteArrayOutputStream((int) size);
IOUtils.copy(inputStream, outputStream);
ParquetStream inputFile = new ParquetStream(outputStream.toByteArray(), filename);
// Empty list of fields to retrieve: we still grab the schema
//
ParquetReadSupport readSupport = new ParquetReadSupport(new ArrayList<>());
ParquetReader<RowMetaAndData> reader =
new ParquetReaderBuilder<>(readSupport, inputFile).build();

// Read one empty row...
//
reader.read();

// Now we have the schema...
//
MessageType schema = readSupport.getMessageType();
IRowMeta rowMeta = new RowMeta();
List<ColumnDescriptor> columns = schema.getColumns();
for (ColumnDescriptor column : columns) {
String sourceField = "";
String[] path = column.getPath();
if (path.length == 1) {
sourceField = path[0];
} else {
for (int i = 0; i < path.length; i++) {
if (i > 0) {
sourceField += ".";
}
sourceField += path[i];
}
}
PrimitiveType primitiveType = column.getPrimitiveType();
int hopType = IValueMeta.TYPE_STRING;
switch (primitiveType.getPrimitiveTypeName()) {
case INT32, INT64:
hopType = IValueMeta.TYPE_INTEGER;
break;
case INT96:
hopType = IValueMeta.TYPE_BINARY;
break;
case FLOAT, DOUBLE:
hopType = IValueMeta.TYPE_NUMBER;
break;
case BOOLEAN:
hopType = IValueMeta.TYPE_BOOLEAN;
break;
}
IValueMeta valueMeta = ValueMetaFactory.createValueMeta(sourceField, hopType, -1, -1);
rowMeta.addValueMeta(valueMeta);
}
IRowMeta rowMeta = ParquetInputMeta.extractRowMeta(variables, filename);

BaseTransformDialog.getFieldsFromPrevious(
rowMeta, wFields, 1, new int[] {1, 2}, new int[] {3}, -1, -1, null);
Expand All @@ -287,6 +235,7 @@ private void getData() {

wTransformName.setText(Const.NVL(transformName, ""));
wFilenameField.setText(Const.NVL(input.getFilenameField(), ""));
wMetaFilename.setText(Const.NVL(input.getMetadataFilename(), ""));
for (int i = 0; i < input.getFields().size(); i++) {
ParquetField field = input.getFields().get(i);
TableItem item = wFields.table.getItem(i);
Expand All @@ -310,6 +259,7 @@ private void ok() {

private void getInfo(ParquetInputMeta meta) {
meta.setFilenameField(wFilenameField.getText());
meta.setMetadataFilename(wMetaFilename.getText());
meta.getFields().clear();
for (TableItem item : wFields.getNonEmptyItems()) {
int index = 1;
Expand All @@ -329,10 +279,4 @@ private void cancel() {
returnValue = null;
dispose();
}

@Override
public void dispose() {
props.setScreen(new WindowProperty(shell));
shell.dispose();
}
}
Loading