Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,16 @@ jobs:
name: "Java 11"
uses: ./.github/workflows/ci-template.yaml
with:
java-version: "11"
java-version: "11"

build-on-jdk17:
name: "Java 17"
uses: ./.github/workflows/ci-template.yaml
with:
java-version: "17"

build-on-jdk21:
name: "Java 21"
uses: ./.github/workflows/ci-template.yaml
with:
java-version: "21"
Comment on lines +57 to +69
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or test only on one jdk, and only build for others?
Tests for every jdk may be move to nightly?

2 changes: 2 additions & 0 deletions .mvn/jvm.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-XX:+IgnoreUnrecognizedVMOptions
--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class TableScan implements Scan {

/** The projected fields to do projection. No projection if is null. */
@Nullable private final int[] projectedColumns;

/** The limited row number to read. No limit if is null. */
@Nullable private final Integer limit;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
class BucketScanStatus {
private long offset; // last consumed position
private long highWatermark; // the high watermark from last fetch

// TODO add resetStrategy and nextAllowedRetryTimeMs.

public BucketScanStatus() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,7 @@ private void insertInSequenceOrder(
public static final class RecordAppendResult {
public final boolean batchIsFull;
public final boolean newBatchCreated;

/** Whether this record was abort because the new batch created in record accumulator. */
public final boolean abortRecordForNewBatch;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class WriterClient {
private static final Logger LOG = LoggerFactory.getLogger(WriterClient.class);

public static final String SENDER_THREAD_PREFIX = "fluss-write-sender";

/**
* {@link ConfigOptions#CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET} should be less than or
* equal to this value when idempotence producer enabled to ensure message ordering.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBu
"The uncompressed buffer size exceeds the integer limit");

byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
uncompressedBuffer.getBytes(/*index=*/ 0, inBytes);
uncompressedBuffer.getBytes(/* index= */ 0, inBytes);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (InputStream in = new ByteArrayInputStream(inBytes);
OutputStream out = new FlussLZ4BlockOutputStream(baos)) {
Expand Down Expand Up @@ -86,7 +86,7 @@ protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBu

byte[] outBytes = out.toByteArray();
ArrowBuf decompressedBuffer = allocator.buffer(outBytes.length);
decompressedBuffer.setBytes(/*index=*/ 0, outBytes);
decompressedBuffer.setBytes(/* index= */ 0, outBytes);
decompressedBuffer.writerIndex(decompressedLength);
return decompressedBuffer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ public <T extends Enum<T>> T getEnum(
public Map<String, String> getMap(ConfigOption<Map<String, String>> configOption) {
return getOptional(configOption).orElseGet(configOption::defaultValue);
}

// --------------------------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,16 @@ public class MeterView implements Meter, MetricView {

/** The underlying counter maintaining the count. */
private final Counter counter;

/** The time-span over which the average is calculated. */
private final int timeSpanInSeconds;

/** Circular array containing the history of values. */
private final long[] values;

/** The index in the array for the current time. */
private int time = 0;

/** The last rate we computed. */
private double currentRate = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public static FieldWriter createFieldWriter(DataType fieldType) {

case BIGINT:
return (writer, value) -> writer.writeLong((long) value);
// support for nanoseconds come check again after #1195 merge
// support for nanoseconds come check again after #1195 merge
case TIMESTAMP_WITHOUT_TIME_ZONE:
return (writer, value) -> {
TimestampNtz ts = (TimestampNtz) value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
@PublicEvolving
public class FlussPrincipal implements Principal {
public static final FlussPrincipal ANONYMOUS = new FlussPrincipal("ANONYMOUS", "User");

/** The wildcard principal, which represents all principals. */
public static final FlussPrincipal WILD_CARD_PRINCIPAL = new FlussPrincipal("*", "*");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public static void checkArgument(
throw new IllegalArgumentException(format(errorMessageTemplate, errorMessageArgs));
}
}

// ------------------------------------------------------------------------
// Boolean Condition Checking (State)
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@
public class Projection {
/** the projection indexes including both selected fields and reordering them. */
final int[] projection;

/** the projection indexes that only select fields but not reordering them. */
final int[] projectionInOrder;

/** the indexes to reorder the fields of {@link #projectionInOrder} to {@link #projection}. */
final int[] reorderingIndexes;

/** the flag to indicate whether reordering is needed. */
final boolean reorderingNeeded;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class Tuple2<T0, T1> extends Tuple {

/** Field 0 of the tuple. */
public T0 f0;

/** Field 1 of the tuple. */
public T1 f1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1586,6 +1586,7 @@ public static class CompleteMultipartUploadHandler extends AbstractSSEHandler
protected ServerSideEncryptionResult sseResult() {
return result;
}

/**
* @see com.amazonaws.services.s3.model.CompleteMultipartUploadResult#getExpirationTime()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,10 @@ public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath)
@Override
public List<CatalogPartitionSpec> listPartitions(
ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec)
throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, CatalogException {
throws TableNotExistException,
TableNotPartitionedException,
PartitionSpecInvalidException,
CatalogException {

// TODO lake table should support.
if (objectPath.getObjectName().contains(LAKE_TABLE_SPLITTER)) {
Expand Down Expand Up @@ -555,8 +557,10 @@ public void createPartition(
CatalogPartitionSpec catalogPartitionSpec,
CatalogPartition catalogPartition,
boolean b)
throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, PartitionAlreadyExistsException,
throws TableNotExistException,
TableNotPartitionedException,
PartitionSpecInvalidException,
PartitionAlreadyExistsException,
CatalogException {
TablePath tablePath = toTablePath(objectPath);
PartitionSpec partitionSpec = new PartitionSpec(catalogPartitionSpec.getPartitionSpec());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class LakeSnapshotAndFlussLogSplit extends SourceSplitBase {
* lake split via this lake split index.
*/
private int currentLakeSplitIndex;

/** The records to skip when reading a lake split. */
private long recordToSkip;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class FlinkRecordsWithSplitIds implements RecordsWithSplitIds<RecordAndPo

/** SplitId iterator. */
private final Iterator<String> splitIterator;

/** The table buckets of the split in splitIterator. */
private final Iterator<TableBucket> tableBucketIterator;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class HybridSnapshotLogSplitState extends SourceSplitState {

/** The records to skip while reading a snapshot. */
private long recordsToSkip;

/** Whether the snapshot reading is finished. */
private boolean snapshotFinished;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class KvSnapshotResource {

/** A scheduler to schedule kv snapshot. */
private final ScheduledExecutorService kvSnapshotScheduler;

/** Thread pool for async snapshot workers. */
private final ExecutorService asyncOperationsThreadPool;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class KvSnapshotHandle {

/** The shared file(like data file) handles of the kv snapshot. */
private final List<KvFileHandleAndLocalPath> sharedFileHandles;

/** The private file(like meta file) handles of the kv snapshot. */
private final List<KvFileHandleAndLocalPath> privateFileHandles;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class PeriodicSnapshotManager implements Closeable {
private volatile boolean started = false;

private final long initialDelay;

/** The table bucket that the snapshot manager is for. */
private final TableBucket tableBucket;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ private static final class SharedKvEntry {

private final long createdBySnapshotID;
private long lastUsedSnapshotID;

/** The shared kv file handle. */
KvFileHandle kvFileHandle;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public abstract class AbstractIndex implements Closeable {

/** The maximum number of entries this index can hold. */
private volatile int maxEntries;

/** The number of entries in this index. */
private volatile int entries;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public final class FetchParams {

private final int minFetchBytes;
private final long maxWaitMs;

// TODO: add more params like epoch etc.

public FetchParams(int replicaId, int maxFetchBytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class ListOffsetsParam {
* from follower, it represents listing LocalLogStartOffset.
*/
public static final int EARLIEST_OFFSET_TYPE = 0;

/**
* Latest offset type. If the list offsets request come from client, it represents listing
* HighWatermark. otherwise, the request come from follower, it represents listing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class WriterStateManager {
private final Map<Long, WriterStateEntry> writers = new HashMap<>();

private final File logTabletDir;

/** The same as writers#size, but for lock-free access. */
private volatile int writerIdCount = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class FsRemoteLogOutputStream extends FSDataOutputStream {
private final FsPath basePath;
private final FileSystem fs;
private final byte[] writeBuffer;

/** The file path can be log file, index file or remote log metadata file. */
private final FsPath remoteLogFilePath;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class LogSegmentFiles {
private final Path offsetIndex;
private final Path timeIndex;
private final @Nullable Path writerIdIndex;

// TODO add leader epoch index after introduce leader epoch.

public LogSegmentFiles(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public final class Replica {
private final LogManager logManager;
private final LogTablet logTablet;
private final long replicaMaxLagTime;

/** A closeable registry to register all registered {@link Closeable}s. */
private final CloseableRegistry closeableRegistry;

Expand All @@ -164,6 +165,7 @@ public final class Replica {
private final int localTabletServerId;
private final DelayedOperationManager<DelayedWrite<?>> delayedWriteManager;
private final DelayedOperationManager<DelayedFetchLog> delayedFetchLogManager;

/** The manger to manger the isr expand and shrink. */
private final AdjustIsrManager adjustIsrManager;

Expand All @@ -177,6 +179,7 @@ public final class Replica {
private final Clock clock;

private static final int INIT_KV_TABLET_MAX_RETRY_TIMES = 5;

/**
* storing the remote follower replicas' state, used to update leader's highWatermark and
* replica ISR.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,10 @@ public String toString() {
public static class DelayedBucketStatus<T extends WriteResultForBucket> {
private final long requiredOffset;
private final T writeResultForBucket;

/** Whether this bucket is waiting acks. */
private volatile boolean acksPending;

/** The error code of the delayed operation. */
private volatile Errors delayedError;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ final class RemoteLeaderEndpoint implements LeaderEndpoint {
private final int followerServerId;
private final int remoteServerId;
private final TabletServerGateway tabletServerGateway;

/** The max size for the fetch response. */
private final int maxFetchSize;

/** The max fetch size for a bucket in bytes. */
private final int maxFetchSizeForBucket;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ final class TimingWheel {

private final DelayQueue<TimerTaskList> queue;
private final Clock clock;

/** The upper level timing wheel. */
private volatile TimingWheel overflowWheel;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,7 @@ public void registerPartitionAssignmentAndMetadata(
ops.add(metadataPartitionNode);
zkClient.transaction().forOperations(ops);
}

// --------------------------------------------------------------------------------------------
// Schema
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class RocksDBExtension implements BeforeEachCallback, AfterEachCallback {
private File rockDbDir;

private RocksDBResourceContainer rocksDBResourceContainer;

/** The RocksDB instance object. */
private RocksDBKv rocksDBKv;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ public class TestingLeaderEndpoint implements LeaderEndpoint {

private final ReplicaManager replicaManager;
private final ServerNode localNode;

/** The max size for the fetch response. */
private final int maxFetchSize;

/** The max fetch size for a bucket in bytes. */
private final int maxFetchSizeForBucket;

Expand Down
Loading
Loading