diff --git a/pom.xml b/pom.xml index eb53fe3..4c89862 100644 --- a/pom.xml +++ b/pom.xml @@ -30,7 +30,7 @@ org.projectlombok lombok - 1.18.30 + 1.18.40 @@ -41,9 +41,9 @@ - org.slf4j - slf4j-simple - 2.0.7 + ch.qos.logback + logback-classic + 1.4.14 @@ -68,6 +68,20 @@ + + org.apache.maven.plugins + maven-compiler-plugin + 3.13.0 + + + + org.projectlombok + lombok + 1.18.40 + + + + @@ -94,21 +108,21 @@ org.graalvm.buildtools native-maven-plugin - 0.9.28 + 0.10.3 lion.Main storageNode -H:+ReportExceptionStackTraces - --gc=G1 --enable-url-protocols=https - -H:IncludeResources="simplelogger.properties" - --initialize-at-build-time=org.slf4j.simple.SimpleLogger,org.slf4j.simple.SimpleLoggerFactory,org.slf4j.simple.SimpleLoggerConfiguration + -H:IncludeResources="logback.xml" + --initialize-at-build-time=ch.qos.logback.classic,ch.qos.logback.core,ch.qos.logback.classic.pattern,ch.qos.logback.core.pattern -H:ReflectionConfigurationFiles=src/main/resources/reflect-config.json true + diff --git a/src/main/java/lion/Config/Config.java b/src/main/java/lion/Config/Config.java index 8462194..441c055 100644 --- a/src/main/java/lion/Config/Config.java +++ b/src/main/java/lion/Config/Config.java @@ -5,6 +5,9 @@ import java.io.IOException; import java.io.InputStream; import java.util.Properties; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class Config { public static String DouNaiV2ray; public static String DouNaiClash; @@ -17,7 +20,7 @@ public class Config { DouNaiV2ray = prop.getProperty("DouNaiV2ray"); DouNaiClash = prop.getProperty("DouNaiClash"); } catch (IOException ex) { - ex.printStackTrace(); + log.error("加载配置失败:{}", ex.getMessage()); } } } diff --git a/src/main/java/lion/CustomUtil.java b/src/main/java/lion/CustomUtil.java index 07b71ee..db3fc86 100644 --- a/src/main/java/lion/CustomUtil.java +++ b/src/main/java/lion/CustomUtil.java @@ -3,18 +3,16 @@ package lion; import cn.hutool.http.HttpRequest; import cn.hutool.http.HttpResponse; import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.Data; +import lombok.extern.slf4j.Slf4j; -import java.io.IOException; +import java.io.*; import java.net.ServerSocket; -import java.util.concurrent.atomic.AtomicInteger; +import java.net.Socket; +import java.net.SocketException; - -@Data +@Slf4j public class CustomUtil { - public static AtomicInteger counter = new AtomicInteger(); - public static ObjectMapper objectMapper = new ObjectMapper(); public static void notifyMe(String message) { @@ -31,11 +29,66 @@ public class CustomUtil { public static int _findIdlePort(int port) { for(int i=port; i<65535; i++){ try(ServerSocket ignored = new ServerSocket(i)){ - ignored.close(); - return i; + return i; }catch (IOException ignored) { } } return -1; } -} + + public static String getRequestHeader(BufferedReader requestReader) throws IOException { + String line; + while ((line = requestReader.readLine()) != null) { + if (line.trim().isEmpty()) { + break; + } + if (line.startsWith("Range:")) { + return line.substring("Range".length() + 1).trim(); + } + } + return null; + } + + public static void sendErrorResponse(Socket clientSocket, String statusCode) throws IOException { + OutputStream responseStream = clientSocket.getOutputStream(); + PrintWriter responseWriter = new PrintWriter(responseStream, true); + responseWriter.println("HTTP/1.1 " + statusCode); + responseWriter.println("Content-Type: text/html"); + responseWriter.println(); + responseWriter.println("

" + statusCode + "

"); + responseStream.close(); + } + + public static void sendFileRange(Socket clientSocket, File file, long startByte, long endByte) throws IOException { + sendFileRange(clientSocket, file, startByte, endByte, null); + } + + public static void sendFileRange(Socket clientSocket, File file, long startByte, long endByte, String contentDispositionFileName) throws IOException { + long fileLength = file.length(); + OutputStream responseStream = clientSocket.getOutputStream(); + PrintWriter responseWriter = new PrintWriter(responseStream, true); + responseWriter.println("HTTP/1.1 206 Partial Content"); + responseWriter.println("Content-Type: application/octet-stream"); + responseWriter.println("Accept-Ranges: bytes"); + responseWriter.println("Content-Length: " + (endByte - startByte + 1)); + responseWriter.println("Content-Range: bytes " + startByte + "-" + endByte + "/" + fileLength); + if (contentDispositionFileName != null) { + responseWriter.println("Content-Disposition: attachment; filename=\"" + contentDispositionFileName + "\""); + } + responseWriter.println(); + + try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { + randomAccessFile.seek(startByte); + byte[] buffer = new byte[1024]; + int bytesRead; + long bytesRemaining = endByte - startByte + 1; + while (bytesRemaining > 0 && (bytesRead = randomAccessFile.read(buffer, 0, (int) Math.min(buffer.length, bytesRemaining))) != -1) { + responseStream.write(buffer, 0, bytesRead); + bytesRemaining -= bytesRead; + } + } catch (SocketException ignore) { + } finally { + responseStream.close(); + } + } +} \ No newline at end of file diff --git a/src/main/java/lion/Domain/GalleryTask.java b/src/main/java/lion/Domain/GalleryTask.java index 22aa4a8..0c96818 100644 --- a/src/main/java/lion/Domain/GalleryTask.java +++ b/src/main/java/lion/Domain/GalleryTask.java @@ -6,10 +6,10 @@ import lombok.Data; @Data public class GalleryTask { - public static byte DOWNLOADING = 1; - public static byte DOWNLOAD_COMPLETE = 2; - public static byte COMPRESSING = 3; - public static byte COMPRESS_COMPLETE = 4; + public static final byte DOWNLOADING = 1; + public static final byte DOWNLOAD_COMPLETE = 2; + public static final byte COMPRESSING = 3; + public static final byte COMPRESS_COMPLETE = 4; @JsonInclude(JsonInclude.Include.NON_NULL) private String name; diff --git a/src/main/java/lion/Externel/BackupSubServer.java b/src/main/java/lion/Externel/BackupSubServer.java index ba4611c..6246377 100644 --- a/src/main/java/lion/Externel/BackupSubServer.java +++ b/src/main/java/lion/Externel/BackupSubServer.java @@ -12,12 +12,12 @@ import java.net.*; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; -import java.time.LocalDateTime; import java.util.*; import java.util.concurrent.*; import java.util.regex.Matcher; import java.util.regex.Pattern; +import lion.CustomUtil; import static lion.Config.Config.DouNaiClash; import static lion.Config.Config.DouNaiV2ray; @@ -36,7 +36,6 @@ public class BackupSubServer { Socket clientSocket = serverSocket.accept(); ip = clientSocket.getInetAddress().getHostAddress(); log.info("Client connected:{}", ip); - // 线程池处理下载请求 handleClientRequest(clientSocket); } } catch (IOException e) { @@ -72,7 +71,6 @@ public class BackupSubServer { Matcher matcher = pattern.matcher(name.substring(name.indexOf("(") + 1, name.indexOf(")"))); if (matcher.find()) { - // 将匹配到的数字添加到列表中 float ratio = Float.parseFloat(matcher.group()); if(ratio <= 2) { stringBuilder.append(node).append("\n"); @@ -127,26 +125,22 @@ public class BackupSubServer { public static ArrayList Get(String url) throws IOException { - CloseableHttpClient httpClient = HttpClients.createDefault(); - CloseableHttpResponse httpResponse; - HttpGet httpGet = new HttpGet(url); + try (CloseableHttpClient httpClient = HttpClients.createDefault()) { + HttpGet httpGet = new HttpGet(url); + try (CloseableHttpResponse httpResponse = httpClient.execute(httpGet)) { + HttpEntity responseEntity = httpResponse.getEntity(); + int statusCode = httpResponse.getStatusLine().getStatusCode(); + ArrayList temp = new ArrayList<>(); - httpResponse = httpClient.execute(httpGet); - - HttpEntity responseEntity = httpResponse.getEntity(); - int statusCode = httpResponse.getStatusLine().getStatusCode(); - ArrayList temp = new ArrayList<>(); - - if (statusCode == 200) { - BufferedReader reader = new BufferedReader(new InputStreamReader(responseEntity.getContent())); - String str; - while ((str = reader.readLine()) != null) - temp.add(str); + if (statusCode == 200) { + BufferedReader reader = new BufferedReader(new InputStreamReader(responseEntity.getContent())); + String str; + while ((str = reader.readLine()) != null) + temp.add(str); + } + return temp; + } } - - httpClient.close(); - httpResponse.close(); - return temp; } private static void handleClientRequest(Socket clientSocket) { @@ -160,7 +154,7 @@ public class BackupSubServer { String method = requestParts[0]; Map paramMap = parseRequestLine(requestParts[1]);//path if(paramMap == null){ - sendErrorResponse(clientSocket, "404"); + CustomUtil.sendErrorResponse(clientSocket, "404"); return; } log.info(Arrays.toString(requestParts)); @@ -183,7 +177,7 @@ public class BackupSubServer { // Get the range information for resuming download long startByte = 0; long endByte = fileLength - 1; - String rangeHeader = getRequestHeader(requestReader); + String rangeHeader = CustomUtil.getRequestHeader(requestReader); if (rangeHeader != null && rangeHeader.startsWith("bytes=")) { String[] rangeValues = rangeHeader.substring(6).split("-"); startByte = Long.parseLong(rangeValues[0]); @@ -192,39 +186,14 @@ public class BackupSubServer { } } - // Send the HTTP response headers - OutputStream responseStream = clientSocket.getOutputStream(); - PrintWriter responseWriter = new PrintWriter(responseStream, true); - responseWriter.println("HTTP/1.1 206 Partial Content"); - responseWriter.println("Content-Type: application/octet-stream"); - responseWriter.println("Accept-Ranges: bytes"); - responseWriter.println("Content-Length: " + (endByte - startByte + 1)); - responseWriter.println("Content-Range: bytes " + startByte + "-" + endByte + "/" + fileLength); - responseWriter.println(); - - // Send the file content - try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { - randomAccessFile.seek(startByte); - byte[] buffer = new byte[1024]; - int bytesRead; - long bytesRemaining = endByte - startByte + 1; - while (bytesRemaining > 0 && (bytesRead = randomAccessFile.read(buffer, 0, (int) Math.min(buffer.length, bytesRemaining))) != -1) { - responseStream.write(buffer, 0, bytesRead); - bytesRemaining -= bytesRead; - } - }catch (SocketException ignore){ - - } - - // Close the response output stream - responseStream.close(); + CustomUtil.sendFileRange(clientSocket, file, startByte, endByte); } else { // File not found or not readable, send 404 response - sendErrorResponse(clientSocket, "404 Not Found"); + CustomUtil.sendErrorResponse(clientSocket, "404 Not Found"); } } else { // Non-GET requests, send 501 response - sendErrorResponse(clientSocket, "501 Not Implemented"); + CustomUtil.sendErrorResponse(clientSocket, "501 Not Implemented"); } // Close the request reader and client socket @@ -235,20 +204,6 @@ public class BackupSubServer { } } - private static String getRequestHeader(BufferedReader requestReader) throws IOException { - String line; - while ((line = requestReader.readLine()) != null) { - if (line.trim().isEmpty()) { - break; - } - - if (line.startsWith("Range" + ":")) { - return line.substring("Range".length() + 1).trim(); - } - } - return null; - } - public static Map parseRequestLine(String requestLine) { Map pathParams = new HashMap<>(); @@ -277,14 +232,4 @@ public class BackupSubServer { return pathParams; } - - private static void sendErrorResponse(Socket clientSocket, String statusCode) throws IOException { - OutputStream responseStream = clientSocket.getOutputStream(); - PrintWriter responseWriter = new PrintWriter(responseStream, true); - responseWriter.println("HTTP/1.1 " + statusCode); - responseWriter.println("Content-Type: text/html"); - responseWriter.println(); - responseWriter.println("

" + statusCode + "

"); - responseStream.close(); - } -} +} \ No newline at end of file diff --git a/src/main/java/lion/Message/MessageCodec.java b/src/main/java/lion/Message/MessageCodec.java index 7106a95..d1f5689 100644 --- a/src/main/java/lion/Message/MessageCodec.java +++ b/src/main/java/lion/Message/MessageCodec.java @@ -25,9 +25,13 @@ public class MessageCodec extends ByteToMessageCodec { protected void encode(ChannelHandlerContext channelHandlerContext, AbstractMessage abstractMessage, ByteBuf byteBuf) { byteBuf.writeByte(abstractMessage.messageType); - byte[] bytes = objectMapper.valueToTree(abstractMessage).toString().getBytes(StandardCharsets.UTF_8); - byteBuf.writeInt(bytes.length); - byteBuf.writeBytes(bytes); + try { + byte[] bytes = objectMapper.writeValueAsBytes(abstractMessage); + byteBuf.writeInt(bytes.length); + byteBuf.writeBytes(bytes); + } catch (Exception e) { + log.error("序列化消息失败:{}", e.getMessage()); + } } @Override diff --git a/src/main/java/lion/MultiThreadedHTTPServer.java b/src/main/java/lion/MultiThreadedHTTPServer.java index a7d33eb..1ef84c7 100644 --- a/src/main/java/lion/MultiThreadedHTTPServer.java +++ b/src/main/java/lion/MultiThreadedHTTPServer.java @@ -10,10 +10,10 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; + @Slf4j public class MultiThreadedHTTPServer { private static final int PORT = 8888; - private static final int BUFFER_SIZE = 1024; public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); @@ -30,13 +30,11 @@ public class MultiThreadedHTTPServer { String ip = clientSocket.getInetAddress().getHostAddress(); if(ip.equals(real_ip)){ log.info("Client connected"); - // 线程池处理下载请求 threadPool.submit(() -> handleClientRequest(clientSocket)); }else{ log.info("unknown ip: " + ip); clientSocket.close(); } - } } catch (IOException e) { log.error("处理http请求时出错,IP:{},ERROR:{}", real_ip, e.getMessage()); @@ -49,6 +47,11 @@ public class MultiThreadedHTTPServer { BufferedReader requestReader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); String requestLine = requestReader.readLine(); + if (requestLine == null) { + clientSocket.close(); + return; + } + // Parse the request line to get the method and path String[] requestParts = requestLine.split(" "); String method = requestParts[0]; @@ -97,7 +100,7 @@ public class MultiThreadedHTTPServer { } } else{ - sendErrorResponse(clientSocket, "403 Forbidden"); + CustomUtil.sendErrorResponse(clientSocket, "403 Forbidden"); return; } fileName = file.getName(); @@ -110,7 +113,7 @@ public class MultiThreadedHTTPServer { // Get the range information for resuming download long startByte = 0; long endByte = fileLength - 1; - String rangeHeader = getRequestHeader(requestReader); + String rangeHeader = CustomUtil.getRequestHeader(requestReader); if (rangeHeader != null && rangeHeader.startsWith("bytes=")) { String[] rangeValues = rangeHeader.substring(6).split("-"); startByte = Long.parseLong(rangeValues[0]); @@ -119,40 +122,14 @@ public class MultiThreadedHTTPServer { } } - // Send the HTTP response headers - OutputStream responseStream = clientSocket.getOutputStream(); - PrintWriter responseWriter = new PrintWriter(responseStream, true); - responseWriter.println("HTTP/1.1 206 Partial Content"); - responseWriter.println("Content-Type: application/octet-stream"); - responseWriter.println("Accept-Ranges: bytes"); - responseWriter.println("Content-Length: " + (endByte - startByte + 1)); - responseWriter.println("Content-Range: bytes " + startByte + "-" + endByte + "/" + fileLength); - responseWriter.println("Content-Disposition: attachment; filename=\"" + fileName + "\""); - responseWriter.println(); - - // Send the file content - try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { - randomAccessFile.seek(startByte); - byte[] buffer = new byte[BUFFER_SIZE]; - int bytesRead; - long bytesRemaining = endByte - startByte + 1; - while (bytesRemaining > 0 && (bytesRead = randomAccessFile.read(buffer, 0, (int) Math.min(buffer.length, bytesRemaining))) != -1) { - responseStream.write(buffer, 0, bytesRead); - bytesRemaining -= bytesRead; - } - }catch (SocketException ignore){ - - } - - // Close the response output stream - responseStream.close(); + CustomUtil.sendFileRange(clientSocket, file, startByte, endByte, fileName); } else { // File not found or not readable, send 404 response - sendErrorResponse(clientSocket, "404 Not Found"); + CustomUtil.sendErrorResponse(clientSocket, "404 Not Found"); } } else { // Non-GET requests, send 501 response - sendErrorResponse(clientSocket, "501 Not Implemented"); + CustomUtil.sendErrorResponse(clientSocket, "501 Not Implemented"); } // Close the request reader and client socket @@ -163,20 +140,6 @@ public class MultiThreadedHTTPServer { } } - private static String getRequestHeader(BufferedReader requestReader) throws IOException { - String line; - while ((line = requestReader.readLine()) != null) { - if (line.trim().isEmpty()) { - break; - } - - if (line.startsWith("Range" + ":")) { - return line.substring("Range".length() + 1).trim(); - } - } - return null; - } - public static Map parseRequestLine(String requestLine) { Map queryParams = new HashMap<>(); @@ -206,14 +169,4 @@ public class MultiThreadedHTTPServer { } return queryParams; } - - private static void sendErrorResponse(Socket clientSocket, String statusCode) throws IOException { - OutputStream responseStream = clientSocket.getOutputStream(); - PrintWriter responseWriter = new PrintWriter(responseStream, true); - responseWriter.println("HTTP/1.1 " + statusCode); - responseWriter.println("Content-Type: text/html"); - responseWriter.println(); - responseWriter.println("

" + statusCode + "

"); - responseStream.close(); - } } \ No newline at end of file diff --git a/src/main/java/lion/Service/DownloadCheckService.java b/src/main/java/lion/Service/DownloadCheckService.java index ed8152f..da4774f 100644 --- a/src/main/java/lion/Service/DownloadCheckService.java +++ b/src/main/java/lion/Service/DownloadCheckService.java @@ -1,26 +1,17 @@ package lion.Service; -import io.netty.channel.Channel; -import io.netty.channel.DefaultEventLoop; -import io.netty.channel.EventLoop; -import io.netty.util.concurrent.Promise; import lion.CustomUtil; import lion.Domain.GalleryTask; import cn.hutool.core.io.FileUtil; import cn.hutool.core.util.ZipUtil; -import lion.Message.AbstractMessage; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import java.io.*; import java.util.*; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; - @Slf4j -@Data public class DownloadCheckService { Map queue; @@ -30,18 +21,10 @@ public class DownloadCheckService { ScheduledThreadPoolExecutor convert_thread; - ArrayList compress_queue; + final ArrayList compress_queue; - Channel node; - - HashMap> promises; - - EventLoop eventLoop; - - public DownloadCheckService(Map queue, HashMap> promises){ + public DownloadCheckService(Map queue){ this.queue = queue; - this.promises = promises; - eventLoop = new DefaultEventLoop(); compress_queue = new ArrayList<>(0); convert_thread = new ScheduledThreadPoolExecutor(1); convert_thread.scheduleAtFixedRate(this::compress, 0, 5, TimeUnit.SECONDS); @@ -100,7 +83,9 @@ public class DownloadCheckService { for(GalleryTask galleryTask: queue.values()) if (galleryTask.is_download_complete()) { galleryTask.setStatus(GalleryTask.COMPRESSING); - compress_queue.add(galleryTask); + synchronized (compress_queue) { + compress_queue.add(galleryTask); + } } return result; @@ -112,62 +97,51 @@ public class DownloadCheckService { public void compress() { if(compress_queue.isEmpty()) return; - ReentrantLock reentrantLock = new ReentrantLock(); - reentrantLock.lock(); - ArrayList galleryTasks = new ArrayList<>(compress_queue); - compress_queue.clear(); - reentrantLock.unlock(); - for (GalleryTask galleryTask : galleryTasks) { - try { - log.info("开始压缩:{}", galleryTask.getName()); - //创建文件夹 - File file = new File(storagePath + galleryTask.getName()); - if (file.isDirectory() || file.mkdirs()) { - log.info("{}文件夹创建成功", galleryTask.getName()); - } else { - log.error("{}文件夹创建失败", galleryTask.getName()); - continue; - } - - //生成压缩包 - ZipUtil.zip(galleryTask.getPath(), storagePath + galleryTask.getName() + "/" + galleryTask.getName() + ".zip"); - log.info("{}压缩完成", galleryTask.getName()); - - FileUtil.del(galleryTask.getPath()); - galleryTask.setStatus(GalleryTask.COMPRESS_COMPLETE); - } catch (Exception e){ - log.error("{}压缩失败:{}", galleryTask, e.getMessage()); + ArrayList galleryTasks; + synchronized (compress_queue) { + galleryTasks = new ArrayList<>(compress_queue); + compress_queue.clear(); + } + for (GalleryTask galleryTask : galleryTasks) { + try { + log.info("开始压缩:{}", galleryTask.getName()); + File file = new File(storagePath + galleryTask.getName()); + if (file.isDirectory() || file.mkdirs()) { + log.info("{}文件夹创建成功", galleryTask.getName()); + } else { + log.error("{}文件夹创建失败", galleryTask.getName()); + continue; } + + ZipUtil.zip(galleryTask.getPath(), storagePath + galleryTask.getName() + "/" + galleryTask.getName() + ".zip"); + log.info("{}压缩完成", galleryTask.getName()); + + FileUtil.del(galleryTask.getPath()); + galleryTask.setStatus(GalleryTask.COMPRESS_COMPLETE); + } catch (Exception e){ + log.error("{}压缩失败:{}", galleryTask, e.getMessage()); } + } } - /** - * 检查改任务是否为已完成任务,如已完成则返回true,若未完成则加入队列 - * @return true if compress complete, false otherwise - */ public boolean addToQueue(GalleryTask galleryTask){ - //是否含有名字,进行中任务一般有名字,没有名字则肯定为初始任务,存在名字至少在下载路径出现过 if(galleryTask.getName() == null || galleryTask.getName().isEmpty()){ queue.putIfAbsent(galleryTask.getGid(), galleryTask); return false; } - //查询hah下载路径中,是否存在该任务下载路径,存在则为下载中或下载完成任务,加入队列 if(new File(downloadPath + galleryTask.getGid()).isDirectory()){ queue.putIfAbsent(galleryTask.getGid(), galleryTask); return false; } - - //查询存放路径中是否含有该任务的压缩包,存在则为下载完成任务 if(new File(storagePath + galleryTask.getName() + "/" + galleryTask.getName() + ".zip").exists()){ galleryTask.setStatus(GalleryTask.COMPRESS_COMPLETE); CustomUtil.notifyMe(String.format("任务:%s在添加时已下载完成,更新任务状态", galleryTask.getName())); return true; } - queue.putIfAbsent(galleryTask.getGid(), galleryTask); return false; } -} +} \ No newline at end of file diff --git a/src/main/java/lion/storageNode.java b/src/main/java/lion/storageNode.java index e051f56..5907ddb 100644 --- a/src/main/java/lion/storageNode.java +++ b/src/main/java/lion/storageNode.java @@ -1,6 +1,5 @@ package lion; -import io.netty.util.concurrent.Promise; import lion.Domain.GalleryTask; import lion.Message.*; import lion.Message.Main.*; @@ -14,7 +13,6 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import lombok.extern.slf4j.Slf4j; - import java.net.InetSocketAddress; import java.net.Socket; import java.util.HashMap; @@ -25,8 +23,6 @@ import java.util.concurrent.locks.ReentrantLock; @Slf4j public class storageNode { - ChannelFuture channelFuture; - Channel server; Channel node; @@ -39,10 +35,6 @@ public class storageNode { ScheduledExecutorService checkThreadPool; - HashMap> promises; - - int counter; - ReentrantLock lock; public static String storagePath = "/root/gallery/gallery/"; @@ -51,12 +43,10 @@ public class storageNode { queue = new HashMap<>(); tempQueue = new HashMap<>(); lock = new ReentrantLock(); - counter = 0; - promises = new HashMap<>(); int real_port = CustomUtil._findIdlePort(26321); - channelFuture = new ServerBootstrap() + new ServerBootstrap() .channel(NioServerSocketChannel.class) .group(new NioEventLoopGroup()) .childHandler(new ChannelInitializer() { @@ -85,19 +75,22 @@ public class storageNode { if (i==20) { log.info("server connect failed"); } - downloadCheckService = new DownloadCheckService(queue, promises); + downloadCheckService = new DownloadCheckService(queue); checkThreadPool = Executors.newScheduledThreadPool(1); checkThreadPool.scheduleAtFixedRate(this::mainThread, 5, 5, TimeUnit.SECONDS); } - + public void mainThread(){ try { lock.lock(); - if(!tempQueue.isEmpty()){ - queue.putAll(tempQueue); - tempQueue.clear(); + try { + if(!tempQueue.isEmpty()){ + queue.putAll(tempQueue); + tempQueue.clear(); + } + } finally { + lock.unlock(); } - lock.unlock(); //检查,当任务状态发生变化即方法返回true时,再更新,否则return if (!downloadCheckService.downloadCheck()) { boolean isSkip = true; @@ -120,20 +113,23 @@ public class storageNode { //发送 //上锁后再发送,避免出现发送完之后再下载完成 lock.lock(); - DownloadStatusMessage downloadStatusMessage = new DownloadStatusMessage(); - downloadStatusMessage.setGalleryTasks(queue.values().toArray(GalleryTask[]::new)); - server.writeAndFlush(downloadStatusMessage); + try { + DownloadStatusMessage downloadStatusMessage = new DownloadStatusMessage(); + downloadStatusMessage.setGalleryTasks(queue.values().toArray(GalleryTask[]::new)); + server.writeAndFlush(downloadStatusMessage); - queue.entrySet().removeIf(entry -> entry.getValue().is_compress_complete()); - log.info("任务状态发送完成"); - - - lock.unlock(); + queue.entrySet().removeIf(entry -> entry.getValue().is_compress_complete()); + log.info("任务状态发送完成"); + } finally { + lock.unlock(); + } }catch (Exception e){ log.error("发送任务状态时发生异常:{}", e.getMessage()); } } + int counter; + class MyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter{ Map queue; @@ -155,20 +151,22 @@ public class storageNode { } else if(identityMessage.getIdentity().equals("lionwebsiteside")){ node = ctx.channel(); log.info("node上线"); - downloadCheckService.setNode(node); } } case AbstractMessage.DOWNLOAD_POST_MESSAGE -> { DownloadPostMessage dpm = (DownloadPostMessage) abstractMessage; lock.lock(); - //添加到队列方法返回真说明该任务已下载完成,直接发送下载进度 - if(downloadCheckService.addToQueue(dpm.getGalleryTask())){ - DownloadStatusMessage downloadStatusMessage = new DownloadStatusMessage(); - downloadStatusMessage.setGalleryTasks(new GalleryTask[]{dpm.getGalleryTask()}); - server.writeAndFlush(downloadStatusMessage); + try { + //添加到队列方法返回真说明该任务已下载完成,直接发送下载进度 + if(downloadCheckService.addToQueue(dpm.getGalleryTask())){ + DownloadStatusMessage downloadStatusMessage = new DownloadStatusMessage(); + downloadStatusMessage.setGalleryTasks(new GalleryTask[]{dpm.getGalleryTask()}); + server.writeAndFlush(downloadStatusMessage); + } + log.info(String.valueOf(queue)); + } finally { + lock.unlock(); } - log.info(String.valueOf(queue)); - lock.unlock(); ctx.writeAndFlush(new ResponseMessage(dpm.messageId, (byte) 0)); } case AbstractMessage.DELETE_GALLERY_MESSAGE -> { @@ -197,9 +195,7 @@ public class storageNode { } else if(ctx.channel().equals(node)){ log.info("node 下线"); node = null; - downloadCheckService.setNode(null); } } } -} - +} \ No newline at end of file diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml new file mode 100644 index 0000000..5b831d9 --- /dev/null +++ b/src/main/resources/logback.xml @@ -0,0 +1,21 @@ + + + + run.out + + run.%i.out + 1 + 10 + + + 10MB + + + %d{yyyy-MM-dd HH:mm:ss} [%level] %msg%n + + + + + + + \ No newline at end of file diff --git a/src/main/resources/simplelogger.properties b/src/main/resources/simplelogger.properties deleted file mode 100644 index 53cffde..0000000 --- a/src/main/resources/simplelogger.properties +++ /dev/null @@ -1,9 +0,0 @@ - -org.slf4j.simpleLogger.defaultLogLevel=info - -org.slf4j.simpleLogger.showDateTime=true - -org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss - -org.slf4j.simpleLogger.logFile=run.out -