[FIX] Фикс скачивания мелких файлов

This commit is contained in:
Gravit 2019-12-19 10:04:29 +07:00
parent 081b86ff23
commit 9eb130b6e6
No known key found for this signature in database
GPG key ID: 061981E1E85D3216
2 changed files with 55 additions and 28 deletions

View file

@ -9,6 +9,7 @@
import java.net.URL;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
@ -18,6 +19,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
@ -36,7 +38,7 @@
public class ListDownloader {
private static final AtomicInteger COUNTER_THR = new AtomicInteger(0);
private static final ThreadFactory FACTORY = r -> CommonHelper.newThread("Downloader Thread #" + COUNTER_THR.incrementAndGet(), true, r);
public static final int THREADS = 4;
private static ExecutorService newExecutor() {
return new ThreadPoolExecutor(0, VerifyHelper.verifyInt(Integer.parseInt(System.getProperty("launcher.downloadThreads", "3")), VerifyHelper.POSITIVE, "Thread max count must be positive."), 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), FACTORY);
}
@ -62,11 +64,57 @@ public DownloadTask(String apply, long size) {
}
public void download(String base, List<DownloadTask> applies, Path dstDirFile, DownloadCallback callback, DownloadTotalCallback totalCallback) throws IOException, URISyntaxException {
List<List<DownloadTask>> tasks = new ArrayList<>();
applies.sort(Comparator.comparingLong((e) -> e.size));
for(int i=0;i<THREADS;++i)
{
tasks.add(new ArrayList<>());
}
int n = 0;
for(DownloadTask t : applies)
{
if(n == THREADS) n = 0;
tasks.get(n).add(t);
n++;
}
Thread[] threads = new Thread[THREADS];
AtomicReference<Exception> excRef = new AtomicReference<>();
for(int i=0;i<THREADS;++i)
{
List<DownloadTask> task = tasks.get(i);
threads[i] = CommonHelper.newThread(String.format("Downloader #%d", n), true, () -> {
try {
downloadInOneThread(base, task, dstDirFile, callback, totalCallback);
} catch (IOException | URISyntaxException e) {
LogHelper.error(e);
excRef.set(e);
}
});
threads[i].start();
}
for(int i=0;i<THREADS;++i)
{
try {
threads[i].wait();
if(excRef.get() != null)
{
Exception ex = excRef.get();
for(int j=0;j<THREADS;++j) if(threads[j].isAlive()) threads[i].interrupt();
if(ex instanceof IOException) throw (IOException)ex;
if(ex instanceof URISyntaxException) throw (URISyntaxException)ex;
if(ex instanceof RuntimeException) throw (RuntimeException)ex;
LogHelper.error(ex);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public void downloadInOneThread(String base, List<DownloadTask> applies, Path dstDirFile, DownloadCallback callback, DownloadTotalCallback totalCallback) throws IOException, URISyntaxException {
try (CloseableHttpClient httpclient = HttpClients.custom().setUserAgent(IOHelper.USER_AGENT)
.setRedirectStrategy(new LaxRedirectStrategy())
.build()) {
applies.sort((a,b) -> Long.compare(a.size, b.size));
List<Callable<Void>> toExec = new ArrayList<>();
URI baseUri = new URI(base);
String scheme = baseUri.getScheme();
String host = baseUri.getHost();
@ -74,34 +122,13 @@ public void download(String base, List<DownloadTask> applies, Path dstDirFile, D
if (port != -1)
host = host + ":" + port;
String path = baseUri.getPath();
List<IOException> excs = new CopyOnWriteArrayList<>();
for (DownloadTask apply : applies) {
URI u = new URI(scheme, host, path + apply.apply, "", "");
callback.stateChanged(apply.apply, 0L, apply.size);
Path targetPath = dstDirFile.resolve(apply.apply);
toExec.add(() -> {
if (LogHelper.isDebugEnabled())
LogHelper.debug("Download URL: %s to file %s dir: %s", u.toString(), targetPath.toAbsolutePath().toString(), dstDirFile.toAbsolutePath().toString());
try {
httpclient.execute(new HttpGet(u), new FileDownloadResponseHandler(targetPath, apply, callback, totalCallback, false));
} catch (IOException e) {
excs.add(e);
}
return null;
});
}
try {
ExecutorService e = newExecutor();
e.invokeAll(toExec);
e.shutdown();
e.awaitTermination(4, TimeUnit.HOURS);
} catch (InterruptedException t) {
LogHelper.error(t);
}
if (!excs.isEmpty()) {
IOException toThrow = excs.remove(0);
excs.forEach(toThrow::addSuppressed);
throw toThrow;
}
}
}

@ -1 +1 @@
Subproject commit 6d0b3e25fa59503d67f4177b587934ad288218ff
Subproject commit 3bee30b78fb6a2e92fc97e3b6a257a8c4e9d4d54