Merge remote-tracking branch 'origin/master'
This commit is contained in:
commit
de19244ab8
@ -1,5 +1,7 @@
|
||||
package lion;
|
||||
|
||||
import cn.hutool.http.HttpRequest;
|
||||
import cn.hutool.http.HttpResponse;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.Data;
|
||||
|
||||
@ -12,4 +14,15 @@ public class CustomUtil {
|
||||
public static AtomicInteger counter = new AtomicInteger();
|
||||
|
||||
public static ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
public static void notifyMe(String message) {
|
||||
String url = "https://personal.lionwebsite.xyz/message2me?AuthCode=alone&message=" + message;
|
||||
HttpRequest request = HttpRequest.post(url);
|
||||
request.header("User-Agent", "Mozilla/5.0");
|
||||
try(HttpResponse response = request.execute()) {
|
||||
if(response.getStatus() != 200) {
|
||||
System.out.println("通知失败, status code:" + response.getStatus() + ", message:" + message);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -127,6 +127,7 @@ public class MultiThreadedHTTPServer {
|
||||
responseWriter.println("Accept-Ranges: bytes");
|
||||
responseWriter.println("Content-Length: " + (endByte - startByte + 1));
|
||||
responseWriter.println("Content-Range: bytes " + startByte + "-" + endByte + "/" + fileLength);
|
||||
responseWriter.println("Content-Disposition: attachment; filename=\"" + fileName + "\"");
|
||||
responseWriter.println();
|
||||
|
||||
// Send the file content
|
||||
|
||||
@ -4,6 +4,7 @@ import io.netty.channel.Channel;
|
||||
import io.netty.channel.DefaultEventLoop;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
import lion.CustomUtil;
|
||||
import lion.Domain.GalleryTask;
|
||||
import cn.hutool.core.io.FileUtil;
|
||||
import cn.hutool.core.util.ZipUtil;
|
||||
@ -21,7 +22,7 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
@Slf4j
|
||||
@Data
|
||||
public class DownloadCheckService {
|
||||
ArrayList<GalleryTask> queue;
|
||||
Map<Integer, GalleryTask> queue;
|
||||
|
||||
String downloadPath = "/root/gallery/hentai/download/";
|
||||
|
||||
@ -37,7 +38,7 @@ public class DownloadCheckService {
|
||||
|
||||
EventLoop eventLoop;
|
||||
|
||||
public DownloadCheckService(ArrayList<GalleryTask> queue, HashMap<Integer, Promise<AbstractMessage>> promises){
|
||||
public DownloadCheckService(Map<Integer, GalleryTask> queue, HashMap<Integer, Promise<AbstractMessage>> promises){
|
||||
this.queue = queue;
|
||||
this.promises = promises;
|
||||
eventLoop = new DefaultEventLoop();
|
||||
@ -49,7 +50,7 @@ public class DownloadCheckService {
|
||||
public boolean downloadCheck(){
|
||||
if(queue.isEmpty())
|
||||
return false;
|
||||
log.info("下载检查:" + Arrays.toString(queue.toArray()));
|
||||
log.info("下载检查:{}", Arrays.toString(queue.values().toArray()));
|
||||
|
||||
File downloadDirectory = new File(downloadPath);
|
||||
File[] fileArray = downloadDirectory.listFiles();
|
||||
@ -62,7 +63,7 @@ public class DownloadCheckService {
|
||||
boolean result = false;
|
||||
//扫描进度
|
||||
Iterator<File> fileIterator = files.iterator();
|
||||
for(GalleryTask galleryTask: queue){
|
||||
for(GalleryTask galleryTask: queue.values()){
|
||||
//跳过已经下载完成或者压缩完成的任务
|
||||
if(galleryTask.is_compress_complete() || galleryTask.is_compressing()) {
|
||||
result = true;
|
||||
@ -96,7 +97,7 @@ public class DownloadCheckService {
|
||||
}
|
||||
|
||||
//压缩队列
|
||||
for(GalleryTask galleryTask: queue)
|
||||
for(GalleryTask galleryTask: queue.values())
|
||||
if (galleryTask.is_download_complete()) {
|
||||
galleryTask.setStatus(GalleryTask.COMPRESSING);
|
||||
compress_queue.add(galleryTask);
|
||||
@ -139,4 +140,35 @@ public class DownloadCheckService {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查改任务是否为已完成任务,如已完成则返回true,若未完成则加入队列
|
||||
* @return true if compress complete, false otherwise
|
||||
*/
|
||||
public boolean addToQueue(GalleryTask galleryTask){
|
||||
//是否含有名字,进行中任务一般有名字,没有名字则肯定为初始任务,存在名字至少在下载路径出现过
|
||||
if(galleryTask.getName() == null || !galleryTask.getName().isEmpty()){
|
||||
queue.putIfAbsent(galleryTask.getGid(), galleryTask);
|
||||
return false;
|
||||
}
|
||||
|
||||
//查询hah下载路径中,是否存在该任务下载路径,存在则为下载中或下载完成任务,加入队列
|
||||
if(new File(downloadPath + galleryTask.getGid()).isDirectory()){
|
||||
queue.putIfAbsent(galleryTask.getGid(), galleryTask);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
//查询存放路径中是否含有该任务的压缩包,存在则为下载完成任务
|
||||
if(new File(storagePath + galleryTask.getName() + "/" + galleryTask.getName() + ".zip").exists()){
|
||||
galleryTask.setStatus(GalleryTask.COMPRESS_COMPLETE);
|
||||
CustomUtil.notifyMe(String.format("任务:%s在添加时已下载完成,更新任务状态", galleryTask.getName()));
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
//异常情况,发送通知
|
||||
CustomUtil.notifyMe(String.format("任务:%s存在名字,但是下载路径为空且不存在压缩包", galleryTask.getName() ));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,8 +17,8 @@ import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
@ -33,9 +33,9 @@ public class storageNode {
|
||||
|
||||
DownloadCheckService downloadCheckService;
|
||||
|
||||
ArrayList<GalleryTask> queue;
|
||||
Map<Integer, GalleryTask> queue;
|
||||
|
||||
ArrayList<GalleryTask> tempQueue;
|
||||
Map<Integer, GalleryTask> tempQueue;
|
||||
|
||||
ScheduledExecutorService checkThreadPool;
|
||||
|
||||
@ -48,8 +48,8 @@ public class storageNode {
|
||||
public static String storagePath = "/root/gallery/gallery/";
|
||||
|
||||
public storageNode(){
|
||||
queue = new ArrayList<>(0);
|
||||
tempQueue = new ArrayList<>(0);
|
||||
queue = new HashMap<>();
|
||||
tempQueue = new HashMap<>();
|
||||
lock = new ReentrantLock();
|
||||
counter = 0;
|
||||
promises = new HashMap<>();
|
||||
@ -79,7 +79,7 @@ public class storageNode {
|
||||
try {
|
||||
lock.lock();
|
||||
if(!tempQueue.isEmpty()){
|
||||
queue.addAll(tempQueue);
|
||||
queue.putAll(tempQueue);
|
||||
tempQueue.clear();
|
||||
}
|
||||
lock.unlock();
|
||||
@ -88,7 +88,7 @@ public class storageNode {
|
||||
boolean isSkip = true;
|
||||
//返回false之后,还要额外检查是否有压缩完成的任务
|
||||
if(!queue.isEmpty())
|
||||
for (GalleryTask galleryTask : queue)
|
||||
for (GalleryTask galleryTask : queue.values())
|
||||
if (galleryTask.is_compress_complete()) {
|
||||
isSkip = false;
|
||||
break;
|
||||
@ -106,11 +106,13 @@ public class storageNode {
|
||||
//上锁后再发送,避免出现发送完之后再下载完成
|
||||
lock.lock();
|
||||
DownloadStatusMessage downloadStatusMessage = new DownloadStatusMessage();
|
||||
downloadStatusMessage.setGalleryTasks(queue.toArray(GalleryTask[]::new));
|
||||
downloadStatusMessage.setGalleryTasks(queue.values().toArray(GalleryTask[]::new));
|
||||
server.writeAndFlush(downloadStatusMessage);
|
||||
|
||||
queue.entrySet().removeIf(entry -> entry.getValue().is_compress_complete());
|
||||
log.info("任务状态发送完成");
|
||||
|
||||
queue.removeIf(GalleryTask::is_compress_complete);
|
||||
|
||||
lock.unlock();
|
||||
}catch (Exception e){
|
||||
log.error("发送任务状态时发生异常:{}", e.getMessage());
|
||||
@ -118,9 +120,9 @@ public class storageNode {
|
||||
}
|
||||
|
||||
class MyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter{
|
||||
ArrayList<GalleryTask> queue;
|
||||
Map<Integer, GalleryTask> queue;
|
||||
|
||||
public MyChannelInboundHandlerAdapter(ArrayList<GalleryTask> queue) {
|
||||
public MyChannelInboundHandlerAdapter(Map<Integer, GalleryTask> queue) {
|
||||
this.queue = queue;
|
||||
}
|
||||
|
||||
@ -144,7 +146,12 @@ public class storageNode {
|
||||
case AbstractMessage.DOWNLOAD_POST_MESSAGE -> {
|
||||
DownloadPostMessage dpm = (DownloadPostMessage) abstractMessage;
|
||||
lock.lock();
|
||||
queue.add(dpm.getGalleryTask());
|
||||
//添加到队列方法返回真说明该任务已下载完成,直接发送下载进度
|
||||
if(downloadCheckService.addToQueue(dpm.getGalleryTask())){
|
||||
DownloadStatusMessage downloadStatusMessage = new DownloadStatusMessage();
|
||||
downloadStatusMessage.setGalleryTasks(new GalleryTask[]{dpm.getGalleryTask()});
|
||||
server.writeAndFlush(downloadStatusMessage);
|
||||
}
|
||||
log.info(String.valueOf(queue));
|
||||
lock.unlock();
|
||||
ctx.writeAndFlush(new ResponseMessage(dpm.messageId, (byte) 0));
|
||||
|
||||
Loading…
Reference in New Issue
Block a user