加入节点检测,节点下线时不能下载;节点下线时监听端口,等待节点上线;允许手动重复未完成任务

This commit is contained in:
chuzhongzai 2023-08-08 15:24:54 +08:00
parent f0f3e576d6
commit 9fcf9d0982
9 changed files with 118 additions and 32 deletions

View File

@ -123,4 +123,9 @@ public class GalleryManageController {
return galleryManageService.shareGallery(gid, expireHour); return galleryManageService.shareGallery(gid, expireHour);
return Response._failure("非法访问"); return Response._failure("非法访问");
} }
@PostMapping("/reset")
public String resetUndone(){
return galleryManageService.resetUndone();
}
} }

View File

@ -3,6 +3,7 @@ package com.lion.lionwebsite.Controller;
import com.lion.lionwebsite.Domain.User; import com.lion.lionwebsite.Domain.User;
import com.lion.lionwebsite.Service.PublicServiceImpl; import com.lion.lionwebsite.Service.PublicServiceImpl;
import com.lion.lionwebsite.Service.RemoteService;
import com.lion.lionwebsite.Util.FileDownload; import com.lion.lionwebsite.Util.FileDownload;
import com.lion.lionwebsite.Util.Response; import com.lion.lionwebsite.Util.Response;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
@ -25,6 +26,9 @@ public class PublicController {
@Resource @Resource
PublicServiceImpl publicService; PublicServiceImpl publicService;
@Resource
RemoteService remoteService;
@GetMapping("/ip") @GetMapping("/ip")
public void ip(HttpServletRequest request, String auth, HttpServletResponse response) throws IOException { public void ip(HttpServletRequest request, String auth, HttpServletResponse response) throws IOException {
String ip = request.getHeader("X-Forwarded-For"); String ip = request.getHeader("X-Forwarded-For");
@ -80,7 +84,10 @@ public class PublicController {
public String validate(String AuthCode){ public String validate(String AuthCode){
Response response = Response.generateResponse(); Response response = Response.generateResponse();
User user = publicService.getUserId(AuthCode); User user = publicService.getUserId(AuthCode);
response.success(String.format("{\"userId\": %d, \"username\": \"%s\"}", user.getId(), user.getUsername())); String isAvailable = remoteService.isDead() ? "false": "true";
response.success(String.format("{\"userId\": %d, " +
"\"username\": \"%s\", " +
"\"isAvailable\": %s}", user.getId(), user.getUsername(), isAvailable));
return response.toJSONString(); return response.toJSONString();
} }

View File

@ -76,6 +76,9 @@ public class GalleryManageService {
public String createTask(String link, String targetResolution, String AuthCode, List<Integer> tidS, byte mode){ public String createTask(String link, String targetResolution, String AuthCode, List<Integer> tidS, byte mode){
Response response = Response.generateResponse(); Response response = Response.generateResponse();
// return Response._failure("调试中,请勿提交任务"); // return Response._failure("调试中,请勿提交任务");
if(remoteService.isDead())
return Response._failure("节点挂了,找狮子处理");
int gid; int gid;
try { try {
gid = Integer.parseInt(link.split("/")[4]); gid = Integer.parseInt(link.split("/")[4]);
@ -592,6 +595,27 @@ public class GalleryManageService {
return response.toJSONString(); return response.toJSONString();
} }
public String resetUndone(){
Response response = Response.generateResponse();
if (remoteService.isDead()){
response.failure("节点不在线,无法重置");
return response.toJSONString();
}
Gallery[] galleries = galleryMapper.selectUnDoneGalleries();
if(galleries != null && galleries.length != 0) {
log.info("发送未下载完成本子至节点,{}本", galleries.length);
for (Gallery gallery : galleries)
remoteService.addGalleryToQueue(gallery, gallery.getMode());
response.success(String.format("发送未下载完成本子至节点,%s本", galleries.length));
}else{
response.failure("当前没有未下载完成的本子");
}
return response.toJSONString();
}
public static Integer parseGid(String link){ public static Integer parseGid(String link){
try { try {
return Integer.parseInt(link.split("/g/")[1].split("/")[0]); return Integer.parseInt(link.split("/g/")[1].split("/")[0]);

View File

@ -22,7 +22,7 @@ import org.springframework.stereotype.Service;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
@ -59,6 +59,8 @@ public class RemoteService {
ExecutorService downloadThread; ExecutorService downloadThread;
Thread monitor;
AtomicInteger atomicInteger; AtomicInteger atomicInteger;
public RemoteService(){ public RemoteService(){
@ -66,6 +68,14 @@ public class RemoteService {
eventLoopGroup = new DefaultEventLoop(); eventLoopGroup = new DefaultEventLoop();
downloadThread = Executors.newCachedThreadPool(); downloadThread = Executors.newCachedThreadPool();
promiseHashMap = new HashMap<>(); promiseHashMap = new HashMap<>();
if(!initChannel()){ //如果远程服务器连接失败则开启本地监听
monitor = new Thread(this::monitorFunc);
monitor.start();
}
}
public boolean initChannel(){
try { try {
channelFuture = new Bootstrap() channelFuture = new Bootstrap()
.channel(NioSocketChannel.class) .channel(NioSocketChannel.class)
@ -82,13 +92,15 @@ public class RemoteService {
log.info("connect success"); log.info("connect success");
channel = channelFuture.channel(); channel = channelFuture.channel();
channel.writeAndFlush(new IdentityMessage()); channel.writeAndFlush(new IdentityMessage());
}catch (InterruptedException e){ return true;
e.printStackTrace(); }catch (Exception e){
log.info("connect node failed, wait for node back online");
return false;
} }
} }
public boolean isAlive(){ public boolean isDead(){
return !(channelFuture.channel() == null) || channelFuture.channel().isActive(); return channelFuture.channel() == null || !channelFuture.channel().isActive();
} }
public byte addGalleryToQueue(Gallery gallery, byte type){ public byte addGalleryToQueue(Gallery gallery, byte type){
@ -253,6 +265,25 @@ public class RemoteService {
} }
} }
public void monitorFunc(){
System.out.println("监听端口: " + (port + 1) + " 等待节点上线");
try(ServerSocket socket = new ServerSocket(port + 1)) {
Socket client;
while(true){
client = socket.accept();
client.close();
if(client.getInetAddress().getHostAddress().equals("5.255.110.45")){
System.out.println("尝试连接");
initChannel();
socket.close();
break;
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
class MyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter{ class MyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter{
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { public void channelRead(ChannelHandlerContext ctx, Object msg) {
@ -283,7 +314,15 @@ public class RemoteService {
promiseHashMap.get(rsm.messageId).setSuccess(rsm); promiseHashMap.get(rsm.messageId).setSuccess(rsm);
else if(msg instanceof GalleryPageQueryMessage gpqm) else if(msg instanceof GalleryPageQueryMessage gpqm)
promiseHashMap.get(gpqm.messageId).setSuccess(gpqm); promiseHashMap.get(gpqm.messageId).setSuccess(gpqm);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
if(ctx.channel() != null && ctx.channel().remoteAddress().toString().equals(channel.remoteAddress().toString())){
System.out.println("activate monitor thread, waiting for node back online");
monitor = new Thread(RemoteService.this::monitorFunc);
monitor.start();
}
} }
} }
} }

View File

@ -60,6 +60,18 @@ public class Response {
return response.toJSONString(); return response.toJSONString();
} }
public static String _success(String result){
Response response = generateResponse();
response.success(result);
return response.toJSONString();
}
public static String _success(){
Response response = generateResponse();
response.success();
return response.toJSONString();
}
public static String _default(){ public static String _default(){
Response response = Response.generateResponse(); Response response = Response.generateResponse();
response.failure("参数错误"); response.failure("参数错误");

View File

@ -31,7 +31,6 @@ gallery-manage-service:
remote-service: remote-service:
ip: 5.255.110.45 ip: 5.255.110.45
port: 8080

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long