优化: 代码清理、日志替换为Logback、依赖升级

- 修复 DownloadCheckService 局部锁无效的问题,改用 synchronized
  - 修复 storageNode 中 lock 未用 try/finally 保护的问题
  - 修复 BackupSubServer Get() 未关闭 HttpClient 的资源泄漏
  - 修复 MultiThreadedHTTPServer requestLine 无判空的问题
  - 删除各处的死代码/未用字段(channelFuture, promises, counter等)
  - 提取两个 HTTP Server 的公共方法到 CustomUtil(getRequestHeader, sendErrorResponse, sendFileRange)
  - MessageCodec 序列化改用 writeValueAsBytes 替代 valueToTree
  - 日志从 slf4j-simple 替换为 logback-classic,增加 10MB 轮转+保留10份
  - Lombok 升级至 1.18.40,新增 maven-compiler-plugin 注解处理器配置
  - GalleryTask 状态常量加 final,Config 异常改用 log.error
This commit is contained in:
chuzhongzai 2026-06-07 15:37:59 +08:00
parent 83476dded2
commit b350b5bda9
11 changed files with 214 additions and 260 deletions

30
pom.xml
View File

@ -30,7 +30,7 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<version>1.18.40</version>
</dependency>
<dependency>
@ -41,9 +41,9 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.7</version>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.4.14</version>
</dependency>
@ -68,6 +68,20 @@
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.13.0</version>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.40</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-assembly-plugin</artifactId>-->
@ -94,21 +108,21 @@
<plugin>
<groupId>org.graalvm.buildtools</groupId>
<artifactId>native-maven-plugin</artifactId>
<version>0.9.28</version>
<version>0.10.3</version>
<configuration>
<mainClass>lion.Main</mainClass>
<imageName>storageNode</imageName>
<buildArgs>
<arg>-H:+ReportExceptionStackTraces</arg>
<arg>--gc=G1</arg>
<arg>--enable-url-protocols=https</arg>
<arg>-H:IncludeResources="simplelogger.properties"</arg>
<arg>--initialize-at-build-time=org.slf4j.simple.SimpleLogger,org.slf4j.simple.SimpleLoggerFactory,org.slf4j.simple.SimpleLoggerConfiguration</arg>
<arg>-H:IncludeResources="logback.xml"</arg>
<arg>--initialize-at-build-time=ch.qos.logback.classic,ch.qos.logback.core,ch.qos.logback.classic.pattern,ch.qos.logback.core.pattern</arg>
<arg>-H:ReflectionConfigurationFiles=src/main/resources/reflect-config.json</arg>
</buildArgs>
<metadataRepository>
<enabled>true</enabled>
</metadataRepository>
</configuration>
</plugin>
</plugins>

View File

@ -5,6 +5,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class Config {
public static String DouNaiV2ray;
public static String DouNaiClash;
@ -17,7 +20,7 @@ public class Config {
DouNaiV2ray = prop.getProperty("DouNaiV2ray");
DouNaiClash = prop.getProperty("DouNaiClash");
} catch (IOException ex) {
ex.printStackTrace();
log.error("加载配置失败:{}", ex.getMessage());
}
}
}

View File

