帮助文档
专业提供香港服务器、香港云服务器、香港高防服务器租用、香港云主机、台湾服务器、美国服务器、美国云服务器vps租用、韩国高防服务器租用、新加坡服务器、日本服务器租用 一站式全球网络解决方案提供商!专业运营维护IDC数据中心,提供高质量的服务器托管,服务器机房租用,服务器机柜租用,IDC机房机柜租用等服务,稳定、安全、高性能的云端计算服务,实时满足您的多样性业务需求。 香港大带宽稳定可靠,高级工程师提供基于服务器硬件、操作系统、网络、应用环境、安全的免费技术支持。
服务器资讯 / 香港服务器租用 / 香港VPS租用 / 香港云服务器 / 美国服务器租用 / 台湾服务器租用 / 日本服务器租用 / 官方公告 / 帮助文档
用netty轻松实现一个高效稳定的TCP服务器
发布时间:2024-03-08 23:23:10   分类:帮助文档
用netty轻松实现一个高效稳定的TCP服务器           随着物联网的发展,很多项目都开始涉及到了tcp连接这块,在这里我们轻松用netty去实现,站在巨人的肩膀上。 关于netty包引用: io.netty netty-all 4.1.42.Final compile 实现TCP服务器代码 依赖netty只需几行代码tcp服务: import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteOrder; public class TcpServer { private Logger log = LoggerFactory.getLogger(getClass()); //自定义tcp服务端口号 private int port=9000; static TcpServer tcpServer; //单例设计模式 private TcpServer(){ } public static TcpServer getInstance(){ if(tcpServer==null){ tcpServer=new TcpServer(); } return tcpServer; }; public void run() throws InterruptedException { // 创建主线程组(接受连接) EventLoopGroup bossGroup = new NioEventLoopGroup(); // 创建工作线程组(处理连接) EventLoopGroup workerGroup = new NioEventLoopGroup(20); // 指定工作线程数量为20 // 创建ServerBootstrap实例,用于配置服务器 ServerBootstrap bootstrap = new ServerBootstrap(); // 配置主、工作线程组 bootstrap.group(bossGroup, workerGroup); // 指定使用NIO进行网络传输 bootstrap.channel(NioServerSocketChannel.class); // 设置子Channel的Socket选项,允许地址重用 bootstrap.childOption(ChannelOption.SO_REUSEADDR, true); // 配置子Channel的处理器,这里使用ChannelInitializer bootstrap.childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { // 添加自定义的解码器,这里是处理协议 ch.pipeline().addLast(new YKCDecoderV1()); // 添加自定义的服务器处理器 ch.pipeline().addLast(new TCPServerHandler()); } }); // 绑定端口并添加监听器,处理绑定操作的结果 bootstrap.bind(port).addListener((ChannelFutureListener) future -> { // 在绑定成功后输出日志信息 log.info("bind success in port: " + port); }); // 输出服务器启动成功信息 System.out.println("server started!"); } }  业务处理代码(参考) 以下是处理报文业务类可参考,注意代码未优化: import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.net.InetSocketAddress; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; / * 正则解析版 */ public class YKCDecoderV1 extends ByteToMessageDecoder { final static String reg = "^68.{14,332}";//单指令解析 根据业务协议报文定最短和最长 final static Pattern pattern1 = Pattern.compile(reg); @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf bufferIn, List list) throws Exception { // 获取可读字节数 int leng = bufferIn.readableBytes(); // 如果可读字节数小于8,输出错误信息并跳过这部分数据 if (leng < 8) { System.out.println("err! cmd len < 8 ."); String s = ByteBufUtil.hexDump(bufferIn); System.out.println(s); bufferIn.skipBytes(leng); return; } else { String s = ByteBufUtil.hexDump(bufferIn); Matcher matcher1 = pattern1.matcher(s); if (matcher1.find()) { String cmd = matcher1.group(); //单指令 System.out.println("sign cmd: " + cmd); String lenStr = cmd.substring(2, 4); int len = (Integer.parseInt(lenStr, 16) + 4) * 2; int cmdLen = cmd.length(); if (cmdLen == len) { JFYChargeProtocol jfyChargeProtocol = new JFYChargeProtocol(cmd); list.add(jfyChargeProtocol); bufferIn.skipBytes(leng); } else if (cmdLen > len) { multiHand(cmd, list); bufferIn.skipBytes(leng); } } else { logErr(channelHandlerContext, s); System.out.println("err! cmd format invalid: " + s); bufferIn.skipBytes(leng); } } } private void multiHand(String cmd, List list) { if (cmd.length() < 8) { return; } String lenStr = cmd.substring(2, 4); int len = (Integer.parseInt(lenStr, 16) + 4) * 2; if (len > cmd.length()) { return; } String newCmd = cmd.substring(0, len); if (newCmd.length() == len) { System.out.println("multi cmd-> " + newCmd); JFYChargeProtocol jfyChargeProtocol = new JFYChargeProtocol(newCmd); list.add(jfyChargeProtocol); } if (cmd.length() > len) { System.out.println("multi xxx-> " + cmd); String two = cmd.substring(len); if(two.startsWith("68")){ multiHand(two, list); } } } private int checkSignCmd(String cmd) { int cmd_len = getCmdLen(cmd); return cmd.length() - cmd_len; } private int getCmdLen(String cmd) { String leng = cmd.substring(28, 30) + cmd.substring(26, 28); int dec_num = Integer.parseInt(leng, 16); return (dec_num * 2) + 34; } private void logErr(ChannelHandlerContext ctx, String msg) { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIP = insocket.getAddress().getHostAddress(); System.out.println(clientIP + " :: " + msg); } public class JFYChargeProtocol { private int length; private byte[] raw; private String rawStr; public JFYChargeProtocol(int length,byte[] raw){ this.length=length; this.raw=raw; } public JFYChargeProtocol(String raw){ this.rawStr=raw; } public int getLength() { return length; } public void setLength(int length) { this.length = length; } public byte[] getRaw() { return raw; } public void setRaw(byte[] raw) { this.raw = raw; } public String getRawStr() { return rawStr; } public void setRawStr(String rawStr) { this.rawStr = rawStr; } } import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.stereotype.Service; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Service public class TCPServerHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LogManager.getLogger(TCPServerHandler.class); static Map inList=new ConcurrentHashMap(); / * 新连接 * @param ctx */ @Override public void channelActive(ChannelHandlerContext ctx) { String channelName=getChannelName(ctx); inList.put(channelName,ctx); logger.info("dev new conn > " +channelName); } private String getChannelName(ChannelHandlerContext ctx) { return "ykc".concat(ctx.channel().remoteAddress().toString()); } / * 连接下线 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { String channelName=getChannelName(ctx); logger.info("dev close conn > " + channelName); inList.remove(channelName); ctx.fireChannelInactive(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { JFYChargeProtocol in = (JFYChargeProtocol) msg; String readMsg= in.getRawStr(); logger.info("read dev <= " + readMsg); String channelName=getChannelName(ctx); readMsg=channelName+"$$"+readMsg; PackageHandlerImpl.getInstance().doHandle(readMsg); //ctx.writeAndFlush(in); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } / * 回复信息给设备 * @param hex */ public static boolean RepDev(String hex){ String[] kv= hex.split("\\$\\$"); if(kv.length==2){ String key=kv[0]; ChannelHandlerContext context=inList.get(key); if(context!=null){ byte[] bytes= ByteUtil.hexString2Bytes(kv[1]); ByteBuf byteBuf= Unpooled.copiedBuffer(bytes); context.writeAndFlush(byteBuf); return true; }else{ logger.error("dev offline="+key); } }else{ logger.error("cmd format err"); } return false; } } import java.util.ArrayList; import java.util.List; public class PackageHandlerImpl implements PackageHandler { public static List packageHandlers= new ArrayList(); static PackageHandlerImpl packageHandler; protected PackageHandlerImpl(){ super(); System.out.println("init PackageHandlerImpl"); } public static PackageHandlerImpl getInstance(){ if(packageHandler==null){ packageHandler=new PackageHandlerImpl(); } return packageHandler; } @Override public void doHandle(String hex) { for(PackageHandler f : packageHandlers){ f.doHandle(hex); } } public PackageHandlerImpl addHandle(PackageHandler f){ packageHandlers.add(f); return this; } } / * 包处理 */ public interface PackageHandler { void doHandle(String hex) ; } import org.apache.activemq.command.ActiveMQQueue; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.jms.annotation.JmsListener; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Service; import javax.jms.Destination; @Service public class TranServiceImpl { private static final Logger logger = LogManager.getLogger(DevServiceApplication.class); / * 接受服务器 返回数据 */ private final String out_name="ykc_out"; / * 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装 */ @Autowired private JmsMessagingTemplate jmsTemplate; / * 发送消息 采用系统配置类型 * * @param queueName 是发送到的队列名称 * @param message 是发送到的队列 */ public void sendMessage(String queueName, final String message) { jmsTemplate.convertAndSend(queueName, message); } / * 发送消息 采用指定队列类型 * * @param queueName 是发送到的队列 * @param message 是发送到的队列 */ public void sendMessageByQueue(String queueName, final String message) { Destination destination = new ActiveMQQueue(queueName); jmsTemplate.convertAndSend(destination, message); } @JmsListener(destination = out_name) public void receiveQueue(String text) { System.out.println("to dev => "+text); if(!TCPServerHandler.RepDev(text)){ logger.error("write mq fail ==> "+text); } } }  运行 当前是集成到 springboot2框架在这里即可运行,或实现实现 org.springframework.boot.ApplicationRunner 或 org.springframework.boot.CommandLineRunner 的接口,即启动后执行的任务,不用框架的在main方法也可以直接运行。 / * tcp服务在框架启动后 跟着启动即可 */ @SpringBootApplication public class DevServiceApplication { private static final Logger logger = LogManager.getLogger(DevServiceApplication.class); public static void main(String[] args) { TcpServer tcpServer = TcpServer.getInstance(); try { tcpServer.run(); PackageHandlerImpl packageHandler = PackageHandlerImpl.getInstance(); packageHandler.addHandle(new PackageHandlerByMQ()); } catch (InterruptedException e) { e.printStackTrace(); logger.error("TCP服务错误", e); throw new RuntimeException(); } } 总结 看看服务器上的tcp服务运行情况 运行天数: 流量状态图:             站在netty巨人的肩膀上,这个tcp服务实现方式简单,运行更是稳定。服务器运行时就部署了一直到今天共运行1235天了,900多个设备同步在线,配了2g的jvm运行内存,cpu占用5.6(top截图等了很久才出来5.6是个峰值,平时不到1)确保某个市的充电桩设备。中间由于客户的充电桩设备协议问题更新过几次,刚时开始是使用netty官网的解码LengthFieldBasedFrameDecoder做处理,可以说非常高效,但随后发现有几个产商的设备报文头部有特殊字符,而且刚好和协议头有些重叠,再考虑到示来的产商协议的不确定性,为了兼容这些产家不得以并以正则的方法去处理。 扩展部分 Netty 官方提供的编解码器 字符串编解码器: StringEncoder:将字符串编码为字节。StringDecoder:将字节解码为字符串。 字节流编解码器: ByteArrayEncoder:将字节数组编码为字节。ByteArrayDecoder:将字节解码为字节数组。 对象序列化编解码器: ObjectEncoder:将对象序列化为字节。ObjectDecoder:将字节反序列化为对象。 长度字段编解码器: LengthFieldPrepender:在消息头部添加表示消息长度的字段。LengthFieldBasedFrameDecoder:根据长度字段解码消息,用于处理拆包和粘包问题。 行分隔符编解码器: LineBasedFrameDecoder:按行切分消息,通常用于处理文本协议。 DelimiterBasedFrameDecoder: DelimiterBasedFrameDecoder:按照指定的分隔符切分消息,用于处理自定义分隔符的协议。 Protobuf 编解码器: ProtobufEncoder:将 Protobuf 对象编码为字节。ProtobufDecoder:将字节解码为 Protobuf 对象。 HTTP 编解码器: HttpRequestEncoder:将 HTTP 请求编码为字节。HttpResponseDecoder:将字节解码为 HTTP 响应。HttpRequestDecoder:将字节解码为 HTTP 请求。HttpResponseEncoder:将 HTTP 响应编码为字节。 WebSocket 编解码器: WebSocketServerProtocolHandler:处理 WebSocket 握手以及帧的编解码。
  • 7*24H在线售后
  • 高可用资源,安全稳定
  • 1v1专属客服对接
  • 无忧退款试用保障