Skip to content

Commit 924f2df

Browse files
XuQianJin-Starswuchong
authored andcommitted
[common] Introduce ARRAY type for ARROW, COMPACTED and INDEXED formats
1 parent ae84521 commit 924f2df

File tree

102 files changed

+10940
-299
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

102 files changed

+10940
-299
lines changed

fluss-client/pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,15 @@
113113
<include>*:*</include>
114114
</includes>
115115
</artifactSet>
116+
<filters>
117+
<filter>
118+
<artifact>*</artifact>
119+
<excludes>
120+
<exclude>LICENSE-EPL-1.0.txt</exclude>
121+
<exclude>LICENSE-EDL-1.0.txt</exclude>
122+
</excludes>
123+
</filter>
124+
</filters>
116125
</configuration>
117126
</execution>
118127
</executions>

fluss-client/src/main/resources/META-INF/NOTICE

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ This project bundles the following dependencies under the Apache Software Licens
1010
- com.ververica:frocksdbjni:6.20.3-ververica-2.0
1111
- org.apache.commons:commons-lang3:3.18.0
1212
- org.apache.commons:commons-math3:3.6.1
13+
- org.jetbrains:annotations:26.0.1
1314
- org.lz4:lz4-java:1.8.0
1415

1516
This project bundles the following dependencies under the MIT (https://opensource.org/licenses/MIT)
@@ -20,4 +21,10 @@ See bundled license files for details.
2021
This project bundles the following dependencies under BSD License (https://opensource.org/licenses/bsd-license.php).
2122
See bundled license files for details.
2223

23-
- com.github.luben:zstd-jni:1.5.7-1
24+
- com.github.luben:zstd-jni:1.5.7-1
25+
26+
This project bundles the following dependencies under the Eclipse Distribution License 1.0 (https://www.eclipse.org/org/documents/edl-v10.php) and Eclipse Public License 1.0 (https://www.eclipse.org/legal/epl-v10.html).
27+
See bundled license files for details.
28+
29+
- org.eclipse.collections:eclipse-collections-api:11.1.0
30+
- org.eclipse.collections:eclipse-collections:11.1.0

fluss-common/pom.xml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,14 @@
6262
<artifactId>fluss-shaded-arrow</artifactId>
6363
</dependency>
6464

65-
<!-- TODO: these two dependencies need to be shaded. -->
65+
<!-- TODO: these three dependencies need to be shaded. -->
66+
<!-- Use the Arrow compatible version -->
67+
<dependency>
68+
<groupId>org.eclipse.collections</groupId>
69+
<artifactId>eclipse-collections</artifactId>
70+
<version>11.1.0</version>
71+
</dependency>
72+
6673
<dependency>
6774
<groupId>org.lz4</groupId>
6875
<artifactId>lz4-java</artifactId>
@@ -109,6 +116,12 @@
109116
<version>${iceberg.version}</version>
110117
<scope>test</scope>
111118
</dependency>
119+
<dependency>
120+
<groupId>org.jetbrains</groupId>
121+
<artifactId>annotations</artifactId>
122+
<version>26.0.1</version>
123+
<scope>compile</scope>
124+
</dependency>
112125
</dependencies>
113126

114127
<build>

fluss-common/src/main/java/org/apache/fluss/memory/AbstractPagedOutputView.java

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,17 @@ public abstract class AbstractPagedOutputView implements OutputView, MemorySegme
4545
/** The list of memory segments that have been fully written and are immutable now. */
4646
private List<MemorySegmentBytesView> finishedPages;
4747

48-
/** The current memory segment to write to. */
49-
private MemorySegment currentSegment;
48+
/** the current memory segment to write to. */
49+
protected MemorySegment currentSegment;
5050

5151
/** the offset in the current segment. */
52-
private int positionInSegment;
52+
protected int positionInSegment;
53+
54+
/** Flag indicating whether throw BufferExhaustedException or wait for available segment. */
55+
protected boolean waitingSegment;
56+
57+
/** the reusable array for UTF encodings. */
58+
protected byte[] utfBuffer;
5359

5460
protected AbstractPagedOutputView(MemorySegment initialSegment, int pageSize) {
5561
if (initialSegment == null) {
@@ -301,6 +307,62 @@ public void writeDouble(double v) throws IOException {
301307
}
302308
}
303309

310+
@Override
311+
public void write(int b) throws IOException {
312+
writeByte(b);
313+
}
314+
315+
@Override
316+
public void writeChar(int v) throws IOException {
317+
if (positionInSegment < pageSize - 1) {
318+
currentSegment.putChar(positionInSegment, (char) v);
319+
positionInSegment += 2;
320+
} else if (positionInSegment == pageSize) {
321+
advance();
322+
writeChar(v);
323+
} else {
324+
writeByte(v);
325+
writeByte(v >> 8);
326+
}
327+
}
328+
329+
@Override
330+
public void writeBytes(String s) throws IOException {
331+
if (s == null) {
332+
throw new NullPointerException();
333+
}
334+
for (int i = 0; i < s.length(); i++) {
335+
writeByte(s.charAt(i));
336+
}
337+
}
338+
339+
@Override
340+
public void writeChars(String s) throws IOException {
341+
if (s == null) {
342+
throw new NullPointerException();
343+
}
344+
for (int i = 0; i < s.length(); i++) {
345+
writeChar(s.charAt(i));
346+
}
347+
}
348+
349+
@Override
350+
public void writeUTF(String str) throws IOException {
351+
if (str == null) {
352+
throw new NullPointerException();
353+
}
354+
355+
byte[] bytes = str.getBytes(java.nio.charset.StandardCharsets.UTF_8);
356+
int length = bytes.length;
357+
358+
if (length > 65535) {
359+
throw new IOException("String too long for UTF encoding: " + length);
360+
}
361+
362+
writeShort(length);
363+
write(bytes);
364+
}
365+
304366
@Override
305367
public void write(MemorySegment segment, int off, int len) throws IOException {
306368
int remaining = pageSize - positionInSegment;

fluss-common/src/main/java/org/apache/fluss/memory/InputView.java

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
* contributor license agreements. See the NOTICE file distributed with
44
* this work for additional information regarding copyright ownership.
55
* The ASF licenses this file to You under the Apache License, Version 2.0
6-
* (the "License"); you may not use this file except in compliance with
7-
* the License. You may obtain a copy of the License at
6+
* (the "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
88
*
99
* http://www.apache.org/licenses/LICENSE-2.0
1010
*
@@ -23,8 +23,13 @@
2323
import java.io.IOException;
2424

2525
/**
26-
* A very similar interface to {@link java.io.DataInput} but reading multi-bytes primitive types in
27-
* little endian.
26+
* A specialized interface for reading multi-byte primitive types in little endian byte order. This
27+
* interface is designed to be independent of Java's standard DataInput interface to avoid
28+
* endianness confusion and API semantic violations.
29+
*
30+
* <p>Note: This interface uses little endian byte order, which is different from Java's standard
31+
* DataInput interface that uses big endian. If you need big endian compatibility, use the {@link
32+
* #asDataInput()} method to get a DataInput wrapper.
2833
*
2934
* @since 0.2
3035
*/
@@ -54,8 +59,8 @@ public interface InputView {
5459
byte readByte() throws IOException;
5560

5661
/**
57-
* Reads two input bytes and returns a {@code short} value. This method is suitable for reading
58-
* the bytes written by the {@link OutputView#writeShort(int)}.
62+
* Reads two input bytes and returns a {@code short} value in little endian byte order. This
63+
* method is suitable for reading the bytes written by the {@link OutputView#writeShort(int)}.
5964
*
6065
* @return the 16-bit value read.
6166
* @exception EOFException if this stream reaches the end before reading all the bytes.
@@ -64,8 +69,8 @@ public interface InputView {
6469
short readShort() throws IOException;
6570

6671
/**
67-
* Reads four input bytes and returns an {@code int} value. This method is suitable for reading
68-
* bytes written by the {@link OutputView#writeInt(int)}.
72+
* Reads four input bytes and returns an {@code int} value in little endian byte order. This
73+
* method is suitable for reading bytes written by the {@link OutputView#writeInt(int)}.
6974
*
7075
* @return the {@code int} value read.
7176
* @exception EOFException if this stream reaches the end before reading all the bytes.
@@ -74,8 +79,28 @@ public interface InputView {
7479
int readInt() throws IOException;
7580

7681
/**
77-
* Reads eight input bytes and returns a {@code long} value. This method is suitable for reading
78-
* bytes written by the {@link OutputView#writeLong(long)}.
82+
* Reads two input bytes and returns a {@code char} value in little endian byte order. This
83+
* method is suitable for reading bytes written by the {@link OutputView#writeChar(int)}.
84+
*
85+
* @return the {@code char} value read.
86+
* @exception EOFException if this stream reaches the end before reading all the bytes.
87+
* @exception IOException if an I/O error occurs.
88+
*/
89+
char readChar() throws IOException;
90+
91+
/**
92+
* Reads two input bytes and returns an unsigned 16-bit integer in little endian byte order.
93+
* This method is suitable for reading bytes written by the {@link OutputView#writeShort(int)}.
94+
*
95+
* @return the unsigned 16-bit value read.
96+
* @exception EOFException if this stream reaches the end before reading all the bytes.
97+
* @exception IOException if an I/O error occurs.
98+
*/
99+
int readUnsignedShort() throws IOException;
100+
101+
/**
102+
* Reads eight input bytes and returns a {@code long} value in little endian byte order. This
103+
* method is suitable for reading bytes written by the {@link OutputView#writeLong(long)}.
79104
*
80105
* @return the {@code long} value read.
81106
* @exception EOFException if this stream reaches the end before reading all the bytes.
@@ -161,4 +186,28 @@ public interface InputView {
161186
* @exception IOException if an I/O error occurs.
162187
*/
163188
void readFully(byte[] b, int offset, int len) throws IOException;
189+
190+
/**
191+
* Skips over and discards {@code n} bytes of data from the input stream. The skip method may,
192+
* for a variety of reasons, end up skipping over some smaller number of bytes, possibly 0. This
193+
* may result from any of a number of conditions; reaching end of file before {@code n} bytes
194+
* have been skipped is only one possibility. The actual number of bytes skipped is returned. If
195+
* {@code n} is negative, no bytes are skipped.
196+
*
197+
* @param n the number of bytes to be skipped.
198+
* @return the actual number of bytes skipped.
199+
* @exception IOException if an I/O error occurs.
200+
*/
201+
int skipBytes(int n) throws IOException;
202+
203+
/**
204+
* Reads a string that has been encoded using a modified UTF-8 format in little endian byte
205+
* order. This method is suitable for reading strings written by the {@link
206+
* OutputView#writeUTF(String)}.
207+
*
208+
* @return a Unicode string.
209+
* @exception EOFException if this stream reaches the end before reading all the bytes.
210+
* @exception IOException if an I/O error occurs.
211+
*/
212+
String readUTF() throws IOException;
164213
}

fluss-common/src/main/java/org/apache/fluss/memory/ManagedPagedOutputView.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.memory;
1919

2020
import java.io.IOException;
21+
import java.io.UTFDataFormatException;
2122
import java.util.ArrayList;
2223
import java.util.List;
2324

@@ -47,4 +48,94 @@ protected MemorySegment nextSegment() throws IOException {
4748
public List<MemorySegment> allocatedPooledSegments() {
4849
return pooledSegments;
4950
}
51+
52+
@Override
53+
public void write(int b) throws IOException {
54+
writeByte(b);
55+
}
56+
57+
@Override
58+
public void writeChar(int v) throws IOException {
59+
if (this.positionInSegment < this.pageSize - 1) {
60+
this.currentSegment.putCharBigEndian(this.positionInSegment, (char) v);
61+
this.positionInSegment += 2;
62+
} else if (this.positionInSegment == this.pageSize) {
63+
advance();
64+
writeChar(v);
65+
} else {
66+
writeByte(v >> 8);
67+
writeByte(v);
68+
}
69+
}
70+
71+
@Override
72+
public void writeBytes(String s) throws IOException {
73+
for (int i = 0; i < s.length(); i++) {
74+
writeByte(s.charAt(i));
75+
}
76+
}
77+
78+
@Override
79+
public void writeChars(String s) throws IOException {
80+
for (int i = 0; i < s.length(); i++) {
81+
writeChar(s.charAt(i));
82+
}
83+
}
84+
85+
@Override
86+
public void writeUTF(String str) throws IOException {
87+
int strlen = str.length();
88+
int utflen = 0;
89+
int c, count = 0;
90+
91+
/* use charAt instead of copying String to char array */
92+
for (int i = 0; i < strlen; i++) {
93+
c = str.charAt(i);
94+
if ((c >= 0x0001) && (c <= 0x007F)) {
95+
utflen++;
96+
} else if (c > 0x07FF) {
97+
utflen += 3;
98+
} else {
99+
utflen += 2;
100+
}
101+
}
102+
103+
if (utflen > 65535) {
104+
throw new UTFDataFormatException("encoded string too long: " + utflen + " memory");
105+
}
106+
107+
if (this.utfBuffer == null || this.utfBuffer.length < utflen + 2) {
108+
this.utfBuffer = new byte[utflen + 2];
109+
}
110+
final byte[] bytearr = this.utfBuffer;
111+
112+
bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
113+
bytearr[count++] = (byte) (utflen & 0xFF);
114+
115+
int i;
116+
for (i = 0; i < strlen; i++) {
117+
c = str.charAt(i);
118+
if (!((c >= 0x0001) && (c <= 0x007F))) {
119+
break;
120+
}
121+
bytearr[count++] = (byte) c;
122+
}
123+
124+
for (; i < strlen; i++) {
125+
c = str.charAt(i);
126+
if ((c >= 0x0001) && (c <= 0x007F)) {
127+
bytearr[count++] = (byte) c;
128+
129+
} else if (c > 0x07FF) {
130+
bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
131+
bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
132+
bytearr[count++] = (byte) (0x80 | (c & 0x3F));
133+
} else {
134+
bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
135+
bytearr[count++] = (byte) (0x80 | (c & 0x3F));
136+
}
137+
}
138+
139+
write(bytearr, 0, utflen + 2);
140+
}
50141
}

fluss-common/src/main/java/org/apache/fluss/memory/MemorySegment.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.io.DataInput;
2525
import java.io.DataOutput;
2626
import java.io.IOException;
27+
import java.io.Serializable;
2728
import java.nio.BufferOverflowException;
2829
import java.nio.BufferUnderflowException;
2930
import java.nio.ByteBuffer;
@@ -65,7 +66,7 @@
6566
* implementations on invocations of abstract methods.
6667
*/
6768
@Internal
68-
public final class MemorySegment {
69+
public final class MemorySegment implements Serializable {
6970

7071
/** The unsafe handle for transparent memory copied (heap / off-heap). */
7172
private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;

0 commit comments

Comments
 (0)