1616import java .nio .ByteBuffer ;
1717import java .nio .channels .FileChannel ;
1818import java .nio .channels .WritableByteChannel ;
19+ import java .nio .file .Files ;
1920import java .nio .file .Path ;
2021import java .nio .file .Paths ;
2122import java .time .Instant ;
22- import java .util .ArrayList ;
23- import java .util .LinkedHashMap ;
24- import java .util .List ;
25- import java .util .Map ;
23+ import java .util .* ;
24+ import java .util .concurrent .* ;
25+ import java .util .concurrent . atomic . AtomicBoolean ;
26+ import java .util .concurrent . atomic . AtomicLong ;
2627
2728import static java .nio .charset .StandardCharsets .UTF_8 ;
2829import static java .nio .file .StandardOpenOption .*;
@@ -33,7 +34,9 @@ public class DedupeTool {
3334 private boolean verbose ;
3435 private boolean dryRun ;
3536 private boolean quiet ;
36- private LruCache <WarcDigest , CacheValue > digestCache ;
37+ private int threads = Runtime .getRuntime ().availableProcessors ();
38+ private Map <WarcDigest , CacheValue > digestCache ;
39+ private final AtomicLong errors = new AtomicLong ();
3740
3841 private static class LruCache <K , V > extends LinkedHashMap <K , V > {
3942 private final int maxSize ;
@@ -151,8 +154,6 @@ record = reader.next().orElse(null);
151154 }
152155 }
153156
154-
155-
156157 private String formatBytes (long bytes ) {
157158 if (bytes < 1024 ) return bytes + "B" ;
158159 if (bytes < 1024 * 1024 ) return String .format ("%.2fKB" , bytes / 1024.0 );
@@ -264,6 +265,10 @@ public static void main(String[] args) throws IOException {
264265 case "--minimum-size" :
265266 dedupeTool .setMinimumSize (Long .parseLong (args [++i ]));
266267 break ;
268+ case "-j" :
269+ case "--threads" :
270+ dedupeTool .setThreads (Integer .parseInt (args [++i ]));
271+ break ;
267272 case "-h" :
268273 case "--help" :
269274 System .out .println ("Usage: jwarc dedupe [options] [warc-files...]" );
@@ -272,6 +277,7 @@ public static void main(String[] args) throws IOException {
272277 System .out .println (" --cache-size N Cache N digests for de-duplication (enables cross-URI de-duplication)" );
273278 System .out .println (" --cdx-server URL De-deduplicate against a remote CDX server" );
274279 System .out .println (" --minimum-size BYTES Minimum payload size to consider de-duplicating (default " + dedupeTool .minimumSize + ")" );
280+ System .out .println (" -j, --threads N Number of threads for parallel processing (default " + dedupeTool .threads + ")" );
275281 System .out .println (" -n, --dry-run Don't write output, just calculate and print deduplication statistics" );
276282 System .out .println (" -q, --quiet Don't print deduplication statistics" );
277283 System .out .println (" -v, --verbose Verbose output" );
@@ -299,21 +305,42 @@ public static void main(String[] args) throws IOException {
299305 }
300306 }
301307
302- for (Path infile : infiles ) {
303- try {
304- Path outfile = dedupeTool .dryRun ? null : determineOutputPath (infile );
305- dedupeTool .deduplicateWarcFile (infile , outfile );
306- } catch (IOException e ) {
307- System .err .println ("Failed to deduplicate " + infile + ": " + e .getMessage ());
308- if (!dedupeTool .quiet ) e .printStackTrace (System .err );
309- System .exit (1 );
310- return ;
308+ dedupeTool .run (infiles );
309+ }
310+
311+ private void run (List <Path > infiles ) throws IOException {
312+ ForkJoinPool pool = new ForkJoinPool (threads );
313+ try {
314+ pool .submit (() -> infiles .parallelStream ().forEach (this ::deduplicateWarcFile )).get (); // wait for everything
315+ } catch (InterruptedException |ExecutionException e ) {
316+ Thread .currentThread ().interrupt ();
317+ System .exit (1 );
318+ } finally {
319+ pool .shutdown ();
320+ }
321+ if (errors .get () > 0 ) System .exit (1 );
322+ }
323+
324+ private void deduplicateWarcFile (Path infile ) {
325+ Path outfile = dryRun ? null : determineOutputPath (infile );
326+ try {
327+ deduplicateWarcFile (infile , outfile );
328+ } catch (IOException e ) {
329+ System .err .println ("Failed to dedupe " + infile + ": " + e );
330+ if (outfile != null ) {
331+ try {
332+ Files .deleteIfExists (outfile );
333+ } catch (IOException ex ) {
334+ System .err .println ("Failed to delete " + outfile + ": " + ex );
335+ }
311336 }
337+ if (!quiet ) e .printStackTrace (System .err );
338+ errors .incrementAndGet ();
312339 }
313340 }
314341
315342 public void setCacheSize (int cacheSize ) {
316- digestCache = cacheSize > 0 ? new LruCache <>(cacheSize ) : null ;
343+ digestCache = cacheSize > 0 ? Collections . synchronizedMap ( new LruCache <>(cacheSize ) ) : null ;
317344 }
318345
319346 public void setMinimumSize (long minimumSize ) {
@@ -331,4 +358,8 @@ public void setDryRun(boolean dryRun) {
331358 public void setQuiet (boolean quiet ) {
332359 this .quiet = quiet ;
333360 }
361+
362+ public void setThreads (int threads ) {
363+ this .threads = Math .max (1 , threads );
364+ }
334365}
0 commit comments