帮助文档
专业提供香港服务器、香港云服务器、香港高防服务器租用、香港云主机、台湾服务器、美国服务器、美国云服务器vps租用、韩国高防服务器租用、新加坡服务器、日本服务器租用 一站式全球网络解决方案提供商!专业运营维护IDC数据中心,提供高质量的服务器托管,服务器机房租用,服务器机柜租用,IDC机房机柜租用等服务,稳定、安全、高性能的云端计算服务,实时满足您的多样性业务需求。 香港大带宽稳定可靠,高级工程师提供基于服务器硬件、操作系统、网络、应用环境、安全的免费技术支持。
服务器资讯 / 香港服务器租用 / 香港VPS租用 / 香港云服务器 / 美国服务器租用 / 台湾服务器租用 / 日本服务器租用 / 官方公告 / 帮助文档
2024.2.25 模拟实现 RabbitMQ —— 网络通信设计(服务器)
发布时间:2024-02-29 12:17:43   分类:帮助文档
2024.2.25 模拟实现 RabbitMQ —— 网络通信设计(服务器) 目录 引言 约定应用层的通信协议 自定义应用层协议 Type Length PayLod  实现 Broker Server 类 属性 与 构造 启动 Broker Server 停止 Broker Server 处理客户端连接 读取请求 与 写回响应 根据请求计算响应 清除 channel  引言 生产者 和 消费者 都是客户端,均通过 网络 和 Broker Server 进行通信 注意点一: 此处我们将使用 TCP 协议来作为通信的底层协议 注意点二: TCP 是有连接的(Connection)由于 创建/断开 TCP 连接的成本还挺高,需要三次握手啥的所以为了能够让 TCP 连接得到复用我们还将创建一个 Channel 类作为 Connection 内部的 逻辑上 的连接即一个 Connection 中可能有多个 Channel(一个管道,多个网线传输的效果) 约定应用层的通信协议 此处要交互的 Message 为 二进制数据HTTP 为文本协议,JSON 为文本格式,不适用此处场景所以我们自定义一个应用层协议,使用二进制的方式来传输数据! 自定义应用层协议 Type type 描述当前这个请求或响应是干啥的 具体理解: 在我们的 MQ 中,客户端(生产者 + 消费者)和 服务器(Broker Server)之间要进行的操作就是 VirtualHost 中的那些核心 API我们希望客户端通过网络能够远程调用 VirtualHost 中的核心 API此处 type 就是在描述当前这个请求/响应是在调用哪个 API取值如下:0x1 创建 channel0x2 关闭 channel0x3 创建 exchange0x4 销毁 exchange0x5 创建 queue0x6 销毁 queue0x7 创建 binding0x8 销毁 binding0x9 发送 message0xa 订阅 message0xb 返回 ack0xc 服务器给客户端推送消息(被订阅的消息)响应独有的 Length length 用来描述 payload 长度(防止粘包问题) PayLod payload 会根据当前是请求还是响应,以及当前的 type 有不同的取值 实例理解 实例一: 比如 type 是 0x3(创建交换机),同时当前是一个请求此时 payload 里的内容,就相当于是 exchangeDeclare 的参数序列化的结果 具体代码实现: 按照上述自定义应用层协议 创建 Request 类 import lombok.Data; /* * 表示一个网络通信中的请求对象,按照自定义协议的格式来展开的 * */ @Data public class Request { private int type; private int length; private byte[] payload; } 按照上述自定义应用层协议 创建 BasicArguments 类用于表示各方法的公共参数 import lombok.Data; import java.io.Serializable; /* * 使用这个类表示方法的公共参数/辅助的字段 * 后续使用每个方法又会有一些不同的参数,不同的参数再分别使用不同的子类来表示 * */ @Data public class BasicArguments implements Serializable { // 表示一次请求/响应 的身份标识,可以把请求和响应对上 protected String rid; // 这个通信使用的 channel 的身份标识 protected String channelId; } 每个方法有不同的参数,此处实例 type = 0x3 ,即 创建交换机(exchangeDeclare)所以我们根据 VirtualHost 中的 exchangeDeclare 方法中的参数,单独创建一个类出来该类还需 继承用于表示公共参数的 BasicArguments 类 import com.example.demo.mqserver.core.ExchangeType; import lombok.Getter; import lombok.Setter; import java.io.Serializable; import java.util.Map; @Getter @Setter public class ExchangeDeclareArguments extends BasicArguments implements Serializable { private String exchangeName; private ExchangeType exchangeType; private boolean durable; private boolean autoDelete; private Map arguments; } 注意: 其他 type 类型(除 0x1、0x2 、0xa 外)也均根据 其在 VirtualHost 中对应的参数,单独创建一个类即可0x1 和 0x2 分别为 创建 channel 和 关闭 channel,二者 API 所需参数就是公共参数,使用 BasicArguments 类即可,无需单独创建类type = 0xa,即 订阅消息(basicConsume),后文详细讲解 实例二: 比如 type = 0x3(创建交换机),同时当前是一个响应此时 payload 里的内容,就是 exchangeDeclare 的返回结果的序列化内容 具体代码实现: 按照上述自定义应用层协议 创建 Response 类 import lombok.Data; /* * 这个对象表示一个响应,也是根据自定义应用层协议来的 * */ @Data public class Response { private int type; private int length; private byte[] payload; } 按照上述自定义应用层协议 创建 BasicReturns 类用于表示远程调用方法的返回值 import lombok.Data; import java.io.Serializable; /* * 这个类表示各个远程调用的方法的返回值和公共信息 * */ @Data public class BasicReturns implements Serializable { // 用来标识唯一的请求和响应 protected String rid; // 用来标识一个 channelId protected String channelId; // 表示当前这个远程调用方法的返回值 protected boolean ok; } 注意: 其他 type 类型(除 0xc 外)均使用 BasicReturns 类中的成员变量 作为返回参数type = 0xc,该 type 类型为响应独占,表示 服务器给客户端推送消息(被订阅的消息),后文详解讲解 特例一: 比如 type = 0xa(订阅消息),同时当前是一个请求这个核心 API 比较特殊,其参数中包含有 回调函数 具体代码编写: 我们根据 VirtualHost 中的 BasicConsume 方法中的参数,单独创建一个类出来并且该类也要 继承用于表示公共参数的 BasicArguments 类唯一不同的是,其中用于表示 回调函数的参数 consumer 我们不写入该类中也就代表着在客户端发送请求时,不再携带 consumer 参数因为在 broker server 这边,我们规定 BasicConsume 的回调方法统一为 将收到的消息返回给消费者消费者仅需收到消息后,再在客户端自己这边执行一个用户自定义的回调就行了! import lombok.Getter; import lombok.Setter; import java.io.Serializable; @Getter @Setter public class BasicConsumeArguments extends BasicArguments implements Serializable { private String consumerTag; private String queueName; private boolean autoAck; // 这个类对应的 basicConsume 方法中,还有一个参数,是回调函数(如何来有效处理消息) // 这个回调函数,是不能通过网络传输的 // 站在 broker server 这边,针对消息的处理问题,其实是统一的(把消息返回给客户端) // 客户端这边收到消息之后,再在客户端自己这边执行一个用户自定义的回调就行了 // 此时客户端就不需要把自身的回调告诉服务器了! // 这个类就不需要 consumer 成员了 } 特列二: type = 0xc,即 服务器给客户端推送消息(被订阅的消息),该类型一定是一个响应! 如上图所示的蓝色部分此处我们定义一个 SubScribeReturns 类用于表示在消费者订阅队列之后,服务器给消费推送消息的响应参数此处仍需继承一下 代表响应公共参数的 BasicReturns 类 import com.example.demo.mqserver.core.BasicProperties; import lombok.Getter; import lombok.Setter; import java.io.Serializable; @Getter @Setter public class SubScribeReturns extends BasicReturns implements Serializable { private String consumerTag; private BasicProperties basicProperties; private byte[] body; } 注意: SubScribeReturns 类虽然继承了 BasicReturns 类但是在返回时,无需填写 BasicReturns 类中的成员变量 rid因为该响应无相对应的请求,故该响应无 rid,即将 rid 设为空字符串即可 小结: 上述内容属于服务器程序的关键环节,自定义应用层协议  实现 Broker Server 类 属性 与 构造 /* * 这个 BrokerServer 就是咱们 消息队列 本体服务器 * 本质上就是一个 TCP 的服务器 * */ public class BrokerServer { private ServerSocket serverSocket = null; // 当前考虑一个 BrokerServer 上只有一个 虚拟主机 private VirtualHost virtualHost = new VirtualHost("default"); // 使用这个 哈希表 表示当前的所有会话(也就是说有哪些客户端正在和咱们的服务器进行通信) // 此处的 key 是 channelId,value 为对应的 Socket 对象 private ConcurrentHashMap sessions = new ConcurrentHashMap(); // 引入线程池,来处理多个客户端的请求 private ExecutorService executorService = null; // 引入一个 Boolean 变量控制服务器是否继续运行 private volatile boolean runnable = true; public BrokerServer(int port) throws IOException { serverSocket = new ServerSocket(port); } } 启动 Broker Server public void start() throws IOException { System.out.println("[BrokerServer] 启动!"); executorService = Executors.newCachedThreadPool(); try { while (runnable) { Socket clientSocket = serverSocket.accept(); // 把处理连接的逻辑丢给这个线程池 executorService.submit(() ->{ processConnection(clientSocket); }); } }catch (SocketException e){ System.out.println("[BrokerServer] 服务器停止运行!"); } } 停止 Broker Server // 一般来说停止服务器,就是直接 kill 掉对应进程就行了 // 此处还是搞一个单独的停止方法,主要是用于后续的单元测试 public void stop() throws IOException { runnable = false; // 把线程池中的任务都放弃了,让线程都销毁 executorService.shutdownNow(); serverSocket.close(); } 处理客户端连接 // 通过这个方法来处理一个客户端的连接 // 在这一个连接中,可能会涉及到多个请求和响应 private void processConnection(Socket clientSocket){ try (InputStream inputStream = clientSocket.getInputStream(); OutputStream outputStream = clientSocket.getOutputStream()){ // 这里需要按照特定格式来读取并解析,此时就需要用到 DataInputStream 和 DataOutputStream try (DataInputStream dataInputStream = new DataInputStream(inputStream); DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){ while (true) { // 1、读取请求并解析 Request request = readRequest(dataInputStream); // 2、根据请求计算响应 Response response = process(request,clientSocket); // 3、把响应写回给客户端 writeResponse(dataOutputStream,response); } } } catch (EOFException | SocketException e) { // 对于这个代码,DataInputStream 如果读到 EOF,就会抛出一个 EOFException 异常 // 需要借助这个异常来结束循环 System.out.println("[BrokerServer] connection 关闭!客户端的地址:" + clientSocket.getInetAddress().toString() + ":" + clientSocket.getPort()); } catch (IOException | ClassNotFoundException | MqException e) { System.out.println("[BrokerServer] connection 出现异常!"); e.printStackTrace(); }finally { try { // 当连接处理完了,就需要记得关闭 socket clientSocket.close(); // 一个 TCP 连接中,可能包含多个 channel 需要把当前这个 socket 对应的所有 channel 也顺便清理掉 clearClosedSession(clientSocket); }catch (IOException e) { e.printStackTrace(); } } } 读取请求 与 写回响应 private Request readRequest(DataInputStream dataInputStream) throws IOException { Request request = new Request(); request.setType(dataInputStream.readInt()); request.setLength(dataInputStream.readInt()); byte[] payload = new byte[request.getLength()]; int n = dataInputStream.read(payload); if(n != request.getLength()) { throw new IOException("读取请求格式出错!"); } request.setPayload(payload); return request; } private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException { dataOutputStream.writeInt(response.getType()); dataOutputStream.writeInt(response.getLength()); dataOutputStream.write(response.getPayload()); // 这个刷新缓冲区也是重要的操作,保证当前写的这些数据能够快速进入到网卡里,而不至于在内存中呆着 dataOutputStream.flush(); } 根据请求计算响应 根据不同的 type 类型,来远程调用 VirtualHost 中不同的核心 API 具体代码编写: private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException { // 1、把 request 中的 payload 做一个初步的解析 BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload()); System.out.println("[Request] rid = " + basicArguments.getRid() + ", channelId = " + basicArguments.getChannelId() + ", type = " + request.getType() + ", length = " + request.getLength()); // 2、根据 type 的值,来进一步区分接下来这次请求要干啥 boolean ok = true; if(request.getType() == 0x1) { // 创建 channel sessions.put(basicArguments.getChannelId(), clientSocket); System.out.println("[BrokerServer] 创建 channel 完成! channelId = " + basicArguments.getChannelId()); }else if(request.getType() == 0x2) { // 销毁 channel sessions.remove(basicArguments.getChannelId()); System.out.println("[BrokerServer] 销毁 channel 完成! channelId = " + basicArguments.getChannelId()); } else if(request.getType() == 0x3) { // 创建交换机,此时 payload 就是 ExchangeDeclareArguments 对象了 ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments; ok = virtualHost.exchangeDeclare(arguments.getExchangeName(),arguments.getExchangeType(), arguments.isDurable(),arguments.isAutoDelete(),arguments.getArguments()); } else if(request.getType() == 0x4) { // 删除交换机,此时 payload 就是 ExchangeDeleteArguments 对象了 ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments; ok = virtualHost.exchangeDelete(arguments.getExchangeName()); } else if(request.getType() == 0x5) { // 创建队列,此时 payload 就是 QueueDeclareArguments 对象了 QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments; ok = virtualHost.queueDeclare(arguments.getQueueName(),arguments.isDurable(), arguments.isExclusive(),arguments.isAutoDelete(),arguments.getArguments()); } else if(request.getType() == 0x6){ // 销毁队列,此时 payload 就是 QueueDeleteArguments 对象了 QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments; ok = virtualHost.queueDelete(arguments.getQueueName()); } else if(request.getType() == 0x7){ // 创建绑定,此时 payload 就是 QueueBindArguments 对象了 QueueBindArguments arguments = (QueueBindArguments) basicArguments; ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey()); } else if(request.getType() == 0x8){ // 删除绑定,此时 payload 就是 QueueUnbindArguments 对象了 QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments; ok = virtualHost.queueUnbind(arguments.getQueueName(),arguments.getExchangeName()); } else if(request.getType() == 0x9){ // 发送消息,此时 payload 就是 BasicPublishArguments 对象了 BasicPublishArguments arguments = (BasicPublishArguments) basicArguments; ok = virtualHost.basicPublish(arguments.getExchangeName(),arguments.getRoutingKey(), arguments.getBasicProperties(),arguments.getBody()); } else if(request.getType() == 0xa){ // 订阅消息,此时 payload 就是 BasicConsumeArguments 对象了 BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments; ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(), new Consumer() { // 这个回调函数要做的工作,就是把服务器收到的消息可以直接推送回对应的消费者客户端 @Override public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException { // 先知道当前这个收到的消息,要发给哪个客户端,此处 consumerTag 其实是 channelId // 根据 channelId 去 sessions 中查询,就可以得到对应的 socket 对象了,从而可以往里面发送数据了 // 1、根据 channelId 找到 socket 对象 Socket clientSocket = sessions.get(consumerTag); if(clientSocket == null || clientSocket.isClosed()) { throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!"); } // 2、构造响应数据 SubScribeReturns subScribeReturns = new SubScribeReturns(); subScribeReturns.setChannelId(consumerTag); subScribeReturns.setRid(""); // 由于这里只有响应,没有请求,不需要去对应 rid 暂时不需要 subScribeReturns.setOk(true); subScribeReturns.setConsumerTag(consumerTag); subScribeReturns.setBody(body); subScribeReturns.setBasicProperties(basicProperties); byte[] paylaod = BinaryTool.toBytes(subScribeReturns); Response response = new Response(); // 0xc 表示服务器给消费者客户端推送的消息数据 response.setType(0xc); // response 的 payload 就是一个 SubScribeReturns response.setLength(paylaod.length); response.setPayload(paylaod); // 3、把数据写回给客户端 // 注意!此处的 dataOutputStream 这个对象不能 close! // 如果把 dataOutputStream 关闭,就会直接把 clientSocket 里的 outputStream 也给关了 // 此时就无法继续往 socket 中写入后续数据了! DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream()); writeResponse(dataOutputStream,response); } }); } else if(request.getType() == 0xb){ // 确认消息,此时 payload 就是 BasicAckArguments 对象了 BasicAckArguments arguments = (BasicAckArguments) basicArguments; ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId()); }else { // 当前的 type 是非法的 throw new MqException("[BrokerServer] 未知的 type!type = " + request.getType()); } // 3、构造响应 BasicReturns basicReturns = new BasicReturns(); basicReturns.setChannelId(basicArguments.getChannelId()); basicReturns.setRid(basicArguments.getRid()); basicReturns.setOk(ok); byte[] payload = BinaryTool.toBytes(basicReturns); Response response = new Response(); response.setType(request.getType()); response.setLength(payload.length); response.setPayload(payload); System.out.println("[Response] rid = " + basicReturns.getRid() + ",channelId = " + basicReturns.getChannelId() + ", type = " + response.getType() + ",length = " + response.getLength()); return response; } 注意点一: 当前请求中的 payload 里面放的内容 是根据 type 的类型来走的比如 type 是 0x3,payload 就是 ExchangeDeclareArguments比如 type 是 0x4,payload 就是 ExchangeDeleteArguments... 注意点二: 此处设定的不同的方法的参数,虽然都有不同的类但是它们均继承自同一个 BasicArguments 类因此先将 payload 转成 BasicArguments 清除 channel  清理 sessions 这个 哈希表 中的 session 信息 具体代码编写: private void clearClosedSession(Socket clientSocket) { // 这里要做的事情,主要就是遍历上述 session hash 表,把该关闭的 socket 对应的键值对,统统删掉 List toDeleteChannelId = new ArrayList<>(); for(Map.Entry entry : sessions.entrySet()) { if(entry.getValue() == clientSocket) { // 不能在这里直接删除 // 这属于集合类的一个大忌!!一边遍历,一边删除! // session.remove(entry.getKey()); toDeleteChannelId.add(entry.getKey()); } } for (String channelId : toDeleteChannelId) { sessions.remove(channelId); } System.out.println("[BrokerServer] 清理 session 完成!被清理的 channelId = " + toDeleteChannelId); }
香港云服务器租用推荐
服务器租用资讯
·广东云服务有限公司怎么样
·广东云服务器怎么样
·广东锐讯网络有限公司怎么样
·广东佛山的蜗牛怎么那么大
·广东单位电话主机号怎么填写
·管家婆 花生壳怎么用
·官网域名过期要怎么办
·官网邮箱一般怎么命名
·官网网站被篡改怎么办
服务器租用推荐
·美国服务器租用
·台湾服务器租用
·香港云服务器租用
·香港裸金属服务器
·香港高防服务器租用
·香港服务器租用特价