去除不需要的任务状态;检查时对比进度是否变化,未变化则不推送进度;优化部分代码;

This commit is contained in:
chuzhongzai 2023-12-28 15:54:55 +08:00
parent f815acd334
commit dd0b7e608c
6 changed files with 43 additions and 60 deletions

View File

@ -3,8 +3,6 @@ package lion;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.concurrent.atomic.AtomicInteger;
@ -14,19 +12,4 @@ public class CustomUtil {
public static AtomicInteger counter = new AtomicInteger();
public static ObjectMapper objectMapper = new ObjectMapper();
/**
* 寻找一定数量的可用端口
*
* @return 可用端口的起始位置 -1为没有几乎没有可能
*/
public static short _findIdlePort(){
for(int i=20000; i<65535; i++){
try(ServerSocket ignored = new ServerSocket(i)){
return (short) i;
}catch (IOException ignored) {
}
}
return -1;
}
}

View File

@ -6,17 +6,11 @@ import lombok.Data;
@Data
public class GalleryTask {
public static byte DOWNLOADING = 1;
public static byte DOWNLOAD_COMPLETE = 2;
public static byte DOWNLOAD_QUEUED = 3;
public static byte COMPRESSING = 3;
public static byte COMPRESS_COMPLETE = 4;
public static byte COMPRESSING = 5;
@JsonInclude(JsonInclude.Include.NON_NULL)
private String name;

View File

@ -3,7 +3,4 @@ package lion.ErrorCode;
public class ErrorCode {
public static final byte IO_ERROR = 1;
public static final byte FILE_NOT_FOUND = 2;
public static final byte COMPRESS_ERROR = 3;
}

View File

@ -106,7 +106,7 @@ public class MultiThreadedHTTPServer {
// Get the range information for resuming download
long startByte = 0;
long endByte = fileLength - 1;
String rangeHeader = getRequestHeader(requestReader, "Range");
String rangeHeader = getRequestHeader(requestReader);
if (rangeHeader != null && rangeHeader.startsWith("bytes=")) {
String[] rangeValues = rangeHeader.substring(6).split("-");
startByte = Long.parseLong(rangeValues[0]);
@ -158,15 +158,15 @@ public class MultiThreadedHTTPServer {
}
}
private static String getRequestHeader(BufferedReader requestReader, String headerName) throws IOException {
private static String getRequestHeader(BufferedReader requestReader) throws IOException {
String line;
while ((line = requestReader.readLine()) != null) {
if (line.trim().isEmpty()) {
break;
}
if (line.startsWith(headerName + ":")) {
return line.substring(headerName.length() + 1).trim();
if (line.startsWith("Range" + ":")) {
return line.substring("Range".length() + 1).trim();
}
}
return null;

View File

@ -5,7 +5,6 @@ import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Promise;
import lion.Domain.GalleryTask;
import lion.ErrorCode.ErrorCode;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.ZipUtil;
import lion.Message.AbstractMessage;
@ -30,7 +29,7 @@ public class DownloadCheckService {
ScheduledThreadPoolExecutor convert_thread;
ArrayList<GalleryTask> convert_queue;
ArrayList<GalleryTask> compress_queue;
Channel node;
@ -42,9 +41,9 @@ public class DownloadCheckService {
this.queue = queue;
this.promises = promises;
eventLoop = new DefaultEventLoop();
convert_queue = new ArrayList<>(0);
compress_queue = new ArrayList<>(0);
convert_thread = new ScheduledThreadPoolExecutor(1);
convert_thread.scheduleAtFixedRate(this::convert, 0, 5, TimeUnit.SECONDS);
convert_thread.scheduleAtFixedRate(this::compress, 0, 5, TimeUnit.SECONDS);
}
public boolean downloadCheck(){
@ -60,29 +59,35 @@ public class DownloadCheckService {
ArrayList<File> files = new ArrayList<>(Arrays.asList(fileArray));
boolean result = false;
//扫描进度
Iterator<File> fileIterator = files.iterator();
for(GalleryTask galleryTask: queue){
//跳过已经下载完成或者压缩完成的任务
if(galleryTask.is_download_complete()
|| galleryTask.is_compress_complete()
|| galleryTask.is_compressing())
if(galleryTask.is_compress_complete() || galleryTask.is_compressing()) {
result = true;
continue;
}
while(fileIterator.hasNext()){
File file = fileIterator.next();
if(!file.getName().contains(String.valueOf(galleryTask.getGid())))
continue;
galleryTask.setStatus(GalleryTask.DOWNLOADING);
fileIterator.remove();
if(galleryTask.getName() == null || !galleryTask.getName().equals(file.getName()))
galleryTask.setName(file.getName());
File[] pages = file.listFiles((dir, name) -> !name.equals("galleryinfo.txt"));
if (pages == null || pages.length == 0)
continue;
if(galleryTask.getProceeding() != pages.length)
result = true;
galleryTask.setProceeding(pages.length);
if (new File(file.getPath(), "galleryinfo.txt").exists()) {
result = true;
galleryTask.setStatus(GalleryTask.DOWNLOAD_COMPLETE);
galleryTask.setPath(file.getPath());
}
@ -90,34 +95,28 @@ public class DownloadCheckService {
fileIterator = files.iterator();
}
//转格式队列
//压缩队列
for(GalleryTask galleryTask: queue)
if (galleryTask.is_download_complete()) {
galleryTask.setStatus(GalleryTask.COMPRESSING);
convert_queue.add(galleryTask);
compress_queue.add(galleryTask);
}
return true;
return result;
}
/**
* 转换线程将转换队列的任务复制一份进行转换
* 压缩线程将压缩队列的任务复制一份进行转换
*/
public void convert() {
if(convert_queue.isEmpty())
public void compress() {
if(compress_queue.isEmpty())
return;
ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();
ArrayList<GalleryTask> galleryTasks = new ArrayList<>(convert_queue);
convert_queue.clear();
ArrayList<GalleryTask> galleryTasks = new ArrayList<>(compress_queue);
compress_queue.clear();
reentrantLock.unlock();
for (GalleryTask galleryTask : galleryTasks) {
File[] images = new File(galleryTask.getPath()).listFiles((dir, name) -> name.endsWith(".jpg") || name.endsWith(".png"));
if (images == null) {
galleryTask.setStatus(ErrorCode.COMPRESS_ERROR);
continue;
}
//创建文件夹
File file = new File(storagePath + galleryTask.getName());
if (file.isDirectory() || file.mkdirs()) {

View File

@ -85,15 +85,25 @@ public class storageNode {
tempQueue.clear();
}
lock.unlock();
//检查
if (!downloadCheckService.downloadCheck() && queue.isEmpty()) {
//检查当任务状态发生变化即方法返回true时再更新否则return
if (!downloadCheckService.downloadCheck()) {
boolean isSkip = true;
//返回false之后还要额外检查是否有压缩完成的任务
if(!queue.isEmpty())
for (GalleryTask galleryTask : queue)
if (galleryTask.is_compress_complete()) {
isSkip = false;
break;
}
if(isSkip) {
counter++;
if(server != null && server.isActive() && counter > 10) {
if (server != null && server.isActive() && counter > 10) {
server.writeAndFlush(new MaintainMessage());
counter = 0;
}
return;
}
}
//发送
//上锁后再发送避免出现发送完之后再下载完成
lock.lock();