storageNode/src/main/java/lion/storageNode.java

187 lines
7.0 KiB
Java

package lion;
import lion.Domain.GalleryTask;
import lion.Message.*;
import lion.Service.DeleteService;
import lion.Service.DeliveryService;
import lion.Service.DownloadCheckService;
import cn.hutool.core.io.FileUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.log4j.Log4j;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.ListIterator;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
@Log4j
public class storageNode {
ChannelFuture channelFuture;
Channel server;
DownloadCheckService downloadCheckService;
ArrayList<GalleryTask> queue;
ArrayList<GalleryTask> tempQueue;
ScheduledExecutorService checkThreadPool;
ReentrantLock lock;
public static String storagePath = "/root/gallery/gallery/";
public storageNode(){
queue = new ArrayList<>(0);
lock = new ReentrantLock();
channelFuture = new ServerBootstrap()
.channel(NioServerSocketChannel.class)
.group(new NioEventLoopGroup())
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) {
channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(100000000, 1, 4));
channel.pipeline().addLast(new MessageCodec());
channel.pipeline().addLast(new LoggingHandler());
channel.pipeline().addLast(new MyChannelInboundHandlerAdapter(tempQueue));
}
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.bind(26321);
log.info("listen port:8080");
try(Socket socket = new Socket()) {
socket.connect(new InetSocketAddress("194.36.27.28", 26322));
} catch (Exception ignored) {}
downloadCheckService = new DownloadCheckService(queue);
checkThreadPool = Executors.newScheduledThreadPool(1);
checkThreadPool.scheduleAtFixedRate(this::mainThread, 5, 5, TimeUnit.SECONDS);
}
public void mainThread(){
try {
//检查
if (!downloadCheckService.downloadCheck())
return;
//发送
DownloadStatusMessage downloadStatusMessage = new DownloadStatusMessage();
downloadStatusMessage.setGalleryTasks(queue.toArray(GalleryTask[]::new));
server.writeAndFlush(downloadStatusMessage);
ListIterator<GalleryTask> listIterator = queue.listIterator();
lock.lock();
while (listIterator.hasNext()) {
GalleryTask galleryTask = listIterator.next();
if (galleryTask.is_download_complete())
listIterator.remove();
}
if(!tempQueue.isEmpty()){
queue.addAll(tempQueue);
tempQueue.clear();
}
lock.unlock();
log.info("任务状态发送完成");
}catch (Exception e){
e.printStackTrace();
try (OutputStream outputStream = new FileOutputStream("/root/gallery/storageNode/err.txt")){
outputStream.write(e.getMessage().getBytes());
channelFuture.channel().close().sync();
System.exit(-1);
}catch (Exception ex){
ex.printStackTrace();
}
}
}
class MyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter{
ArrayList<GalleryTask> queue;
public MyChannelInboundHandlerAdapter(ArrayList<GalleryTask> queue) {
super();
this.queue = queue;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(msg);
if(msg instanceof IdentityMessage) {
server = ctx.channel();
log.info("server 上线");
//提交下载
}else if(msg instanceof DownloadPostMessage dpm){
lock.lock();
queue.add(dpm.getGalleryTask());
System.out.println(queue);
lock.unlock();
ctx.writeAndFlush(new ResponseMessage(dpm.messageId, (byte) 0));
//删除本子 全部/预览/源文件
}else if(msg instanceof DeleteGalleryMessage deleteGalleryMessage){
byte result = switch (deleteGalleryMessage.getDeleteType()){
case DeleteGalleryMessage.DELETE_ALL ->
DeleteService.deleteAll(storagePath + deleteGalleryMessage.getGalleryName());
case DeleteGalleryMessage.DELETE_PREVIEW ->
DeleteService.deletePreview(storagePath + deleteGalleryMessage.getGalleryName());
case DeleteGalleryMessage.DELETE_SOURCE ->
DeleteService.deleteSource(storagePath + deleteGalleryMessage.getGalleryName());
default -> -1;
};
ResponseMessage responseMessage = new ResponseMessage(deleteGalleryMessage.messageId, result);
ctx.writeAndFlush(responseMessage);
//请求预览
}else if(msg instanceof GalleryRequestMessage grm){
byte result = DeliveryService.deliveryPreview(grm.getGalleryName(), grm.getPage(), grm.getPort());
ResponseMessage responseMessage = new ResponseMessage();
responseMessage.messageId = grm.messageId;
responseMessage.setResult(result);
ctx.writeAndFlush(responseMessage);
//更新本子 删除原有文件,然后加入到队列
}else if(msg instanceof UpdateGalleryMessage ugm){
FileUtil.del(storagePath + ugm.getGalleryTask().getName());
lock.lock();
queue.add(ugm.getGalleryTask());
System.out.println(queue);
lock.unlock();
ctx.writeAndFlush(new ResponseMessage(ugm.messageId, (byte) 0));
}else if(msg instanceof GalleryPageQueryMessage gpqm){
byte result = DeliveryService.pageQuery(gpqm);
gpqm.setResult(result);
ctx.writeAndFlush(gpqm);
}
//修复预览
//重新生成压缩包
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
if(ctx.channel().equals(server)) {
log.info("server 下线");
server = null;
}
}
}
}