Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.types.RowKind;
import org.apache.flink.types.bitmap.Bitmap;
import org.apache.flink.types.variant.Variant;

import java.util.List;
Expand Down Expand Up @@ -257,6 +258,16 @@ public Variant getVariant(int pos) {
}
}

@Override
public Bitmap getBitmap(int pos) {
int index = indexMapping[pos];
if (index >= 0) {
return mutableRow.getBitmap(index);
} else {
return fixedRow.getBitmap(-(index + 1));
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
/**
* A compressed data structure for storing sets of 32-bit integers.
*
* <p>Currently, the only supported implementation is {@link RoaringBitmapData}, which is backed by
* <a href="https://roaringbitmap.org/">Roaring Bitmap</a>. Custom implementations of this interface
* are not supported.
*
* <p>The modifying methods in this interface modify the bitmap in place by default. Consider using
* {@link Bitmap#from(Bitmap other)} to create a copied bitmap before modification if immutability
* is required.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.BitmapType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DateType;
Expand Down Expand Up @@ -162,6 +163,8 @@ private LogicalType deserializeInternal(JsonNode logicalTypeNode) {
return deserializeRaw(logicalTypeNode).copy(isNullable);
case VARIANT:
return new VariantType(isNullable);
case BITMAP:
return new BitmapType(isNullable);
default:
throw new UnsupportedOperationException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ private void serializeInternal(LogicalType logicalType, JsonGenerator jsonGenera
case DOUBLE:
case DATE:
case VARIANT:
case BITMAP:
break;
case CHAR:
jsonGenerator.writeNumberField(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.BitmapType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DateType;
Expand Down Expand Up @@ -195,6 +196,7 @@ private static List<LogicalType> generateTestData() {
new MultisetType(VarCharType.ofEmptyLiteral()),
new MultisetType(BinaryType.ofEmptyLiteral()),
new MultisetType(VarBinaryType.ofEmptyLiteral()),
new BitmapType(),
RowType.of(new BigIntType(), new IntType(false), new VarCharType(200)),
RowType.of(
new LogicalType[] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,5 @@ RelDataType createStructuredType(
String className, List<RelDataType> typeList, List<String> fieldNameList);

/** Creates a BITMAP type. */
default RelDataType createBitmapType() {
// The actual implementation will be provided by FlinkTypeFactory as part of FLINK-39185.
throw new UnsupportedOperationException("BITMAP type is not supported yet.");
}
RelDataType createBitmapType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ public final class FailingSqlTestStep implements TestStep {
FailingSqlTestStep(
String sql, Class<? extends Exception> expectedException, String expectedErrorMessage) {
Preconditions.checkArgument(
// UnsupportedOperationException is a special case in GenerateUtils#generateCompare
expectedException == ValidationException.class
|| expectedException == TableRuntimeException.class,
|| expectedException == TableRuntimeException.class
|| expectedException == UnsupportedOperationException.class,
"Usually a SQL query should fail with either validation or runtime exception. "
+ "Otherwise this might require an update to the exception design.");
this.sql = sql;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.BitmapType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DateType;
Expand Down Expand Up @@ -1063,6 +1064,17 @@ public static DataType VARIANT() {
return new AtomicDataType(new VariantType());
}

/**
* Data type of bitmap data.
*
* <p>The type supports storing 32-bit integers in a compressed form.
*
* @see BitmapType
*/
public static DataType BITMAP() {
return new AtomicDataType(new BitmapType());
}

// --------------------------------------------------------------------------------------------
// Helper functions
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DistinctType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.bitmap.Bitmap;
import org.apache.flink.types.variant.Variant;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -116,6 +117,12 @@ public interface ArrayData {
*/
RowData getRow(int pos, int numFields);

/** Returns the bitmap value at the given position. */
default Bitmap getBitmap(int pos) {
throw new UnsupportedOperationException(
"This ArrayData implementation does not support Bitmap type.");
}

// ------------------------------------------------------------------------------------------
// Conversion Utilities
// ------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -215,6 +222,9 @@ static ElementGetter createElementGetter(LogicalType elementType) {
case VARIANT:
elementGetter = ArrayData::getVariant;
break;
case BITMAP:
elementGetter = ArrayData::getBitmap;
break;
case NULL:
case SYMBOL:
case UNRESOLVED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.types.bitmap.Bitmap;
import org.apache.flink.types.variant.Variant;

import org.apache.commons.lang3.ArrayUtils;
Expand Down Expand Up @@ -259,6 +260,11 @@ public MapData getMap(int pos) {
return (MapData) getObject(pos);
}

@Override
public Bitmap getBitmap(int pos) {
return (Bitmap) getObject(pos);
}

private Object getObject(int pos) {
return ((Object[]) array)[pos];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.StructuredType;
import org.apache.flink.types.RowKind;
import org.apache.flink.types.bitmap.Bitmap;
import org.apache.flink.types.variant.Variant;
import org.apache.flink.util.StringUtils;

Expand Down Expand Up @@ -211,6 +212,11 @@ public Variant getVariant(int pos) {
return (Variant) this.fields[pos];
}

@Override
public Bitmap getBitmap(int pos) {
return (Bitmap) this.fields[pos];
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.StructuredType;
import org.apache.flink.types.RowKind;
import org.apache.flink.types.bitmap.Bitmap;
import org.apache.flink.types.variant.Variant;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -108,6 +109,8 @@
* +--------------------------------+-----------------------------------------+
* | RAW | {@link RawValueData} |
* +--------------------------------+-----------------------------------------+
* | BITMAP | {@link Bitmap} |
* +--------------------------------+-----------------------------------------+
* </pre>
*
* <p>Nullability is always handled by the container data structure.
Expand Down Expand Up @@ -205,6 +208,12 @@ public interface RowData {
/** Returns the variant value at the given position. */
Variant getVariant(int pos);

/** Returns the bitmap value at the given position. */
default Bitmap getBitmap(int pos) {
throw new UnsupportedOperationException(
"This RowData implementation does not support Bitmap type.");
}

// ------------------------------------------------------------------------------------------
// Access Utilities
// ------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -287,6 +296,9 @@ static FieldGetter createFieldGetter(LogicalType fieldType, int fieldPos) {
case VARIANT:
fieldGetter = row -> row.getVariant(fieldPos);
break;
case BITMAP:
fieldGetter = row -> row.getBitmap(fieldPos);
break;
case NULL:
case SYMBOL:
case UNRESOLVED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.table.types.logical.DistinctType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
import org.apache.flink.types.bitmap.Bitmap;
import org.apache.flink.types.variant.Variant;

import java.lang.reflect.Array;
Expand Down Expand Up @@ -93,6 +94,7 @@ public static int calculateFixLengthPartSize(LogicalType type) {
case STRUCTURED_TYPE:
case RAW:
case VARIANT:
case BITMAP:
// long and double are 8 bytes;
// otherwise it stores the length and offset of the variable-length part for types
// such as is string, map, etc.
Expand Down Expand Up @@ -287,6 +289,14 @@ public RowData getRow(int pos, int numFields) {
return BinarySegmentUtils.readRowData(segments, numFields, offset, offsetAndSize);
}

@Override
public Bitmap getBitmap(int pos) {
assertIndexIsValid(pos);
int fieldOffset = getElementOffset(pos, 8);
final long offsetAndSize = BinarySegmentUtils.getLong(segments, fieldOffset);
return BinarySegmentUtils.readBitmap(segments, offset, offsetAndSize);
}

@Override
public boolean getBoolean(int pos) {
assertIndexIsValid(pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.types.RowKind;
import org.apache.flink.types.bitmap.Bitmap;
import org.apache.flink.types.variant.BinaryVariant;

import java.nio.ByteOrder;
Expand Down Expand Up @@ -395,6 +396,12 @@ public BinaryVariant getVariant(int pos) {
return BinarySegmentUtils.readVariant(segments, offset, getLong(pos));
}

@Override
public Bitmap getBitmap(int pos) {
assertIndexIsValid(pos);
return BinarySegmentUtils.readBitmap(segments, offset, getLong(pos));
}

/** The bit is 1 when the field is null. Default is 0. */
@Override
public boolean anyNull() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.types.bitmap.Bitmap;
import org.apache.flink.types.variant.BinaryVariant;

import java.io.IOException;
Expand Down Expand Up @@ -1190,4 +1191,12 @@ public static BinaryVariant readVariant(

return new BinaryVariant(value, meta);
}

/** Gets an instance of {@link Bitmap} from underlying {@link MemorySegment}. */
public static Bitmap readBitmap(MemorySegment[] segments, int baseOffset, long offsetAndSize) {
final int size = ((int) offsetAndSize);
int offset = (int) (offsetAndSize >> 32);
byte[] bytes = copyToBytes(segments, offset + baseOffset, size);
return Bitmap.fromBytes(bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.types.RowKind;
import org.apache.flink.types.bitmap.Bitmap;
import org.apache.flink.types.variant.Variant;

import static org.apache.flink.table.data.binary.BinaryRowData.calculateBitSetWidthInBytes;
Expand Down Expand Up @@ -305,6 +306,12 @@ public Variant getVariant(int pos) {
return BinarySegmentUtils.readVariant(segments, offset, getLong(pos));
}

@Override
public Bitmap getBitmap(int pos) {
assertIndexIsValid(pos);
return BinarySegmentUtils.readBitmap(segments, offset, getLong(pos));
}

@Override
public ArrayData getArray(int pos) {
assertIndexIsValid(pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.flink.table.data.columnar.vector.RowColumnVector;
import org.apache.flink.table.data.columnar.vector.ShortColumnVector;
import org.apache.flink.table.data.columnar.vector.TimestampColumnVector;
import org.apache.flink.types.bitmap.Bitmap;
import org.apache.flink.types.variant.Variant;

import java.util.Arrays;
Expand Down Expand Up @@ -161,6 +162,11 @@ public RowData getRow(int pos, int numFields) {
return ((RowColumnVector) data).getRow(offset + pos);
}

@Override
public Bitmap getBitmap(int pos) {
throw new UnsupportedOperationException("Bitmap is not supported yet.");
}

@Override
public void setBoolean(int pos, boolean value) {
throw new UnsupportedOperationException("Not support the operation!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.table.data.columnar.vector.BytesColumnVector.Bytes;
import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
import org.apache.flink.types.RowKind;
import org.apache.flink.types.bitmap.Bitmap;
import org.apache.flink.types.variant.Variant;

/**
Expand Down Expand Up @@ -161,6 +162,11 @@ public Variant getVariant(int pos) {
return vectorizedColumnBatch.getVariant(rowId, pos);
}

@Override
public Bitmap getBitmap(int pos) {
throw new UnsupportedOperationException("Bitmap is not supported yet.");
}

@Override
public ArrayData getArray(int pos) {
return vectorizedColumnBatch.getArray(rowId, pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.types.RowKind;
import org.apache.flink.types.bitmap.Bitmap;
import org.apache.flink.types.variant.Variant;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -259,6 +260,15 @@ public Variant getVariant(int pos) {
}
}

@Override
public Bitmap getBitmap(int pos) {
if (pos < row1.getArity()) {
return row1.getBitmap(pos);
} else {
return row2.getBitmap(pos - row1.getArity());
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Loading