实现一款高可用的 TCP 数据传输服务器(Java版)
1.netty能做什么
首先netty是一款高性能、封装性良好且灵活、基于NIO(真·非阻塞IO)的开源框架。可以用来手写web服务器、TCP服务器等,支持的协议丰富,如:常用的HTTP/HTTPS/WEBSOCKET,并且提供的大量的方法,十分灵活,可以根据自己的需求量身DIV一款服务器。
用netty编写TCP的服务器/客户端
1.可以自己设计数据传输协议如下面这样:
2.可以自定义编码规则和解码规则
3.可以自定义客户端与服务端的数据交互细节,处理socket流攻击、TCP的粘包和拆包问题
2.Quick Start
创建一个普通的maven项目,不依赖任何的三方web服务器,用main方法执行即可。
加入POM依赖
io.netty
netty-all
4.1.6.Final
com.fasterxml.jackson.core
jackson-databind
2.9.7
log4j
log4j
1.2.17
设计一套基于TCP的数据传输协议
public class TcpProtocol {
private byte header=0x58;
private int len;
private byte [] data;
private byte tail=0x63;
public byte getTail() {
return tail;
}
public void setTail(byte tail) {
this.tail = tail;
}
public TcpProtocol(int len, byte[] data) {
this.len = len;
this.data = data;
}
public TcpProtocol() {
}
public byte getHeader() {
return header;
}
public void setHeader(byte header) {
this.header = header;
}
public int getLen() {
return len;
}
public void setLen(int len) {
this.len = len;
}
public byte[] getData() {
return data;
}
public void setData(byte[] data) {
this.data = data;
}
}
这里使用16进制表示协议的开始位和结束位,其中0x58代表开始,0x63代表结束,均用一个字节来进行表示。
TCP服务器的启动类
public class TcpServer {
private int port;
private Logger logger = Logger.getLogger(this.getClass());
public void init(){
logger.info("正在启动tcp服务器……");
NioEventLoopGroup boss = new NioEventLoopGroup();//主线程组
NioEventLoopGroup work = new NioEventLoopGroup();//工作线程组
try {
ServerBootstrap bootstrap = new ServerBootstrap();//引导对象
bootstrap.group(boss,work);//配置工作线程组
bootstrap.channel(NioServerSocketChannel.class);//配置为NIO的socket通道
bootstrap.childHandler(new ChannelInitializer
() {
protected void initChannel(SocketChannel ch) throws Exception {//绑定通道参数
ch.pipeline().addLast("logging",new LoggingHandler("DEBUG"));//设置log监听器,并且日志级别为debug,方便观察运行流程
ch.pipeline().addLast("encode",new EncoderHandler());//编码器。发送消息时候用过
ch.pipeline().addLast("decode",new DecoderHandler());//解码器,接收消息时候用
ch.pipeline().addLast("handler",new BusinessHandler());//业务处理类,最终的消息会在这个handler中进行业务处理
}
});
bootstrap.option(ChannelOption.SO_BACKLOG,1024);//缓冲区
bootstrap.childOption(ChannelOption.SO_KEEPALIVE,true);//ChannelOption对象设置TCP套接字的参数,非必须步骤
ChannelFuture future = bootstrap.bind(port).sync();//使用了Future来启动线程,并绑定了端口
logger.info("启动tcp服务器启动成功,正在监听端口:"+port);
future.channel().closeFuture().sync();//以异步的方式关闭端口
}catch (InterruptedException e) {
logger.info("启动出现异常:"+e);
}finally {
work.shutdownGracefully();
boss.shutdownGracefully();//出现异常后,关闭线程组
logger.info("tcp服务器已经关闭");
}
}
public static void main(String[] args) {
new TcpServer(8777).init();
}
public TcpServer(int port) {
this.port = port;
}
}
只要是基于netty的服务器,都会用到bootstrap 并用这个对象绑定工作线程组,channel的Class,以及用户DIV的各种pipeline的handler类,注意在添加自定义handler的时候,数据的流动顺序和pipeline中添加hanlder的顺序是一致的。也就是说,从上往下应该为:底层字节流的解码/编码handler、业务处理handler。
编码器
编码器是服务器按照协议格式返回数据给客户端时候调用的,继承MessageToByteEncoder代码:
public class EncoderHandler extends MessageToByteEncoder {
private Logger logger = Logger.getLogger(this.getClass());
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
if (msg instanceof TcpProtocol){
TcpProtocol protocol = (TcpProtocol) msg;
out.writeByte(protocol.getHeader());
out.writeInt(protocol.getLen());
out.writeBytes(protocol.getData());
out.writeByte(protocol.getTail());
logger.debug("数据编码成功:"+out);
}else {
logger.info("不支持的数据协议:"+msg.getClass()+"\t期待的数据协议类是:"+TcpProtocol.class);
}
}
}
解码器
解码器属于比较核心的部分,自定义解码协议、粘包、拆包等都在里面实现,继承自ByteToMessageDecoder,其实ByteToMessageDecoder的内部已经帮我们处理好了拆包/粘包的问题,只需要按照它的设计原则去实现decode方法即可:
public class DecoderHandler extends ByteToMessageDecoder {
//最小的数据长度:开头标准位1字节
private static int MIN_DATA_LEN=6;
//数据解码协议的开始标志
private static byte PROTOCOL_HEADER=0x58;
//数据解码协议的结束标志
private static byte PROTOCOL_TAIL=0x63;
private Logger logger = Logger.getLogger(this.getClass());
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List