Skip to content

If the remote repository returns a 200 response header but does not return any data and hangs the connection, it will cause thread exhaustion #13

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.mastfrog.acteur.errors.ResponseException;
import com.mastfrog.acteur.headers.Headers;
Expand Down Expand Up @@ -75,6 +76,7 @@
*
* @author Tim Boudreau
*/
@Singleton
public class DownloaderV2A {

private final HttpClient client;
Expand All @@ -87,7 +89,7 @@ public class DownloaderV2A {
static final AtomicLong counter = new AtomicLong();
private final TempFiles tempFiles;
private final String runId;
private final String userAgent;
// private final String userAgent;
private final ExecutorService pool;

@Inject
Expand All @@ -104,7 +106,7 @@ public DownloaderV2A(HttpClient client, Config config, FileFinder finder,
this.control = control;
this.runId = runId;
this.tempFiles = tempFiles;
userAgent = "tmpx-" + ver.version;
// userAgent = "tmpx-" + ver.version;
}

String nextDownloadId() {
Expand All @@ -115,13 +117,15 @@ public boolean isFailedPath(Path path) {
return failedURLs.getIfPresent(path) != null;
}

CompletableFuture<TempFile> download(Path path, RequestID rid, DownloadReceiver recv) throws URISyntaxException {
CompletableFuture<TempFile> tf = download(path, rid);
CompletableFuture<TempFile> download(Path path, RequestID rid, String userAgent, DownloadReceiver recv) throws URISyntaxException {
CompletableFuture<TempFile> tf = download(path, rid, userAgent);
Logs requestLog = logger.child("download", rid);
tf.whenComplete((file, thrown) -> {
if (thrown != null) {
failedURLs.put(path, path);
// System.out.println("DO FAIL FOR " + thrown + " " + path);
if (!(thrown instanceof CancellationException)) {
failedURLs.put(path, path);
}
recv.failed(GONE, thrown.getMessage());
} else if (file != null) {
HttpResponseStatus status = file.info().map(info -> {
Expand All @@ -137,7 +141,7 @@ CompletableFuture<TempFile> download(Path path, RequestID rid, DownloadReceiver
return tf;
}

public CompletableFuture<TempFile> download(Path path, RequestID rid) throws URISyntaxException {
public CompletableFuture<TempFile> download(Path path, RequestID rid, String userAgent) throws URISyntaxException {
Collection<URL> urls = config.withPath(path);
List<CompletableFuture<TempFile>> futures = new ArrayList<>(urls.size());
Int remainder = Int.createAtomic();
Expand Down Expand Up @@ -216,8 +220,15 @@ public CompletableFuture<TempFile> download(Path path, RequestID rid) throws URI
lr.add("cancelled");
if (remaining == 0) {
// System.out.println("REMAINING 0 COMPLETE 1 " + path);
// result.completeExceptionally(new CancellationException("No result " + path));
// if timeout,cache No result url
result.completeExceptionally(new ResponseException(GONE, "No result " + path));
}
} else if (thrown instanceof ResponseException) {
lr.add("faild");
if (remaining == 0) {
result.completeExceptionally(thrown);
}
} else if (thrown != null) {
if (remaining == 0) {
// System.out.println("REMAINING 0 COMPLETE 2 " + path);
Expand All @@ -235,11 +246,16 @@ public CompletableFuture<TempFile> download(Path path, RequestID rid) throws URI
result.completeExceptionally(ex);
}
// }
});
}).orTimeout(20, TimeUnit.MINUTES);
// fut.whenComplete(onComplete);
BH bh = new BH(dlId, u, fut, perUrl);
futures.add(fut);
client.sendAsync(req, bh);
client.sendAsync(req, bh).orTimeout(15, TimeUnit.MINUTES).exceptionally((e) -> {
if(!fut.isCancelled()){
fut.cancel(true);
}
return null;
});
}
return result;
}
Expand Down Expand Up @@ -276,7 +292,9 @@ public BodySubscriber<TempFile> apply(HttpResponse.ResponseInfo info) {
logs.warn("request-failed")
.add("status", info.statusCode()).close();
// .add("headers", info.headers().map()).close();
result.cancel(true);
result.completeExceptionally(new ResponseException(HttpResponseStatus.valueOf(info.statusCode()),
HttpResponseStatus.valueOf(info.statusCode()).reasonPhrase()));
// result.cancel(true);
return NO_OP;
} else {
logs.info("potential-success")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package com.mastfrog.tinymavenproxy;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.mastfrog.acteur.server.ServerModule;
import static com.mastfrog.tinymavenproxy.GetActeur.isGzipCacheFile;
Expand All @@ -48,6 +49,7 @@
*
* @author Tim Boudreau
*/
@Singleton
public class FileFinder {

private final Config config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ public class GetActeur extends Acteur {
Path pth = path.elideEmptyElements();
def.defer((Resumer res) -> {
config.debugLog(" defer and download ", pth);
CompletableFuture<TempFile> l = dl.download(pth, id, new DownloadReceiverImpl(res, config));
CompletableFuture<TempFile> l = dl.download(pth, id, req.header(Headers.USER_AGENT).toString(),
new DownloadReceiverImpl(res, config));
req.channel().closeFuture().addListener(cl -> {
l.cancel(false);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,12 @@ static SettingsBuilder defaultSettings() {
.add("cors.enabled", false)
.add("download-tmp", System.getProperty("java.io.tmpdir"))
.add(HTTP_COMPRESSION, "false")
.add(SETTINGS_KEY_DOWNLOAD_THREADS, "24")
.add(SETTINGS_KEY_DOWNLOAD_THREADS, 24)
.add(SETTINGS_KEY_ASYNC_LOGGING, false)
.add(LoggingModule.SETTINGS_KEY_LOG_TO_CONSOLE, true)
.add(WORKER_THREADS, "6")
.add(EVENT_THREADS, "3")
.add(WORKER_THREADS, 6)
.add(EVENT_THREADS, 3)
.add(ServerModule.BACKGROUND_THREADS, 4)
.add(ServerModule.SETTINGS_KEY_SOCKET_WRITE_SPIN_COUNT, 32)
.add(SETTINGS_KEY_LOG_LEVEL, "trace")
// .add(MAX_CONTENT_LENGTH, "128") // we don't accept PUTs, no need for a big buffer
Expand Down