Skip to content

Commit 9cdbe2b

Browse files
committed
Add WarcCaptureReader and WarcCapture (experimental)
This is a higher level reader API which operates on capture events rather than individual records. It is intended to make the common case of listing the captures in a WARC file easier by not having to handle the individual record types.
1 parent 9eafca9 commit 9cdbe2b

File tree

3 files changed

+392
-0
lines changed

3 files changed

+392
-0
lines changed
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright (C) 2025 National Library of Australia
4+
*/
5+
6+
package org.netpreserve.jwarc;
7+
8+
import java.io.IOException;
9+
import java.net.URI;
10+
import java.time.Instant;
11+
import java.util.Collections;
12+
import java.util.List;
13+
import java.util.Optional;
14+
15+
/**
16+
* Represents a capture event in a WARC file, which consists of one or more sequential WARC records linked by
17+
* WARC-Concurrent-To headers.
18+
* <p>
19+
* EXPERIMENTAL API: May change or be removed without notice.
20+
*/
21+
public class WarcCapture {
22+
private final WarcCaptureReader reader;
23+
private final ConcurrentRecordSet concurrentSet;
24+
private final WarcCaptureRecord mainRecord;
25+
private final List<WarcCaptureRecord> records;
26+
private final Warcinfo warcinfo;
27+
28+
WarcCapture(WarcCaptureReader reader, ConcurrentRecordSet concurrentSet, WarcCaptureRecord mainRecord,
29+
List<WarcCaptureRecord> records, Warcinfo warcinfo) {
30+
this.reader = reader;
31+
this.concurrentSet = concurrentSet;
32+
this.mainRecord = mainRecord;
33+
this.records = records;
34+
this.warcinfo = warcinfo;
35+
}
36+
37+
public String target() {
38+
return mainRecord.target();
39+
}
40+
41+
public URI targetURI() {
42+
return mainRecord.targetURI();
43+
}
44+
45+
public Instant date() {
46+
return mainRecord.date();
47+
}
48+
49+
/**
50+
* Returns the HTTP request method, if available.
51+
*/
52+
public Optional<String> method() {
53+
try {
54+
Optional<WarcRequest> request = request();
55+
if (request.isPresent()) {
56+
return Optional.of(request.get().http().method());
57+
} else {
58+
return Optional.empty();
59+
}
60+
} catch (Exception e) {
61+
return Optional.empty();
62+
}
63+
}
64+
65+
private Optional<HttpResponse> httpResponse() {
66+
try {
67+
if (mainRecord instanceof WarcResponse) {
68+
return Optional.of(((WarcResponse) mainRecord).http());
69+
} else if (mainRecord instanceof WarcRevisit) {
70+
return Optional.of(((WarcRevisit) mainRecord).http());
71+
} else {
72+
return Optional.empty();
73+
}
74+
} catch (Exception e) {
75+
return Optional.empty();
76+
}
77+
}
78+
79+
/**
80+
* Returns the HTTP response status code, if available.
81+
*/
82+
public Optional<Integer> status() {
83+
return httpResponse().map(HttpResponse::status);
84+
}
85+
86+
/**
87+
* Returns the content type of the payload, if available.
88+
*/
89+
public Optional<MediaType> contentType() {
90+
try {
91+
// TODO: support non-HTTP records
92+
if (mainRecord instanceof WarcResponse) {
93+
return ((WarcResponse) mainRecord).http().headers()
94+
.first("Content-Type").map(MediaType::parseLeniently);
95+
} else if (mainRecord instanceof WarcRevisit) {
96+
return ((WarcRevisit) mainRecord).http().headers()
97+
.first("Content-Type").map(MediaType::parseLeniently);
98+
} else if (mainRecord instanceof WarcResource) {
99+
return mainRecord.headers().first("Content-Type").map(MediaType::parseLeniently);
100+
} else {
101+
return Optional.empty();
102+
}
103+
} catch (Exception e) {
104+
return Optional.empty();
105+
}
106+
}
107+
108+
/**
109+
* Returns the main WARC capture record for the current capture event. This will typically be the resource,
110+
* response or revisit record. If none of these are present, returns the first record in the capture event.
111+
*/
112+
public WarcCaptureRecord record() throws IOException {
113+
return mainRecord;
114+
}
115+
116+
/**
117+
* Returns the list of all WARC records associated with this capture event.
118+
*/
119+
public List<WarcCaptureRecord> records() throws IOException {
120+
while (true) {
121+
WarcCaptureRecord record = reader.readConcurrentTo(concurrentSet);
122+
if (record == null) break;
123+
records.add(record);
124+
// TODO: buffer secondary records
125+
}
126+
return Collections.unmodifiableList(records);
127+
}
128+
129+
/**
130+
* Returns the first metadata record associated with the current capture event, if present.
131+
*
132+
* @return an {@link Optional} containing the WARC metadata record if found, otherwise an empty {@link Optional}.
133+
* @throws IOException if an error occurs while reading WARC records.
134+
*/
135+
public Optional<WarcMetadata> metadata() throws IOException {
136+
return findRecord(WarcMetadata.class);
137+
}
138+
139+
/**
140+
* Returns the first request record associated with the current capture event, if present.
141+
*
142+
* @return an {@link Optional} containing the WARC request record if found, otherwise an empty {@link Optional}.
143+
* @throws IOException if an error occurs while reading WARC records.
144+
*/
145+
public Optional<WarcRequest> request() throws IOException {
146+
return findRecord(WarcRequest.class);
147+
}
148+
149+
/**
150+
* Returns the warcinfo record associated with the current capture event, if present.
151+
*/
152+
public Optional<Warcinfo> warcinfo() throws IOException {
153+
return Optional.ofNullable(warcinfo);
154+
}
155+
156+
private <T extends WarcCaptureRecord> Optional<T> findRecord(Class<T> type) throws IOException {
157+
for (WarcCaptureRecord record : records) {
158+
if (type.isInstance(record)) return Optional.of(type.cast(record));
159+
}
160+
while (true) {
161+
WarcCaptureRecord record = reader.readConcurrentTo(concurrentSet);
162+
records.add(record);
163+
// TODO: buffer secondary records
164+
if (record == null) break;
165+
if (type.isInstance(record)) return Optional.of(type.cast(record));
166+
}
167+
return Optional.empty();
168+
}
169+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright (C) 2025 National Library of Australia
4+
*/
5+
6+
package org.netpreserve.jwarc;
7+
8+
import java.io.Closeable;
9+
import java.io.IOException;
10+
import java.io.InputStream;
11+
import java.nio.channels.ReadableByteChannel;
12+
import java.nio.file.Path;
13+
import java.util.ArrayList;
14+
import java.util.List;
15+
import java.util.Optional;
16+
17+
/**
18+
* A WARC file reader which groups a sequence of concurrent records into capture events.
19+
* <p>
20+
* EXPERIMENTAL API: May change or be removed without notice.
21+
*/
22+
public class WarcCaptureReader implements Closeable {
23+
private final WarcReader reader;
24+
private WarcCaptureRecord nextRecord;
25+
private Warcinfo warcinfo;
26+
27+
public WarcCaptureReader(WarcReader reader) {
28+
this.reader = reader;
29+
}
30+
31+
public WarcCaptureReader(ReadableByteChannel channel) throws IOException {
32+
this(new WarcReader(channel));
33+
}
34+
35+
public WarcCaptureReader(InputStream stream) throws IOException {
36+
this(new WarcReader(stream));
37+
}
38+
39+
public WarcCaptureReader(Path path) throws IOException {
40+
this(new WarcReader(path));
41+
}
42+
43+
private WarcCaptureRecord peek() throws IOException {
44+
while (nextRecord == null) {
45+
WarcRecord record = reader.next().orElse(null);
46+
if (record == null) return null;
47+
if (record instanceof WarcCaptureRecord) {
48+
nextRecord = (WarcCaptureRecord) record;
49+
} else if (record instanceof Warcinfo) {
50+
warcinfo = (Warcinfo) record;
51+
}
52+
}
53+
return nextRecord;
54+
}
55+
56+
private WarcCapture read() throws IOException {
57+
WarcCaptureRecord record;
58+
ConcurrentRecordSet concurrentSet = new ConcurrentRecordSet();
59+
List<WarcCaptureRecord> records = new ArrayList<>();
60+
// read until we find a main record (response, resource or revisit)
61+
do {
62+
record = peek();
63+
if (record == null) return null;
64+
if (!concurrentSet.contains(record)) {
65+
concurrentSet.clear();
66+
concurrentSet.add(record);
67+
records.clear();
68+
}
69+
records.add(record);
70+
// TODO: buffer secondary records
71+
nextRecord = null;
72+
} while (!(record instanceof WarcResponse || record instanceof WarcResource || record instanceof WarcRevisit));
73+
return new WarcCapture(this, concurrentSet, record, records, warcinfo);
74+
}
75+
76+
public Optional<WarcCapture> next() throws IOException {
77+
return Optional.ofNullable(read());
78+
}
79+
80+
WarcCaptureRecord readConcurrentTo(ConcurrentRecordSet concurrentSet) throws IOException {
81+
WarcCaptureRecord record = peek();
82+
if (record != null && concurrentSet.contains(record)) {
83+
nextRecord = null;
84+
return record;
85+
}
86+
return null;
87+
}
88+
89+
/**
90+
* Closes the underlying WarcReader.
91+
* @throws IOException if an I/O error occurs during the close operation.
92+
*/
93+
@Override
94+
public void close() throws IOException {
95+
reader.close();
96+
}
97+
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright (C) 2025 National Library of Australia
4+
*/
5+
6+
package org.netpreserve.jwarc;
7+
8+
import org.junit.Test;
9+
10+
import java.io.ByteArrayInputStream;
11+
import java.io.ByteArrayOutputStream;
12+
import java.io.IOException;
13+
import java.net.URI;
14+
import java.nio.channels.Channels;
15+
import java.util.List;
16+
import java.util.Optional;
17+
18+
import static org.junit.Assert.*;
19+
20+
public class WarcCaptureReaderTest {
21+
22+
@Test
23+
public void test() throws IOException {
24+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
25+
WarcWriter writer = new WarcWriter(Channels.newChannel(baos));
26+
27+
writer.write(new Warcinfo.Builder().filename("test.warc").build());
28+
29+
URI target = URI.create("http://example.org/");
30+
31+
WarcRequest request = new WarcRequest.Builder(target)
32+
.build();
33+
writer.write(request);
34+
35+
WarcMetadata metadata = new WarcMetadata.Builder()
36+
.targetURI(target)
37+
.concurrentTo(request.id())
38+
.build();
39+
writer.write(metadata);
40+
41+
WarcResponse response = new WarcResponse.Builder(target)
42+
.concurrentTo(request.id())
43+
.build();
44+
writer.write(response);
45+
46+
// A second capture
47+
URI target2 = URI.create("http://example.com/");
48+
WarcRequest request2 = new WarcRequest.Builder(target2)
49+
.build();
50+
writer.write(request2);
51+
52+
WarcResponse response2 = new WarcResponse.Builder(target2)
53+
.concurrentTo(request2.id())
54+
.build();
55+
writer.write(response2);
56+
57+
WarcCaptureReader reader = new WarcCaptureReader(new ByteArrayInputStream(baos.toByteArray()));
58+
WarcCapture capture = reader.next().get();
59+
assertEquals(Optional.of("test.warc"), capture.warcinfo().get().filename());
60+
assertEquals(target.toString(), capture.target());
61+
List<WarcCaptureRecord> records = capture.records();
62+
assertEquals(3, records.size());
63+
assertTrue(records.get(0) instanceof WarcRequest);
64+
assertTrue(records.get(1) instanceof WarcMetadata);
65+
assertTrue(records.get(2) instanceof WarcResponse);
66+
67+
capture = reader.next().get();
68+
assertEquals(target2.toString(), capture.target());
69+
records = capture.records();
70+
assertEquals(2, records.size());
71+
assertTrue(records.get(0) instanceof WarcRequest);
72+
assertTrue(records.get(1) instanceof WarcResponse);
73+
74+
assertFalse(reader.next().isPresent());
75+
}
76+
77+
@Test
78+
public void testRevisit() throws IOException {
79+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
80+
WarcWriter writer = new WarcWriter(Channels.newChannel(baos));
81+
82+
URI target = URI.create("http://example.org/");
83+
84+
WarcRequest request = new WarcRequest.Builder(target)
85+
.build();
86+
writer.write(request);
87+
88+
WarcRevisit revisit = new WarcRevisit.Builder(target, WarcRevisit.IDENTICAL_PAYLOAD_DIGEST_1_0)
89+
.concurrentTo(request.id())
90+
.build();
91+
writer.write(revisit);
92+
93+
WarcCaptureReader reader = new WarcCaptureReader(new ByteArrayInputStream(baos.toByteArray()));
94+
WarcCapture capture = reader.next().get();
95+
assertEquals(target.toString(), capture.target());
96+
List<WarcCaptureRecord> records = capture.records();
97+
assertEquals(2, records.size());
98+
assertTrue(records.get(0) instanceof WarcRequest);
99+
assertTrue(records.get(1) instanceof WarcRevisit);
100+
}
101+
102+
@Test
103+
public void testMetadataFirst() throws IOException {
104+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
105+
WarcWriter writer = new WarcWriter(Channels.newChannel(baos));
106+
107+
URI target = URI.create("http://example.org/");
108+
109+
WarcMetadata metadata = new WarcMetadata.Builder()
110+
.targetURI(target)
111+
.build();
112+
writer.write(metadata);
113+
114+
WarcResponse response = new WarcResponse.Builder(target)
115+
.concurrentTo(metadata.id())
116+
.build();
117+
writer.write(response);
118+
119+
WarcCaptureReader reader = new WarcCaptureReader(new ByteArrayInputStream(baos.toByteArray()));
120+
WarcCapture capture = reader.next().get();
121+
List<WarcCaptureRecord> records = capture.records();
122+
assertEquals(2, records.size());
123+
assertTrue(records.get(0) instanceof WarcMetadata);
124+
assertTrue(records.get(1) instanceof WarcResponse);
125+
}
126+
}

0 commit comments

Comments
 (0)