Skip to content

Commit d629064

Browse files
committed
Add support for reading warc-zstd files
1 parent ca08abc commit d629064

File tree

10 files changed

+324
-46
lines changed

10 files changed

+324
-46
lines changed

README.md

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -305,26 +305,27 @@ Note: revisit records never have a payload so
305305

306306
## Comparison
307307

308-
| Criteria | jwarc | [JWAT] | [webarchive-commons] |
309-
|---------------------|-------------|-----------------|---------------|
310-
| License | Apache 2 | Apache 2 | Apache 2 |
311-
| Parser based on | Ragel FSM | Hand-rolled FSM | Apache HTTP |
312-
| Push parsing | Low level |||
313-
| Folded headers † ||||
314-
| [Encoded words] † || ✘ (disabled) ||
315-
| Validation | The basics |||
316-
| Strict parsing ‡ ||||
317-
| Lenient parsing | HTTP only |||
318-
| Multi-value headers ||||
319-
| I/O Framework | NIO | IO | IO |
320-
| Record type classes ||||
321-
| Typed accessors ||| Some |
322-
| GZIP detection ||| Filename only |
323-
| WARC writer | Barebones |||
324-
| ARC reader | Auto | Separate API | Factory |
325-
| ARC writer ||||
326-
| Speed * (.warc) | 1x | ~5x slower | ~13x slower |
327-
| Speed * (.warc.gz) | 1x | ~1.4x slower | ~2.8x slower |
308+
| Criteria | jwarc | [JWAT] | [webarchive-commons] |
309+
|---------------------|------------|-----------------|----------------------|
310+
| License | Apache 2 | Apache 2 | Apache 2 |
311+
| Parser based on | Ragel FSM | Hand-rolled FSM | Apache HTTP |
312+
| Push parsing | Low level |||
313+
| Folded headers † ||||
314+
| [Encoded words] † || ✘ (disabled) ||
315+
| Validation | The basics |||
316+
| Strict parsing ‡ ||||
317+
| Lenient parsing | HTTP only |||
318+
| Multi-value headers ||||
319+
| I/O Framework | NIO | IO | IO |
320+
| Record type classes ||||
321+
| Typed accessors ||| Some |
322+
| GZIP detection ||| Filename only |
323+
| [zstd compression] | read-only |||
324+
| WARC writer | Barebones |||
325+
| ARC reader | Auto | Separate API | Factory |
326+
| ARC writer ||||
327+
| Speed * (.warc) | 1x | ~5x slower | ~13x slower |
328+
| Speed * (.warc.gz) | 1x | ~1.4x slower | ~2.8x slower |
328329

