Skip to content

Commit b068e42

Browse files
nddipiazza (Nicholas DiPiazza)
and
Nicholas DiPiazza
authored
TIKA-4252: add request metadata (#1753)
Co-authored-by: Nicholas DiPiazza <[email protected]>
1 parent 32baf23 commit b068e42

File tree

12 files changed

+52
-42
lines changed

12 files changed

+52
-42
lines changed

tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java

Lines changed: 12 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -455,33 +455,33 @@ private Fetcher getFetcher(FetchEmitTuple t) {
455455
}
456456
}
457457

458-
protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple t, Fetcher fetcher) {
459-
FetchKey fetchKey = t.getFetchKey();
458+
protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple fetchEmitTuple, Fetcher fetcher) {
459+
FetchKey fetchKey = fetchEmitTuple.getFetchKey();
460+
Metadata fetchResponseMetadata = new Metadata();
461+
Metadata fetchRequestMetadata = fetchEmitTuple.getMetadata();
460462
if (fetchKey.hasRange()) {
461463
if (!(fetcher instanceof RangeFetcher)) {
462464
throw new IllegalArgumentException(
463465
"fetch key has a range, but the fetcher is not a range fetcher");
464466
}
465-
Metadata metadata = t.getMetadata() == null ? new Metadata() : t.getMetadata();
466467
try (InputStream stream = ((RangeFetcher) fetcher).fetch(fetchKey.getFetchKey(),
467-
fetchKey.getRangeStart(), fetchKey.getRangeEnd(), metadata)) {
468-
return parseWithStream(t, stream, metadata);
468+
fetchKey.getRangeStart(), fetchKey.getRangeEnd(), fetchRequestMetadata, fetchResponseMetadata)) {
469+
return parseWithStream(fetchEmitTuple, stream, fetchResponseMetadata);
469470
} catch (SecurityException e) {
470-
LOG.error("security exception " + t.getId(), e);
471+
LOG.error("security exception " + fetchEmitTuple.getId(), e);
471472
throw e;
472473
} catch (TikaException | IOException e) {
473-
LOG.warn("fetch exception " + t.getId(), e);
474+
LOG.warn("fetch exception " + fetchEmitTuple.getId(), e);
474475
write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
475476
}
476477
} else {
477-
Metadata metadata = t.getMetadata() == null ? new Metadata() : t.getMetadata();
478-
try (InputStream stream = fetcher.fetch(t.getFetchKey().getFetchKey(), metadata)) {
479-
return parseWithStream(t, stream, metadata);
478+
try (InputStream stream = fetcher.fetch(fetchEmitTuple.getFetchKey().getFetchKey(), fetchRequestMetadata, fetchResponseMetadata)) {
479+
return parseWithStream(fetchEmitTuple, stream, fetchResponseMetadata);
480480
} catch (SecurityException e) {
481-
LOG.error("security exception " + t.getId(), e);
481+
LOG.error("security exception " + fetchEmitTuple.getId(), e);
482482
throw e;
483483
} catch (TikaException | IOException e) {
484-
LOG.warn("fetch exception " + t.getId(), e);
484+
LOG.warn("fetch exception " + fetchEmitTuple.getId(), e);
485485
write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
486486
}
487487
}

tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,7 @@ public String getName() {
3030
}
3131

3232
@Override
33-
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
33+
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
3434
return null;
3535
}
3636
}

tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,5 +33,11 @@ public interface Fetcher {
3333

3434
String getName();
3535

36-
InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException;
36+
default InputStream fetch(String fetchKey, Metadata fetchResponseMetadata)
37+
throws TikaException, IOException {
38+
return fetch(fetchKey, new Metadata(), fetchResponseMetadata);
39+
}
40+
41+
InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
42+
throws TikaException, IOException;
3743
}

