将队列从list换成map,能够对任务去重;新增任务时会判断该任务是否下载完成;修改下载请求头;

This commit is contained in:
chuzhongzai 2025-03-22 23:50:25 +08:00
parent 95329a5603
commit c3b96414e9
5 changed files with 80 additions and 17 deletions

7
.idea/encodings.xml Normal file
View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>

View File

@ -1,5 +1,7 @@
package lion;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
@ -12,4 +14,15 @@ public class CustomUtil {
public static AtomicInteger counter = new AtomicInteger();
public static ObjectMapper objectMapper = new ObjectMapper();
public static void notifyMe(String message) {
String url = "https://personal.lionwebsite.xyz/message2me?AuthCode=alone&message=" + message;
HttpRequest request = HttpRequest.post(url);
request.header("User-Agent", "Mozilla/5.0");
try(HttpResponse response = request.execute()) {
if(response.getStatus() != 200) {
System.out.println("通知失败, status code:" + response.getStatus() + ", message:" + message);
}
}
}
}

View File

@ -127,6 +127,7 @@ public class MultiThreadedHTTPServer {
responseWriter.println("Accept-Ranges: bytes");
responseWriter.println("Content-Length: " + (endByte - startByte + 1));
responseWriter.println("Content-Range: bytes " + startByte + "-" + endByte + "/" + fileLength);
responseWriter.println("Content-Disposition: attachment; filename=\"" + fileName + "\"");
responseWriter.println();
// Send the file content

View File

@ -4,6 +4,7 @@ import io.netty.channel.Channel;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Promise;
import lion.CustomUtil;
import lion.Domain.GalleryTask;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.ZipUtil;
@ -21,7 +22,7 @@ import java.util.concurrent.locks.ReentrantLock;
@Slf4j
@Data
public class DownloadCheckService {
ArrayList<GalleryTask> queue;
Map<Integer, GalleryTask> queue;
String downloadPath = "/root/gallery/hentai/download/";
@ -37,7 +38,7 @@ public class DownloadCheckService {
EventLoop eventLoop;
public DownloadCheckService(ArrayList<GalleryTask> queue, HashMap<Integer, Promise<AbstractMessage>> promises){
public DownloadCheckService(Map<Integer, GalleryTask> queue, HashMap<Integer, Promise<AbstractMessage>> promises){
this.queue = queue;
this.promises = promises;
eventLoop = new DefaultEventLoop();
@ -49,7 +50,7 @@ public class DownloadCheckService {
public boolean downloadCheck(){
if(queue.isEmpty())
return false;
log.info("下载检查:" + Arrays.toString(queue.toArray()));
log.info("下载检查:{}", Arrays.toString(queue.values().toArray()));
File downloadDirectory = new File(downloadPath);
File[] fileArray = downloadDirectory.listFiles();
@ -62,7 +63,7 @@ public class DownloadCheckService {
boolean result = false;
//扫描进度
Iterator<File> fileIterator = files.iterator();
for(GalleryTask galleryTask: queue){
for(GalleryTask galleryTask: queue.values()){
//跳过已经下载完成或者压缩完成的任务
if(galleryTask.is_compress_complete() || galleryTask.is_compressing()) {
result = true;
@ -96,7 +97,7 @@ public class DownloadCheckService {
}
//压缩队列
for(GalleryTask galleryTask: queue)
for(GalleryTask galleryTask: queue.values())
if (galleryTask.is_download_complete()) {
galleryTask.setStatus(GalleryTask.COMPRESSING);
compress_queue.add(galleryTask);
@ -139,4 +140,35 @@ public class DownloadCheckService {
}
}
}
/**
* 检查改任务是否为已完成任务如已完成则返回true若未完成则加入队列
* @return true if compress complete, false otherwise
*/
public boolean addToQueue(GalleryTask galleryTask){
//是否含有名字进行中任务一般有名字没有名字则肯定为初始任务存在名字至少在下载路径出现过
if(galleryTask.getName() == null || !galleryTask.getName().isEmpty()){
queue.putIfAbsent(galleryTask.getGid(), galleryTask);
return false;
}
//查询hah下载路径中是否存在该任务下载路径存在则为下载中或下载完成任务加入队列
if(new File(downloadPath + galleryTask.getGid()).isDirectory()){
queue.putIfAbsent(galleryTask.getGid(), galleryTask);
return false;
}
//查询存放路径中是否含有该任务的压缩包存在则为下载完成任务
if(new File(storagePath + galleryTask.getName() + "/" + galleryTask.getName() + ".zip").exists()){
galleryTask.setStatus(GalleryTask.COMPRESS_COMPLETE);
CustomUtil.notifyMe(String.format("任务:%s在添加时已下载完成更新任务状态", galleryTask.getName()));
return true;
}
//异常情况发送通知
CustomUtil.notifyMe(String.format("任务:%s存在名字但是下载路径为空且不存在压缩包", galleryTask.getName() ));
return false;
}
}

View File

@ -17,8 +17,8 @@ import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
@ -33,9 +33,9 @@ public class storageNode {
DownloadCheckService downloadCheckService;
ArrayList<GalleryTask> queue;
Map<Integer, GalleryTask> queue;
ArrayList<GalleryTask> tempQueue;
Map<Integer, GalleryTask> tempQueue;
ScheduledExecutorService checkThreadPool;
@ -48,8 +48,8 @@ public class storageNode {
public static String storagePath = "/root/gallery/gallery/";
public storageNode(){
queue = new ArrayList<>(0);
tempQueue = new ArrayList<>(0);
queue = new HashMap<>();
tempQueue = new HashMap<>();
lock = new ReentrantLock();
counter = 0;
promises = new HashMap<>();
@ -79,7 +79,7 @@ public class storageNode {
try {
lock.lock();
if(!tempQueue.isEmpty()){
queue.addAll(tempQueue);
queue.putAll(tempQueue);
tempQueue.clear();
}
lock.unlock();
@ -88,7 +88,7 @@ public class storageNode {
boolean isSkip = true;
//返回false之后还要额外检查是否有压缩完成的任务
if(!queue.isEmpty())
for (GalleryTask galleryTask : queue)
for (GalleryTask galleryTask : queue.values())
if (galleryTask.is_compress_complete()) {
isSkip = false;
break;
@ -106,11 +106,16 @@ public class storageNode {
//上锁后再发送避免出现发送完之后再下载完成
lock.lock();
DownloadStatusMessage downloadStatusMessage = new DownloadStatusMessage();
downloadStatusMessage.setGalleryTasks(queue.toArray(GalleryTask[]::new));
downloadStatusMessage.setGalleryTasks(queue.values().toArray(GalleryTask[]::new));
server.writeAndFlush(downloadStatusMessage);
queue.forEach((gid, galleryTask) -> {
if (galleryTask.is_compress_complete())
queue.remove(gid);
});
log.info("任务状态发送完成");
queue.removeIf(GalleryTask::is_compress_complete);
lock.unlock();
}catch (Exception e){
log.error("发送任务状态时发生异常:{}", e.getMessage());
@ -118,9 +123,9 @@ public class storageNode {
}
class MyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter{
ArrayList<GalleryTask> queue;
Map<Integer, GalleryTask> queue;
public MyChannelInboundHandlerAdapter(ArrayList<GalleryTask> queue) {
public MyChannelInboundHandlerAdapter(Map<Integer, GalleryTask> queue) {
this.queue = queue;
}
@ -144,7 +149,12 @@ public class storageNode {
case AbstractMessage.DOWNLOAD_POST_MESSAGE -> {
DownloadPostMessage dpm = (DownloadPostMessage) abstractMessage;
lock.lock();
queue.add(dpm.getGalleryTask());
//添加到队列方法返回真说明该任务已下载完成直接发送下载进度
if(downloadCheckService.addToQueue(dpm.getGalleryTask())){
DownloadStatusMessage downloadStatusMessage = new DownloadStatusMessage();
downloadStatusMessage.setGalleryTasks(new GalleryTask[]{dpm.getGalleryTask()});
server.writeAndFlush(downloadStatusMessage);
}
log.info(String.valueOf(queue));
lock.unlock();
ctx.writeAndFlush(new ResponseMessage(dpm.messageId, (byte) 0));