加入心跳消息,50秒一次;加入转换线程,使得不需要等待转换完成也可以回传任务进度;检测主服务器请求改为检测域名而不是写死ip地址;加入临时任务队列,防止多线程读写同一个任务队列导致数据错误;

This commit is contained in:
chuzhongzai 2023-09-04 20:26:07 +08:00
parent d49ba48f2e
commit 8c0ca78dc6
9 changed files with 141 additions and 46 deletions

38
.gitignore vendored Normal file
View File

@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

8
.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View File

@ -16,6 +16,8 @@ public class AbstractMessage {
public static final byte IDENTITY_MESSAGE = 6; public static final byte IDENTITY_MESSAGE = 6;
public static final byte MAINTAIN_MESSAGE = 7;
public static final byte GALLERY_REQUEST_MESSAGE = 101; public static final byte GALLERY_REQUEST_MESSAGE = 101;
public byte messageType; public byte messageType;

View File

@ -0,0 +1,7 @@
package lion.Message;
public class MaintainMessage extends AbstractMessage{
{
messageType = MAINTAIN_MESSAGE;
}
}

View File

@ -46,6 +46,7 @@ public class MessageCodec extends ByteToMessageCodec<AbstractMessage> {
case AbstractMessage.DELETE_GALLERY_MESSAGE -> objectMapper.readValue(metadata, DeleteGalleryMessage.class); case AbstractMessage.DELETE_GALLERY_MESSAGE -> objectMapper.readValue(metadata, DeleteGalleryMessage.class);
case AbstractMessage.GALLERY_PAGE_QUERY_MESSAGE -> objectMapper.readValue(metadata, GalleryPageQueryMessage.class); case AbstractMessage.GALLERY_PAGE_QUERY_MESSAGE -> objectMapper.readValue(metadata, GalleryPageQueryMessage.class);
case AbstractMessage.IDENTITY_MESSAGE -> objectMapper.readValue(metadata, IdentityMessage.class); case AbstractMessage.IDENTITY_MESSAGE -> objectMapper.readValue(metadata, IdentityMessage.class);
case AbstractMessage.MAINTAIN_MESSAGE -> objectMapper.readValue(metadata, MaintainMessage.class);
default -> null; default -> null;
}; };

View File