@ -3,18 +3,16 @@ package lion;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.*;
import java.net.ServerSocket;
import java.util.concurrent.atomic.AtomicInteger;
import java.net.Socket;
import java.net.SocketException;
@Data
@Slf4j
public class CustomUtil {
public static AtomicInteger counter = new AtomicInteger();
public static ObjectMapper objectMapper = new ObjectMapper();
public static void notifyMe(String message) {
@ -31,11 +29,66 @@ public class CustomUtil {
public static int _findIdlePort(int port) {
for(int i=port; i<65535; i++){
try(ServerSocket ignored = new ServerSocket(i)){
ignored.close();
return i;
return i;
}catch (IOException ignored) {
}
}
return -1;
}
}
public static String getRequestHeader(BufferedReader requestReader) throws IOException {
String line;
while ((line = requestReader.readLine()) != null) {
if (line.trim().isEmpty()) {
break;
}
if (line.startsWith("Range:")) {
return line.substring("Range".length() + 1).trim();
}
}
return null;
}
public static void sendErrorResponse(Socket clientSocket, String statusCode) throws IOException {
OutputStream responseStream = clientSocket.getOutputStream();
PrintWriter responseWriter = new PrintWriter(responseStream, true);
responseWriter.println("HTTP/1.1 " + statusCode);
responseWriter.println("Content-Type: text/html");
responseWriter.println();
responseWriter.println("<h1>" + statusCode + "</h1>");
responseStream.close();
}
public static void sendFileRange(Socket clientSocket, File file, long startByte, long endByte) throws IOException {
sendFileRange(clientSocket, file, startByte, endByte, null);
}
public static void sendFileRange(Socket clientSocket, File file, long startByte, long endByte, String contentDispositionFileName) throws IOException {
long fileLength = file.length();
OutputStream responseStream = clientSocket.getOutputStream();
PrintWriter responseWriter = new PrintWriter(responseStream, true);
responseWriter.println("HTTP/1.1 206 Partial Content");
responseWriter.println("Content-Type: application/octet-stream");
responseWriter.println("Accept-Ranges: bytes");
responseWriter.println("Content-Length: " + (endByte - startByte + 1));
responseWriter.println("Content-Range: bytes " + startByte + "-" + endByte + "/" + fileLength);
if (contentDispositionFileName != null) {
responseWriter.println("Content-Disposition: attachment; filename=\"" + contentDispositionFileName + "\"");
}
responseWriter.println();
try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
randomAccessFile.seek(startByte);
byte[] buffer = new byte[1024];
int bytesRead;
long bytesRemaining = endByte - startByte + 1;
while (bytesRemaining > 0 && (bytesRead = randomAccessFile.read(buffer, 0, (int) Math.min(buffer.length, bytesRemaining))) != -1) {
responseStream.write(buffer, 0, bytesRead);
bytesRemaining -= bytesRead;
}
} catch (SocketException ignore) {
} finally {
responseStream.close();
}
}
}

View File