329330
(†) WARC features copied from HTTP that have since been deprecated in HTTP. I'm not aware of any software that writes
330331
WARCs using these features and usage of them should probably be avoided. JWAT behaves differently from jwarc and
@@ -345,6 +346,7 @@ See also: [Unaffiliated benchmark against other languages](https://code402.com/h
345346
[JWAT]: https://sbforge.org/display/JWAT/JWAT
346347
[webarchive-commons]: https://github.com/iipc/webarchive-commons
347348
[Encoded words]: https://www.ietf.org/rfc/rfc2047.txt
349+
[zstd compression]: https://iipc.github.io/warc-specifications/specifications/warc-zstd/
348350
349351
### Other WARC libraries
350352

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@
7777
<scope>compile</scope>
7878
<optional>true</optional>
7979
</dependency>
80+
<dependency>
81+
<groupId>com.github.luben</groupId>
82+
<artifactId>zstd-jni</artifactId>
83+
<version>1.5.7-4</version>
84+
</dependency>
8085
</dependencies>
8186

8287
<properties>
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright (C) 2025 National Library of Australia and the jwarc contributors
4+
*/
5+
6+
package org.netpreserve.jwarc;
7+
8+
import java.io.IOException;
9+
10+
interface DecompressingChannel {
11+
12+
/**
13+
* Number of bytes read from the underlying channel.
14+
*/
15+
long inputPosition();
16+
17+
/**
18+
* Reset the decompressor, discarding any buffered data.
19+
*/
20+
void reset() throws IOException;
21+
}

src/org/netpreserve/jwarc/GunzipChannel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/*
22
* SPDX-License-Identifier: Apache-2.0
3-
* Copyright (C) 2018-2022 National Library of Australia and the jwarc contributors
3+
* Copyright (C) 2018-2025 National Library of Australia and the jwarc contributors
44
*/
55

66
package org.netpreserve.jwarc;
@@ -15,7 +15,7 @@
1515
import java.util.zip.Inflater;
1616
import java.util.zip.ZipException;
1717

18-
class GunzipChannel implements ReadableByteChannel {
18+
class GunzipChannel implements ReadableByteChannel, DecompressingChannel {
1919
private static final int FHCRC = 2;
2020
private static final int FEXTRA = 4;
2121
private static final int FNAME = 8;

src/org/netpreserve/jwarc/IOUtils.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
/*
22
* SPDX-License-Identifier: Apache-2.0
3-
* Copyright (C) 2018 National Library of Australia and the jwarc contributors
3+
* Copyright (C) 2018-2025 National Library of Australia and the jwarc contributors
44
*/
55

66
package org.netpreserve.jwarc;
77

88
import javax.net.ssl.SSLSocketFactory;
9+
import java.io.EOFException;
910
import java.io.IOException;
1011
import java.io.InputStream;
1112
import java.io.OutputStream;
@@ -22,6 +23,7 @@ public final class IOUtils {
2223

2324
/**
2425
* Transfers as many bytes as possible from src to dst.
26+
*
2527
* @return the number of bytes transferred.
2628
*/
2729
static int transfer(ByteBuffer src, ByteBuffer dst) {
@@ -30,10 +32,11 @@ static int transfer(ByteBuffer src, ByteBuffer dst) {
3032

3133
/**
3234
* Transfers up to limits from src to dst.
35+
*
3336
* @return the number of bytes transferred.
3437
*/
3538
static int transfer(ByteBuffer src, ByteBuffer dst, long limit) {
36-
return transferExactly(src, dst, (int)Math.min(Math.min(src.remaining(), dst.remaining()), limit));
39+
return transferExactly(src, dst, (int) Math.min(Math.min(src.remaining(), dst.remaining()), limit));
3740
}
3841

3942
private static int transferExactly(ByteBuffer src, ByteBuffer dst, int n) {
@@ -125,7 +128,7 @@ static Socket connect(String scheme, String host, int port) throws IOException {
125128

126129
public static byte[] readNBytes(InputStream stream, int n) throws IOException {
127130
byte[] buffer = new byte[n];
128-
for (int remaining = n; remaining > 0;) {
131+
for (int remaining = n; remaining > 0; ) {
129132
int read = stream.read(buffer, buffer.length - remaining, remaining);
130133
if (read < 0) {
131134
return Arrays.copyOf(buffer, buffer.length - remaining);
@@ -134,4 +137,26 @@ public static byte[] readNBytes(InputStream stream, int n) throws IOException {
134137
}
135138
return buffer;
136139
}
140+
141+
/**
142+
* Ensures that at least the specified number of bytes are available in the buffer by reading from the provided channel
143+
* if necessary. If the buffer already contains enough bytes, no data is read. If the channel reaches EOF before the
144+
* required number of bytes is available, an EOFException is thrown.
145+
*
146+
* @return the total number of bytes read from the channel to ensure the required availability in the buffer.
147+
* @throws IOException if an I/O error occurs while reading from the channel.
148+
* @throws EOFException if the channel reaches EOF before the required number of bytes are available.
149+
*/
150+
static int ensureAvailable(ReadableByteChannel channel, ByteBuffer buffer, int needed) throws IOException {
151+
int totalRead = 0;
152+
if (buffer.remaining() >= needed) return 0;
153+
buffer.compact();
154+
while (buffer.position() < needed) {
155+
int n = channel.read(buffer);
156+
if (n == -1) throw new EOFException("expected " + (needed - totalRead) + " more bytes in channel");
157+
totalRead += n;
158+
}
159+
buffer.flip();
160+
return totalRead;
161+
}
137162
}

src/org/netpreserve/jwarc/WarcCompression.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
11
/*
22
* SPDX-License-Identifier: Apache-2.0
3-
* Copyright (C) 2018 National Library of Australia and the jwarc contributors
3+
* Copyright (C) 2018-2025 National Library of Australia and the jwarc contributors
44
*/
55

66
package org.netpreserve.jwarc;
77

88
import java.nio.file.Path;
99

1010
public enum WarcCompression {
11-
NONE, GZIP;
11+
NONE, GZIP, ZSTD;
1212

1313
static WarcCompression forPath(Path path) {
14-
if (path.getFileName().toString().endsWith(".gz")) {
14+
String filename = path.getFileName().toString();
15+
if (filename.endsWith(".gz")) {
1516
return GZIP;
17+
} else if (filename.endsWith(".zst")) {
18+
return ZSTD;
1619
} else {
1720
return NONE;
1821
}

src/org/netpreserve/jwarc/WarcReader.java

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
/*
22
* SPDX-License-Identifier: Apache-2.0
3-
* Copyright (C) 2018-2022 National Library of Australia and the jwarc contributors
3+
* Copyright (C) 2018-2025 National Library of Australia and the jwarc contributors
44
*/
55

66
package org.netpreserve.jwarc;
77

88
import java.io.*;
9+
import java.lang.reflect.InvocationTargetException;
910
import java.nio.ByteBuffer;
1011
import java.nio.ByteOrder;
1112
import java.nio.channels.Channels;
@@ -61,7 +62,7 @@ public WarcReader(ReadableByteChannel channel, ByteBuffer buffer) throws IOExcep
6162
startPosition = tryPosition(channel);
6263
position = startPosition;
6364

64-
while (buffer.remaining() < 2) {
65+
while (buffer.remaining() < 4) {
6566
buffer.compact();
6667
int n = channel.read(buffer);
6768
buffer.flip();
@@ -78,17 +79,40 @@ public WarcReader(ReadableByteChannel channel, ByteBuffer buffer) throws IOExcep
7879
}
7980
}
8081

81-
if ((buffer.order() == ByteOrder.LITTLE_ENDIAN && buffer.getShort(buffer.position()) == (short) 0x8b1f)
82-
|| (buffer.order() == ByteOrder.BIG_ENDIAN && buffer.getShort(buffer.position()) == 0x1f8b)) {
83-
this.channel = new GunzipChannel(channel, buffer);
84-
this.buffer = ByteBuffer.allocate(8192);
85-
this.buffer.flip();
86-
compression = WarcCompression.GZIP;
87-
} else {
88-
this.channel = channel;
89-
this.buffer = buffer;
90-
compression = WarcCompression.NONE;
82+
ByteOrder originalOrder = buffer.order();
83+
try {
84+
buffer.order(ByteOrder.LITTLE_ENDIAN);
85+
86+
if (buffer.getShort(buffer.position()) == GzipChannel.GZIP_MAGIC) {
87+
this.channel = new GunzipChannel(channel, buffer);
88+
this.buffer = ByteBuffer.allocate(8192);
89+
this.buffer.flip();
90+
compression = WarcCompression.GZIP;
91+
} else if (buffer.getInt(buffer.position()) == 0xfd2fb528 || buffer.getInt(buffer.position()) == 0x184D2A5D) {
92+
try {
93+
this.channel = (ReadableByteChannel) Class.forName("org.netpreserve.jwarc.ZstdDecompressingChannel")
94+
.getConstructor(ReadableByteChannel.class, ByteBuffer.class)
95+
.newInstance(channel, buffer);
96+
} catch (InstantiationException | IllegalAccessException | InvocationTargetException |
97+
NoSuchMethodException | ClassNotFoundException e) {
98+
throw new IOException(e);
99+
}
100+
this.buffer = ByteBuffer.allocate(8192);
101+
this.buffer.flip();
102+
compression = WarcCompression.ZSTD;
103+
104+
// update position in case we read a dictionary frame
105+
position = ((DecompressingChannel) this.channel).inputPosition();
106+
} else {
107+
this.channel = channel;
108+
this.buffer = buffer;
109+
compression = WarcCompression.NONE;
110+
}
111+
112+
} finally {
113+
buffer.order(originalOrder);
91114
}
115+
92116
underlyingChannel = channel;
93117
}
94118

@@ -150,8 +174,8 @@ public Optional<WarcRecord> next() throws IOException {
150174
record.body().close();
151175
long trailerLength = consumeTrailer();
152176

153-
if (channel instanceof GunzipChannel) {
154-
position = startPosition + ((GunzipChannel) channel).inputPosition();
177+
if (channel instanceof DecompressingChannel) {
178+
position = startPosition + ((DecompressingChannel) channel).inputPosition();
155179
} else {
156180
position += headerLength + record.body().size() + trailerLength;
157181
}
@@ -309,8 +333,8 @@ public void position(long newPosition) throws IOException {
309333
record.body().close();
310334
record = null;
311335
}
312-
if (compression == WarcCompression.GZIP) {
313-
((GunzipChannel)channel).reset();
336+
if (channel instanceof DecompressingChannel) {
337+
((DecompressingChannel)channel).reset();
314338
}
315339
}
316340

src/org/netpreserve/jwarc/WarcWriter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/*
22
* SPDX-License-Identifier: Apache-2.0
3-
* Copyright (C) 2018-2023 National Library of Australia and the jwarc contributors
3+
* Copyright (C) 2018-2025 National Library of Australia and the jwarc contributors
44
*/
55

66
package org.netpreserve.jwarc;
@@ -47,8 +47,10 @@ public WarcWriter(WritableByteChannel channel, WarcCompression compression) thro
4747
this.compression = compression;
4848
if (compression == WarcCompression.GZIP) {
4949
this.channel = new GzipChannel(channel);
50-
} else {
50+
} else if (compression == WarcCompression.NONE || compression == null) {
5151
this.channel = channel;
52+
} else {
53+
throw new IllegalArgumentException("Unsupported compression: " + compression);
5254
}
5355

5456
if (channel instanceof SeekableByteChannel) {

0 commit comments

Comments
 (0)