@ -1,10 +1,7 @@
package lion; package lion;
import java.io.*; import java.io.*;
import java.net.ServerSocket; import java.net.*;
import java.net.Socket;
import java.net.SocketException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
@ -18,13 +15,20 @@ public class MultiThreadedHTTPServer {
public static void main(String[] args) { public static void main(String[] args) {
ExecutorService threadPool = Executors.newCachedThreadPool(); ExecutorService threadPool = Executors.newCachedThreadPool();
String real_ip;
try{
real_ip = InetAddress.getByName("lionwebsite.xyz").getHostAddress();
}
catch (UnknownHostException ignored){
real_ip = "207.60.50.74";
}
try(ServerSocket serverSocket = new ServerSocket(PORT)) { try(ServerSocket serverSocket = new ServerSocket(PORT)) {
System.out.println("Server listening on port " + PORT); System.out.println("Server listening on port " + PORT);
while (true) { while (true) {
Socket clientSocket = serverSocket.accept(); Socket clientSocket = serverSocket.accept();
String ip = clientSocket.getInetAddress().getHostAddress(); String ip = clientSocket.getInetAddress().getHostAddress();
if(ip.equals("194.36.27.28")){ if(ip.equals(real_ip)){
System.out.println("Client connected: " + ip); System.out.println("Client connected");
// 线程池处理下载请求 // 线程池处理下载请求
threadPool.submit(() -> handleClientRequest(clientSocket)); threadPool.submit(() -> handleClientRequest(clientSocket));
}else{ }else{

View File

@ -50,7 +50,7 @@ public class DeliveryService {
return ErrorCode.FILE_NOT_FOUND; return ErrorCode.FILE_NOT_FOUND;
singleThreadPool.submit(() -> { singleThreadPool.submit(() -> {
try(SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("194.36.27.28", port)); try(SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("lionwebsite.xyz", port));
FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) { FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
ByteBuffer buffer = ByteBuffer.allocate(1024); ByteBuffer buffer = ByteBuffer.allocate(1024);
while (fileChannel.read(buffer)!=-1){ while (fileChannel.read(buffer)!=-1){
@ -83,8 +83,10 @@ public class DeliveryService {
public static byte pageCache(String name){ public static byte pageCache(String name){
File directory = new File(storagePath, name); File directory = new File(storagePath, name);
if(!directory.isDirectory()) if(!directory.isDirectory()) {
System.out.printf("文件夹%s没找到\n", directory.getAbsolutePath());
return ErrorCode.FILE_NOT_FOUND; return ErrorCode.FILE_NOT_FOUND;
}
ArrayList<String> pageList = new ArrayList<>(); ArrayList<String> pageList = new ArrayList<>();
File[] files = directory.listFiles(((dir, name1) -> !name1.equals("galleryinfo.txt") && !name1.equals("thumbnail.webp") && !name1.endsWith(".zip"))); File[] files = directory.listFiles(((dir, name1) -> !name1.equals("galleryinfo.txt") && !name1.equals("thumbnail.webp") && !name1.endsWith(".zip")));

View File

@ -13,6 +13,11 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@Log4j @Log4j
@ -23,8 +28,15 @@ public class DownloadCheckService {
String storagePath = "/root/gallery/gallery/"; String storagePath = "/root/gallery/gallery/";
ScheduledThreadPoolExecutor convert_thread;
ArrayList<GalleryTask> convert_queue;
public DownloadCheckService(ArrayList<GalleryTask> queue){ public DownloadCheckService(ArrayList<GalleryTask> queue){
this.queue = queue; this.queue = queue;
convert_queue = new ArrayList<>(0);
convert_thread = new ScheduledThreadPoolExecutor(1);
convert_thread.scheduleAtFixedRate(this::convert, 0, 5, TimeUnit.SECONDS);
} }
public boolean downloadCheck(){ public boolean downloadCheck(){
@ -41,11 +53,14 @@ public class DownloadCheckService {
ArrayList<File> files = new ArrayList<>(Arrays.asList(fileArray)); ArrayList<File> files = new ArrayList<>(Arrays.asList(fileArray));
//扫描进度 //扫描进度
for(File file: files) Iterator<File> fileIterator = files.iterator();
for(GalleryTask galleryTask: queue){ for(GalleryTask galleryTask: queue){
while(fileIterator.hasNext()){
File file = fileIterator.next();
if(!file.getName().contains(String.valueOf(galleryTask.getGid()))) if(!file.getName().contains(String.valueOf(galleryTask.getGid())))
continue; continue;
fileIterator.remove();
galleryTask.setName(file.getName()); galleryTask.setName(file.getName());
File[] pages = file.listFiles((dir, name) -> !name.equals("galleryinfo.txt")); File[] pages = file.listFiles((dir, name) -> !name.equals("galleryinfo.txt"));
@ -58,27 +73,37 @@ public class DownloadCheckService {
galleryTask.setPath(file.getPath()); galleryTask.setPath(file.getPath());
} }
} }
fileIterator = files.iterator();
}
//转格式队列
for(GalleryTask galleryTask: queue)
if (galleryTask.is_download_complete())
convert_queue.add(galleryTask);
//转格式 return true;
}
/**
* 转换线程将转换队列的任务复制一份进行转换
*/
public void convert() {
if(convert_queue.isEmpty())
return;
ConvertCmd convertCmd = new ConvertCmd(true); ConvertCmd convertCmd = new ConvertCmd(true);
for(GalleryTask galleryTask: queue){ ReentrantLock reentrantLock = new ReentrantLock();
//跳过未完成 reentrantLock.lock();
if (!galleryTask.is_download_complete()) ArrayList<GalleryTask> galleryTasks = new ArrayList<>(convert_queue);
continue; convert_queue.clear();
reentrantLock.unlock();
for (GalleryTask galleryTask : galleryTasks) {
File[] images = new File(galleryTask.getPath()).listFiles((dir, name) -> name.endsWith(".jpg") || name.endsWith(".png")); File[] images = new File(galleryTask.getPath()).listFiles((dir, name) -> name.endsWith(".jpg") || name.endsWith(".png"));
if (images == null) { if (images == null) {
galleryTask.setStatus(ErrorCode.COMPRESS_ERROR); galleryTask.setStatus(ErrorCode.COMPRESS_ERROR);
continue; continue;
} }
//长度相同比较字典序否则比较长度 //长度相同比较字典序否则比较长度
images = Arrays.stream(images).sorted((f1, f2) -> { images = Arrays.stream(images).sorted(Comparator.naturalOrder()).toArray(File[]::new);
if(f1.getName().length() == f2.getName().length())
return f1.compareTo(f2);
else
return f1.getName().length() - f2.getName().length();
}).toArray(File[]::new);
//创建文件夹 //创建文件夹
File file = new File(storagePath + galleryTask.getName()); File file = new File(storagePath + galleryTask.getName());
@ -105,29 +130,27 @@ public class DownloadCheckService {
} }
if ((galleryTask.getType() & GalleryTask.DOWNLOAD_PREVIEW) != 0) if ((galleryTask.getType() & GalleryTask.DOWNLOAD_PREVIEW) != 0)
for (File image : images) { for (int i = 0; i < images.length; i++) {
log.info("文件" + image.getName() + "转换为webp"); log.info("文件" + images[i].getName() + "转换为webp[" + i + "/" + images.length + "]");
operation = new IMOperation(); operation = new IMOperation();
operation.addImage(image.getAbsolutePath()); operation.addImage(images[i].getAbsolutePath());
operation.format("webp"); operation.format("webp");
operation.addImage(storagePath + galleryTask.getName() + "/" + image.getName().replace(".png", ".webp").replace(".jpg", ".webp")); operation.addImage(storagePath + galleryTask.getName() + "/" + images[i].getName().replace(".png", ".webp").replace(".jpg", ".webp"));
try { try {
convertCmd.run(operation); convertCmd.run(operation);
} catch (IOException | InterruptedException | IM4JavaException e) { } catch (IOException | InterruptedException | IM4JavaException e) {
log.error("文件" + image.getName() + "转换失败"); log.error("文件" + images[i].getName() + "转换失败");
galleryTask.setStatus(ErrorCode.COMPRESS_ERROR); galleryTask.setStatus(ErrorCode.COMPRESS_ERROR);
break; break;
} }
} }
if ((galleryTask.getType() & GalleryTask.DOWNLOAD_SOURCE) != 0) {
if((galleryTask.getType() & GalleryTask.DOWNLOAD_SOURCE) != 0)
ZipUtil.zip(galleryTask.getPath(), storagePath + galleryTask.getName() + "/" + galleryTask.getName() + ".zip"); ZipUtil.zip(galleryTask.getPath(), storagePath + galleryTask.getName() + "/" + galleryTask.getName() + ".zip");
log.info(galleryTask.getName() + "压缩完成" );
}
FileUtil.del(galleryTask.getPath()); FileUtil.del(galleryTask.getPath());
galleryTask.setStatus(GalleryTask.DOWNLOAD_COMPLETE); galleryTask.setStatus(GalleryTask.DOWNLOAD_COMPLETE);
} }
return true;
} }
} }

View File

@ -12,7 +12,6 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.log4j.Log4j; import lombok.extern.log4j.Log4j;
@ -40,6 +39,7 @@ public class storageNode {
ScheduledExecutorService checkThreadPool; ScheduledExecutorService checkThreadPool;
int counter;
ReentrantLock lock; ReentrantLock lock;
@ -47,7 +47,10 @@ public class storageNode {
public storageNode(){ public storageNode(){
queue = new ArrayList<>(0); queue = new ArrayList<>(0);
tempQueue = new ArrayList<>(0);
lock = new ReentrantLock(); lock = new ReentrantLock();
counter = 0;
channelFuture = new ServerBootstrap() channelFuture = new ServerBootstrap()
.channel(NioServerSocketChannel.class) .channel(NioServerSocketChannel.class)
.group(new NioEventLoopGroup()) .group(new NioEventLoopGroup())
@ -56,7 +59,6 @@ public class storageNode {
protected void initChannel(NioSocketChannel channel) { protected void initChannel(NioSocketChannel channel) {
channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(100000000, 1, 4)); channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(100000000, 1, 4));
channel.pipeline().addLast(new MessageCodec()); channel.pipeline().addLast(new MessageCodec());
channel.pipeline().addLast(new LoggingHandler());
channel.pipeline().addLast(new MyChannelInboundHandlerAdapter(tempQueue)); channel.pipeline().addLast(new MyChannelInboundHandlerAdapter(tempQueue));
} }
@ -66,7 +68,7 @@ public class storageNode {
log.info("listen port:8080"); log.info("listen port:8080");
try(Socket socket = new Socket()) { try(Socket socket = new Socket()) {
socket.connect(new InetSocketAddress("194.36.27.28", 26322)); socket.connect(new InetSocketAddress("lionwebsite.xyz", 26322));
} catch (Exception ignored) {} } catch (Exception ignored) {}
downloadCheckService = new DownloadCheckService(queue); downloadCheckService = new DownloadCheckService(queue);
checkThreadPool = Executors.newScheduledThreadPool(1); checkThreadPool = Executors.newScheduledThreadPool(1);
@ -75,9 +77,21 @@ public class storageNode {
public void mainThread(){ public void mainThread(){
try { try {
lock.lock();
if(!tempQueue.isEmpty()){
queue.addAll(tempQueue);
tempQueue.clear();
}
lock.unlock();
//检查 //检查
if (!downloadCheckService.downloadCheck()) if (!downloadCheckService.downloadCheck()) {
counter++;
if(server != null && server.isActive() && counter > 10) {
server.writeAndFlush(new MaintainMessage());
counter = 0;
}
return; return;
}
//发送 //发送
DownloadStatusMessage downloadStatusMessage = new DownloadStatusMessage(); DownloadStatusMessage downloadStatusMessage = new DownloadStatusMessage();
downloadStatusMessage.setGalleryTasks(queue.toArray(GalleryTask[]::new)); downloadStatusMessage.setGalleryTasks(queue.toArray(GalleryTask[]::new));
@ -90,10 +104,6 @@ public class storageNode {
if (galleryTask.is_download_complete()) if (galleryTask.is_download_complete())
listIterator.remove(); listIterator.remove();
} }
if(!tempQueue.isEmpty()){
queue.addAll(tempQueue);
tempQueue.clear();
}
lock.unlock(); lock.unlock();
log.info("任务状态发送完成"); log.info("任务状态发送完成");
}catch (Exception e){ }catch (Exception e){