@ -6,10 +6,10 @@ import lombok.Data;
@Data
public class GalleryTask {
public static byte DOWNLOADING = 1;
public static byte DOWNLOAD_COMPLETE = 2;
public static byte COMPRESSING = 3;
public static byte COMPRESS_COMPLETE = 4;
public static final byte DOWNLOADING = 1;
public static final byte DOWNLOAD_COMPLETE = 2;
public static final byte COMPRESSING = 3;
public static final byte COMPRESS_COMPLETE = 4;
@JsonInclude(JsonInclude.Include.NON_NULL)
private String name;

View File

@ -12,12 +12,12 @@ import java.net.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lion.CustomUtil;
import static lion.Config.Config.DouNaiClash;
import static lion.Config.Config.DouNaiV2ray;
@ -36,7 +36,6 @@ public class BackupSubServer {
Socket clientSocket = serverSocket.accept();
ip = clientSocket.getInetAddress().getHostAddress();
log.info("Client connected:{}", ip);
// 线程池处理下载请求
handleClientRequest(clientSocket);
}
} catch (IOException e) {
@ -72,7 +71,6 @@ public class BackupSubServer {
Matcher matcher = pattern.matcher(name.substring(name.indexOf("(") + 1, name.indexOf(")")));
if (matcher.find()) {
// 将匹配到的数字添加到列表中
float ratio = Float.parseFloat(matcher.group());
if(ratio <= 2) {
stringBuilder.append(node).append("\n");
@ -127,26 +125,22 @@ public class BackupSubServer {
public static ArrayList<String> Get(String url) throws IOException {
CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse httpResponse;
HttpGet httpGet = new HttpGet(url);
try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
HttpGet httpGet = new HttpGet(url);
try (CloseableHttpResponse httpResponse = httpClient.execute(httpGet)) {
HttpEntity responseEntity = httpResponse.getEntity();
int statusCode = httpResponse.getStatusLine().getStatusCode();
ArrayList<String> temp = new ArrayList<>();
httpResponse = httpClient.execute(httpGet);
HttpEntity responseEntity = httpResponse.getEntity();
int statusCode = httpResponse.getStatusLine().getStatusCode();
ArrayList<String> temp = new ArrayList<>();
if (statusCode == 200) {
BufferedReader reader = new BufferedReader(new InputStreamReader(responseEntity.getContent()));
String str;
while ((str = reader.readLine()) != null)
temp.add(str);
if (statusCode == 200) {
BufferedReader reader = new BufferedReader(new InputStreamReader(responseEntity.getContent()));
String str;
while ((str = reader.readLine()) != null)
temp.add(str);
}
return temp;
}
}
httpClient.close();
httpResponse.close();
return temp;
}
private static void handleClientRequest(Socket clientSocket) {
@ -160,7 +154,7 @@ public class BackupSubServer {
String method = requestParts[0];
Map<String, String> paramMap = parseRequestLine(requestParts[1]);//path
if(paramMap == null){
sendErrorResponse(clientSocket, "404");
CustomUtil.sendErrorResponse(clientSocket, "404");
return;
}
log.info(Arrays.toString(requestParts));
@ -183,7 +177,7 @@ public class BackupSubServer {
// Get the range information for resuming download
long startByte = 0;
long endByte = fileLength - 1;
String rangeHeader = getRequestHeader(requestReader);
String rangeHeader = CustomUtil.getRequestHeader(requestReader);
if (rangeHeader != null && rangeHeader.startsWith("bytes=")) {
String[] rangeValues = rangeHeader.substring(6).split("-");
startByte = Long.parseLong(rangeValues[0]);
@ -192,39 +186,14 @@ public class BackupSubServer {
}
}
// Send the HTTP response headers
OutputStream responseStream = clientSocket.getOutputStream();
PrintWriter responseWriter = new PrintWriter(responseStream, true);
responseWriter.println("HTTP/1.1 206 Partial Content");
responseWriter.println("Content-Type: application/octet-stream");
responseWriter.println("Accept-Ranges: bytes");
responseWriter.println("Content-Length: " + (endByte - startByte + 1));
responseWriter.println("Content-Range: bytes " + startByte + "-" + endByte + "/" + fileLength);
responseWriter.println();
// Send the file content
try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
randomAccessFile.seek(startByte);
byte[] buffer = new byte[1024];
int bytesRead;
long bytesRemaining = endByte - startByte + 1;
while (bytesRemaining > 0 && (bytesRead = randomAccessFile.read(buffer, 0, (int) Math.min(buffer.length, bytesRemaining))) != -1) {
responseStream.write(buffer, 0, bytesRead);
bytesRemaining -= bytesRead;
}
}catch (SocketException ignore){
}
// Close the response output stream
responseStream.close();
CustomUtil.sendFileRange(clientSocket, file, startByte, endByte);
} else {
// File not found or not readable, send 404 response
sendErrorResponse(clientSocket, "404 Not Found");
CustomUtil.sendErrorResponse(clientSocket, "404 Not Found");
}
} else {
// Non-GET requests, send 501 response
sendErrorResponse(clientSocket, "501 Not Implemented");
CustomUtil.sendErrorResponse(clientSocket, "501 Not Implemented");
}
// Close the request reader and client socket
@ -235,20 +204,6 @@ public class BackupSubServer {
}
}
private static String getRequestHeader(BufferedReader requestReader) throws IOException {
String line;
while ((line = requestReader.readLine()) != null) {
if (line.trim().isEmpty()) {
break;
}
if (line.startsWith("Range" + ":")) {
return line.substring("Range".length() + 1).trim();
}
}
return null;
}
public static Map<String, String> parseRequestLine(String requestLine) {
Map<String, String> pathParams = new HashMap<>();
@ -277,14 +232,4 @@ public class BackupSubServer {
return pathParams;
}
private static void sendErrorResponse(Socket clientSocket, String statusCode) throws IOException {
OutputStream responseStream = clientSocket.getOutputStream();
PrintWriter responseWriter = new PrintWriter(responseStream, true);
responseWriter.println("HTTP/1.1 " + statusCode);
responseWriter.println("Content-Type: text/html");
responseWriter.println();
responseWriter.println("<h1>" + statusCode + "</h1>");
responseStream.close();
}
}
}

View File

@ -25,9 +25,13 @@ public class MessageCodec extends ByteToMessageCodec<AbstractMessage> {
protected void encode(ChannelHandlerContext channelHandlerContext, AbstractMessage abstractMessage, ByteBuf byteBuf) {
byteBuf.writeByte(abstractMessage.messageType);
byte[] bytes = objectMapper.valueToTree(abstractMessage).toString().getBytes(StandardCharsets.UTF_8);
byteBuf.writeInt(bytes.length);
byteBuf.writeBytes(bytes);
try {
byte[] bytes = objectMapper.writeValueAsBytes(abstractMessage);
byteBuf.writeInt(bytes.length);
byteBuf.writeBytes(bytes);
} catch (Exception e) {
log.error("序列化消息失败:{}", e.getMessage());
}
}
@Override

View File

@ -10,10 +10,10 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class MultiThreadedHTTPServer {
private static final int PORT = 8888;
private static final int BUFFER_SIZE = 1024;
public static void main(String[] args) {
ExecutorService threadPool = Executors.newCachedThreadPool();
@ -30,13 +30,11 @@ public class MultiThreadedHTTPServer {
String ip = clientSocket.getInetAddress().getHostAddress();
if(ip.equals(real_ip)){
log.info("Client connected");
// 线程池处理下载请求
threadPool.submit(() -> handleClientRequest(clientSocket));
}else{
log.info("unknown ip: " + ip);
clientSocket.close();
}
}
} catch (IOException e) {
log.error("处理http请求时出错,IP:{},ERROR:{}", real_ip, e.getMessage());
@ -49,6 +47,11 @@ public class MultiThreadedHTTPServer {
BufferedReader requestReader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
String requestLine = requestReader.readLine();
if (requestLine == null) {
clientSocket.close();
return;
}
// Parse the request line to get the method and path
String[] requestParts = requestLine.split(" ");
String method = requestParts[0];
@ -97,7 +100,7 @@ public class MultiThreadedHTTPServer {
}
}
else{
sendErrorResponse(clientSocket, "403 Forbidden");
CustomUtil.sendErrorResponse(clientSocket, "403 Forbidden");
return;
}
fileName = file.getName();
@ -110,7 +113,7 @@ public class MultiThreadedHTTPServer {
// Get the range information for resuming download
long startByte = 0;
long endByte = fileLength - 1;
String rangeHeader = getRequestHeader(requestReader);
String rangeHeader = CustomUtil.getRequestHeader(requestReader);
if (rangeHeader != null && rangeHeader.startsWith("bytes=")) {
String[] rangeValues = rangeHeader.substring(6).split("-");
startByte = Long.parseLong(rangeValues[0]);
@ -119,40 +122,14 @@ public class MultiThreadedHTTPServer {
}
}
// Send the HTTP response headers
OutputStream responseStream = clientSocket.getOutputStream();
PrintWriter responseWriter = new PrintWriter(responseStream, true);
responseWriter.println("HTTP/1.1 206 Partial Content");
responseWriter.println("Content-Type: application/octet-stream");
responseWriter.println("Accept-Ranges: bytes");
responseWriter.println("Content-Length: " + (endByte - startByte + 1));
responseWriter.println("Content-Range: bytes " + startByte + "-" + endByte + "/" + fileLength);
responseWriter.println("Content-Disposition: attachment; filename=\"" + fileName + "\"");
responseWriter.println();
// Send the file content
try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
randomAccessFile.seek(startByte);
byte[] buffer = new byte[BUFFER_SIZE];
int bytesRead;
long bytesRemaining = endByte - startByte + 1;
while (bytesRemaining > 0 && (bytesRead = randomAccessFile.read(buffer, 0, (int) Math.min(buffer.length, bytesRemaining))) != -1) {
responseStream.write(buffer, 0, bytesRead);
bytesRemaining -= bytesRead;
}
}catch (SocketException ignore){
}
// Close the response output stream
responseStream.close();
CustomUtil.sendFileRange(clientSocket, file, startByte, endByte, fileName);
} else {
// File not found or not readable, send 404 response
sendErrorResponse(clientSocket, "404 Not Found");
CustomUtil.sendErrorResponse(clientSocket, "404 Not Found");
}
} else {
// Non-GET requests, send 501 response
sendErrorResponse(clientSocket, "501 Not Implemented");
CustomUtil.sendErrorResponse(clientSocket, "501 Not Implemented");
}
// Close the request reader and client socket
@ -163,20 +140,6 @@ public class MultiThreadedHTTPServer {
}
}
private static String getRequestHeader(BufferedReader requestReader) throws IOException {
String line;
while ((line = requestReader.readLine()) != null) {
if (line.trim().isEmpty()) {
break;
}
if (line.startsWith("Range" + ":")) {
return line.substring("Range".length() + 1).trim();
}
}
return null;
}
public static Map<String, String> parseRequestLine(String requestLine) {
Map<String, String> queryParams = new HashMap<>();
@ -206,14 +169,4 @@ public class MultiThreadedHTTPServer {
}
return queryParams;
}
private static void sendErrorResponse(Socket clientSocket, String statusCode) throws IOException {
OutputStream responseStream = clientSocket.getOutputStream();
PrintWriter responseWriter = new PrintWriter(responseStream, true);
responseWriter.println("HTTP/1.1 " + statusCode);
responseWriter.println("Content-Type: text/html");
responseWriter.println();
responseWriter.println("<h1>" + statusCode + "</h1>");
responseStream.close();
}
}

View File

@ -1,26 +1,17 @@
package lion.Service;
import io.netty.channel.Channel;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Promise;
import lion.CustomUtil;
import lion.Domain.GalleryTask;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.ZipUtil;
import lion.Message.AbstractMessage;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.io.*;
import java.util.*;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
@Data
public class DownloadCheckService {
Map<Integer, GalleryTask> queue;
@ -30,18 +21,10 @@ public class DownloadCheckService {
ScheduledThreadPoolExecutor convert_thread;
ArrayList<GalleryTask> compress_queue;
final ArrayList<GalleryTask> compress_queue;
Channel node;
HashMap<Integer, Promise<AbstractMessage>> promises;
EventLoop eventLoop;
public DownloadCheckService(Map<Integer, GalleryTask> queue, HashMap<Integer, Promise<AbstractMessage>> promises){
public DownloadCheckService(Map<Integer, GalleryTask> queue){
this.queue = queue;
this.promises = promises;
eventLoop = new DefaultEventLoop();
compress_queue = new ArrayList<>(0);
convert_thread = new ScheduledThreadPoolExecutor(1);
convert_thread.scheduleAtFixedRate(this::compress, 0, 5, TimeUnit.SECONDS);
@ -100,7 +83,9 @@ public class DownloadCheckService {
for(GalleryTask galleryTask: queue.values())
if (galleryTask.is_download_complete()) {
galleryTask.setStatus(GalleryTask.COMPRESSING);
compress_queue.add(galleryTask);
synchronized (compress_queue) {
compress_queue.add(galleryTask);
}
}
return result;
@ -112,62 +97,51 @@ public class DownloadCheckService {
public void compress() {
if(compress_queue.isEmpty())
return;
ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();
ArrayList<GalleryTask> galleryTasks = new ArrayList<>(compress_queue);
compress_queue.clear();
reentrantLock.unlock();
for (GalleryTask galleryTask : galleryTasks) {
try {
log.info("开始压缩:{}", galleryTask.getName());
//创建文件夹
File file = new File(storagePath + galleryTask.getName());
if (file.isDirectory() || file.mkdirs()) {
log.info("{}文件夹创建成功", galleryTask.getName());
} else {
log.error("{}文件夹创建失败", galleryTask.getName());
continue;
}
//生成压缩包
ZipUtil.zip(galleryTask.getPath(), storagePath + galleryTask.getName() + "/" + galleryTask.getName() + ".zip");
log.info("{}压缩完成", galleryTask.getName());
FileUtil.del(galleryTask.getPath());
galleryTask.setStatus(GalleryTask.COMPRESS_COMPLETE);
} catch (Exception e){
log.error("{}压缩失败:{}", galleryTask, e.getMessage());
ArrayList<GalleryTask> galleryTasks;
synchronized (compress_queue) {
galleryTasks = new ArrayList<>(compress_queue);
compress_queue.clear();
}
for (GalleryTask galleryTask : galleryTasks) {
try {
log.info("开始压缩:{}", galleryTask.getName());
File file = new File(storagePath + galleryTask.getName());
if (file.isDirectory() || file.mkdirs()) {
log.info("{}文件夹创建成功", galleryTask.getName());
} else {
log.error("{}文件夹创建失败", galleryTask.getName());
continue;
}
ZipUtil.zip(galleryTask.getPath(), storagePath + galleryTask.getName() + "/" + galleryTask.getName() + ".zip");
log.info("{}压缩完成", galleryTask.getName());
FileUtil.del(galleryTask.getPath());
galleryTask.setStatus(GalleryTask.COMPRESS_COMPLETE);
} catch (Exception e){
log.error("{}压缩失败:{}", galleryTask, e.getMessage());
}
}
}
/**
* 检查改任务是否为已完成任务如已完成则返回true若未完成则加入队列
* @return true if compress complete, false otherwise
*/
public boolean addToQueue(GalleryTask galleryTask){
//是否含有名字进行中任务一般有名字没有名字则肯定为初始任务存在名字至少在下载路径出现过
if(galleryTask.getName() == null || galleryTask.getName().isEmpty()){
queue.putIfAbsent(galleryTask.getGid(), galleryTask);
return false;
}
//查询hah下载路径中是否存在该任务下载路径存在则为下载中或下载完成任务加入队列
if(new File(downloadPath + galleryTask.getGid()).isDirectory()){
queue.putIfAbsent(galleryTask.getGid(), galleryTask);
return false;
}
//查询存放路径中是否含有该任务的压缩包存在则为下载完成任务
if(new File(storagePath + galleryTask.getName() + "/" + galleryTask.getName() + ".zip").exists()){
galleryTask.setStatus(GalleryTask.COMPRESS_COMPLETE);
CustomUtil.notifyMe(String.format("任务:%s在添加时已下载完成更新任务状态", galleryTask.getName()));
return true;
}
queue.putIfAbsent(galleryTask.getGid(), galleryTask);
return false;
}
}
}

View File

@ -1,6 +1,5 @@
package lion;
import io.netty.util.concurrent.Promise;
import lion.Domain.GalleryTask;
import lion.Message.*;
import lion.Message.Main.*;
@ -14,7 +13,6 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.HashMap;
@ -25,8 +23,6 @@ import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public class storageNode {
ChannelFuture channelFuture;
Channel server;
Channel node;
@ -39,10 +35,6 @@ public class storageNode {
ScheduledExecutorService checkThreadPool;
HashMap<Integer, Promise<AbstractMessage>> promises;
int counter;
ReentrantLock lock;
public static String storagePath = "/root/gallery/gallery/";
@ -51,12 +43,10 @@ public class storageNode {
queue = new HashMap<>();
tempQueue = new HashMap<>();
lock = new ReentrantLock();
counter = 0;
promises = new HashMap<>();
int real_port = CustomUtil._findIdlePort(26321);
channelFuture = new ServerBootstrap()
new ServerBootstrap()
.channel(NioServerSocketChannel.class)
.group(new NioEventLoopGroup())
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@ -85,19 +75,22 @@ public class storageNode {
if (i==20) {
log.info("server connect failed");
}
downloadCheckService = new DownloadCheckService(queue, promises);
downloadCheckService = new DownloadCheckService(queue);
checkThreadPool = Executors.newScheduledThreadPool(1);
checkThreadPool.scheduleAtFixedRate(this::mainThread, 5, 5, TimeUnit.SECONDS);
}
public void mainThread(){
try {
lock.lock();
if(!tempQueue.isEmpty()){
queue.putAll(tempQueue);
tempQueue.clear();
try {
if(!tempQueue.isEmpty()){
queue.putAll(tempQueue);
tempQueue.clear();
}
} finally {
lock.unlock();
}
lock.unlock();
//检查当任务状态发生变化即方法返回true时再更新否则return
if (!downloadCheckService.downloadCheck()) {
boolean isSkip = true;
@ -120,20 +113,23 @@ public class storageNode {
//发送
//上锁后再发送避免出现发送完之后再下载完成
lock.lock();
DownloadStatusMessage downloadStatusMessage = new DownloadStatusMessage();
downloadStatusMessage.setGalleryTasks(queue.values().toArray(GalleryTask[]::new));
server.writeAndFlush(downloadStatusMessage);
try {
DownloadStatusMessage downloadStatusMessage = new DownloadStatusMessage();
downloadStatusMessage.setGalleryTasks(queue.values().toArray(GalleryTask[]::new));
server.writeAndFlush(downloadStatusMessage);
queue.entrySet().removeIf(entry -> entry.getValue().is_compress_complete());
log.info("任务状态发送完成");
lock.unlock();
queue.entrySet().removeIf(entry -> entry.getValue().is_compress_complete());
log.info("任务状态发送完成");
} finally {
lock.unlock();
}
}catch (Exception e){
log.error("发送任务状态时发生异常:{}", e.getMessage());
}
}
int counter;
class MyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter{
Map<Integer, GalleryTask> queue;
@ -155,20 +151,22 @@ public class storageNode {
} else if(identityMessage.getIdentity().equals("lionwebsiteside")){
node = ctx.channel();
log.info("node上线");
downloadCheckService.setNode(node);
}
}
case AbstractMessage.DOWNLOAD_POST_MESSAGE -> {
DownloadPostMessage dpm = (DownloadPostMessage) abstractMessage;
lock.lock();
//添加到队列方法返回真说明该任务已下载完成直接发送下载进度
if(downloadCheckService.addToQueue(dpm.getGalleryTask())){
DownloadStatusMessage downloadStatusMessage = new DownloadStatusMessage();
downloadStatusMessage.setGalleryTasks(new GalleryTask[]{dpm.getGalleryTask()});
server.writeAndFlush(downloadStatusMessage);
try {
//添加到队列方法返回真说明该任务已下载完成直接发送下载进度
if(downloadCheckService.addToQueue(dpm.getGalleryTask())){
DownloadStatusMessage downloadStatusMessage = new DownloadStatusMessage();
downloadStatusMessage.setGalleryTasks(new GalleryTask[]{dpm.getGalleryTask()});
server.writeAndFlush(downloadStatusMessage);
}
log.info(String.valueOf(queue));
} finally {
lock.unlock();
}
log.info(String.valueOf(queue));
lock.unlock();
ctx.writeAndFlush(new ResponseMessage(dpm.messageId, (byte) 0));
}
case AbstractMessage.DELETE_GALLERY_MESSAGE -> {
@ -197,9 +195,7 @@ public class storageNode {
} else if(ctx.channel().equals(node)){
log.info("node 下线");
node = null;
downloadCheckService.setNode(null);
}
}
}
}
}

View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>run.out</file>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>run.%i.out</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>10</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>10MB</maxFileSize>
</triggeringPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%level] %msg%n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="FILE"/>
</root>
</configuration>

View File

@ -1,9 +0,0 @@
org.slf4j.simpleLogger.defaultLogLevel=info
org.slf4j.simpleLogger.showDateTime=true
org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss
org.slf4j.simpleLogger.logFile=run.out