tika-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,11 @@
2828
public interface RangeFetcher extends Fetcher {
2929
//At some point, Tika 3.x?, we may want to add optional ranges to the fetchKey?
3030

31-
InputStream fetch(String fetchKey, long startOffset, long endOffset, Metadata metadata)
32-
throws TikaException, IOException;
31+
default InputStream fetch(String fetchKey, long startOffset, long endOffset, Metadata fetchResponseMetadata)
32+
throws TikaException, IOException {
33+
return fetch(fetchKey, startOffset, endOffset, new Metadata(), fetchResponseMetadata);
34+
}
3335

36+
InputStream fetch(String fetchKey, long startOffset, long endOffset, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
37+
throws TikaException, IOException;
3438
}

tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -58,7 +58,7 @@ static boolean isDescendant(Path root, Path descendant) {
5858
}
5959

6060
@Override
61-
public InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException {
61+
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws IOException, TikaException {
6262

6363
if (fetchKey.contains("\u0000")) {
6464
throw new IllegalArgumentException("Path must not contain \u0000. " +
@@ -76,8 +76,8 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws IOException,
7676
p = Paths.get(fetchKey);
7777
}
7878

79-
metadata.set(TikaCoreProperties.SOURCE_PATH, fetchKey);
80-
updateFileSystemMetadata(p, metadata);
79+
fetchRequestMetadata.set(TikaCoreProperties.SOURCE_PATH, fetchKey);
80+
updateFileSystemMetadata(p, fetchRequestMetadata);
8181

8282
if (!Files.isRegularFile(p)) {
8383
if (basePath != null && !Files.isDirectory(basePath)) {
@@ -87,7 +87,7 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws IOException,
8787
}
8888
}
8989

90-
return TikaInputStream.get(p, metadata);
90+
return TikaInputStream.get(p, fetchRequestMetadata);
9191
}
9292

9393
private void updateFileSystemMetadata(Path p, Metadata metadata) throws IOException {

tika-core/src/main/java/org/apache/tika/pipes/fetcher/url/UrlFetcher.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,7 @@
3535
public class UrlFetcher extends AbstractFetcher {
3636

3737
@Override
38-
public InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException {
38+
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws IOException, TikaException {
3939
if (fetchKey.contains("\u0000")) {
4040
throw new IllegalArgumentException("URL must not contain \u0000. " +
4141
"Please review the life decisions that led you to requesting " +
@@ -46,7 +46,7 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws IOException,
4646
"The UrlFetcher does not fetch from file shares; " +
4747
"please use the FileSystemFetcher");
4848
}
49-
return TikaInputStream.get(new URL(fetchKey), metadata);
49+
return TikaInputStream.get(new URL(fetchKey), fetchRequestMetadata);
5050
}
5151

5252
}

tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,7 @@ public String getName() {
3737
}
3838

3939
@Override
40-
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
40+
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
4141
return new ByteArrayInputStream(BYTES);
4242
}
4343
}

tika-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -64,7 +64,7 @@ public void checkInitialization(InitializableProblemHandler problemHandler)
6464

6565

6666
@Override
67-
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
67+
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
6868
return byteString == null ? new ByteArrayInputStream(new byte[0]) :
6969
new ByteArrayInputStream(byteString.getBytes(StandardCharsets.UTF_8));
7070
}

tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -70,7 +70,7 @@ public class AZBlobFetcher extends AbstractFetcher implements Initializable {
7070
private boolean spoolToTemp = true;
7171

7272
@Override
73-
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
73+
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
7474

7575
LOGGER.debug("about to fetch fetchkey={} from endpoint ({})", fetchKey, endpoint);
7676

@@ -81,7 +81,7 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws TikaExceptio
8181
BlobProperties properties = blobClient.getProperties();
8282
if (properties.getMetadata() != null) {
8383
for (Map.Entry<String, String> e : properties.getMetadata().entrySet()) {
84-
metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
84+
fetchRequestMetadata.add(PREFIX + ":" + e.getKey(), e.getValue());
8585
}
8686
}
8787
}
@@ -94,7 +94,7 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws TikaExceptio
9494
try (OutputStream os = Files.newOutputStream(tmp)) {
9595
blobClient.download(os);
9696
}
97-
TikaInputStream tis = TikaInputStream.get(tmp, metadata, tmpResources);
97+
TikaInputStream tis = TikaInputStream.get(tmp, fetchRequestMetadata, tmpResources);
9898
long elapsed = System.currentTimeMillis() - start;
9999
LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
100100
return tis;

tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/GCSFetcher.java

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,7 @@ public class GCSFetcher extends AbstractFetcher implements Initializable {
5555
private boolean spoolToTemp = true;
5656

5757
@Override
58-
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
58+
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
5959

6060
LOGGER.debug("about to fetch fetchkey={} from bucket ({})", fetchKey, bucket);
6161

@@ -65,7 +65,7 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws TikaExceptio
6565
if (extractUserMetadata) {
6666
if (blob.getMetadata() != null) {
6767
for (Map.Entry<String, String> e : blob.getMetadata().entrySet()) {
68-
metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
68+
fetchRequestMetadata.add(PREFIX + ":" + e.getKey(), e.getValue());
6969
}
7070
}
7171
}
@@ -76,7 +76,7 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws TikaExceptio
7676
TemporaryResources tmpResources = new TemporaryResources();
7777
Path tmp = tmpResources.createTempFile();
7878
blob.downloadTo(tmp);
79-
TikaInputStream tis = TikaInputStream.get(tmp, metadata, tmpResources);
79+
TikaInputStream tis = TikaInputStream.get(tmp, fetchRequestMetadata, tmpResources);
8080
long elapsed = System.currentTimeMillis() - start;
8181
LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
8282
return tis;

tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -136,7 +136,7 @@ public class HttpFetcher extends AbstractFetcher implements Initializable, Range
136136

137137

138138
@Override
139-
public InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException {
139+
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws IOException, TikaException {
140140
HttpGet get = new HttpGet(fetchKey);
141141
RequestConfig requestConfig =
142142
RequestConfig.custom()
@@ -146,21 +146,21 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws IOException,
146146
if (! StringUtils.isBlank(userAgent)) {
147147
get.setHeader(USER_AGENT, userAgent);
148148
}
149-
return execute(get, metadata, httpClient, true);
149+
return execute(get, fetchResponseMetadata, httpClient, true);
150150
}
151151

152152
@Override
153-
public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
153+
public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
154154
throws IOException {
155155
HttpGet get = new HttpGet(fetchKey);
156156
if (! StringUtils.isBlank(userAgent)) {
157157
get.setHeader(USER_AGENT, userAgent);
158158
}
159159
get.setHeader("Range", "bytes=" + startRange + "-" + endRange);
160-
return execute(get, metadata, httpClient, true);
160+
return execute(get, fetchResponseMetadata, httpClient, true);
161161
}
162162

163-
private InputStream execute(HttpGet get, Metadata metadata, HttpClient client,
163+
private InputStream execute(HttpGet get, Metadata fetchRequestMetadata, HttpClient client,
164164
boolean retryOnBadLength) throws IOException {
165165
HttpClientContext context = HttpClientContext.create();
166166
HttpResponse response = null;
@@ -183,15 +183,15 @@ public void run() {
183183
}
184184
response = client.execute(get, context);
185185

186-
updateMetadata(get.getURI().toString(), response, context, metadata);
186+
updateMetadata(get.getURI().toString(), response, context, fetchRequestMetadata);
187187

188188
int code = response.getStatusLine().getStatusCode();
189189
if (code < 200 || code > 299) {
190190
throw new IOException("bad status code: " + code + " :: " +
191191
responseToString(response));
192192
}
193193
try (InputStream is = response.getEntity().getContent()) {
194-
return spool(is, metadata);
194+
return spool(is, fetchRequestMetadata);
195195
}
196196
} catch (ConnectionClosedException e) {
197197

@@ -202,7 +202,7 @@ public void run() {
202202
//and then compresses the stream. See HTTPCLIENT-2176
203203
LOG.warn("premature end of content-length delimited message; retrying with " +
204204
"content compression disabled for {}", get.getURI());
205-
return execute(get, metadata, noCompressHttpClient, false);
205+
return execute(get, fetchRequestMetadata, noCompressHttpClient, false);
206206
}
207207
throw e;
208208
} catch (IOException e) {

tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -106,12 +106,12 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
106106
private boolean pathStyleAccessEnabled = false;
107107

108108
@Override
109-
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
110-
return fetch(fetchKey, -1, -1, metadata);
109+
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
110+
return fetch(fetchKey, -1, -1, fetchRequestMetadata);
111111
}
112112

113113
@Override
114-
public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
114+
public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
115115
throws TikaException, IOException {
116116
String theFetchKey = StringUtils.isBlank(prefix) ? fetchKey : prefix + fetchKey;
117117

@@ -129,7 +129,7 @@ public InputStream fetch(String fetchKey, long startRange, long endRange, Metada
129129
do {
130130
try {
131131
long start = System.currentTimeMillis();
132-
InputStream is = _fetch(theFetchKey, metadata, startRange, endRange);
132+
InputStream is = _fetch(theFetchKey, fetchResponseMetadata, startRange, endRange);
133133
long elapsed = System.currentTimeMillis() - start;
134134
LOGGER.debug("total to fetch {}", elapsed);
135135
return is;

0 commit comments

Comments
 (0)