路由模型
RabbitMQ 提供了五種不同的通信模型,上一篇文章中,簡單的介紹了一下RabbitMQ的發(fā)布訂閱模型模型。這篇文章來學(xué)習(xí)一下RabbitMQ中的路由模型(direct)。
路由模型(direct):路由模式相當(dāng)于是分布訂閱模式的升級版,多了一個 路由key來約束隊列與交換機(jī)的綁定。
在路由模型中,生產(chǎn)者將消息發(fā)送到交換機(jī),交換機(jī)根據(jù)消息的路由鍵將消息轉(zhuǎn)發(fā)到對應(yīng)的隊列中。每個隊列可以綁定多個路由鍵,每個路由鍵可以綁定到多個隊列中。消費(fèi)者從隊列中接收消息并處理。當(dāng)一個路由鍵被多個隊列綁定時,交換機(jī)會將消息發(fā)送到所有綁定的隊列中。當(dāng)一個隊列綁定多個路由鍵時,該隊列將能夠接收到所有路由鍵對應(yīng)的消息。
適用場景
路由模型適用于需要點(diǎn)對點(diǎn)通信的場景,例如:
- 系統(tǒng)監(jiān)控告警通知;
- 任務(wù)分發(fā);
- 用戶私信系統(tǒng);
- 訂單確認(rèn)通知等。
演示
生產(chǎn)者
// 生產(chǎn)者 public class Producer { private static final String EXCHANGE_NAME = "exchange_direct_1"; // 定義路由的key,key值是可以隨意定義的 private static final String EXCHANGE_ROUTING_KEY1 = "direct_km1"; private static final String EXCHANGE_ROUTING_KEY2 = "direct_km2"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); for (int i = 0; i < 100; i++) { if (i % 2 == 0) { channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1, null, ("路由模型發(fā)送的第 " + i + " 條信息").getBytes()); } else { channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2, null, ("路由模型發(fā)送的第 " + i + " 條信息").getBytes()); } } channel.close(); connection.close(); } }
消費(fèi)者
// 消費(fèi)者1 public class Consumer { private static final String QUEUE_NAME = "queue_direct_1"; private static final String EXCHANGE_NAME = "exchange_direct_1"; private static final String EXCHANGE_ROUTING_KEY1 = "direct_km1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費(fèi)者1接收到的消息是:" + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } }
// 消費(fèi)者2 public class Consumer2 { private static final String QUEUE_NAME = "queue_direct_2"; private static final String EXCHANGE_NAME = "exchange_direct_1"; private static final String EXCHANGE_ROUTING_KEY2 = "direct_km2"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費(fèi)者2接收到的消息是:" + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } }
測試
先啟動2個消費(fèi)者,再啟動生產(chǎn)者
可以得到結(jié)果是消費(fèi)者1得到了序號是偶數(shù)的消息
消費(fèi)者2得到了序號是奇數(shù)的消息
小結(jié)
本文介紹了 RabbitMQ 通信模型中的路由模型的使用,通過交換機(jī)和路由鍵實現(xiàn)點(diǎn)對點(diǎn)通信,適合于需要點(diǎn)對點(diǎn)通信的場景。在實際使用過程中,需要注意以下幾點(diǎn):
- 路由鍵必須要與消費(fèi)者綁定隊列時的路由鍵相同,否則無法接收到消息;
- 可以通過多個交換機(jī)和路由鍵來實現(xiàn)更靈活的消息路由。
-
交換機(jī)
+關(guān)注
關(guān)注
21文章
2656瀏覽量
100179 -
路由
+關(guān)注
關(guān)注
0文章
278瀏覽量
41930 -
模型
+關(guān)注
關(guān)注
1文章
3305瀏覽量
49221 -
Direct
+關(guān)注
關(guān)注
0文章
15瀏覽量
11180 -
rabbitmq
+關(guān)注
關(guān)注
0文章
18瀏覽量
1042
發(fā)布評論請先 登錄
相關(guān)推薦
RabbitMQ通信模型中的work模型
![<b class='flag-5'>RabbitMQ</b>通信<b class='flag-5'>模型</b><b class='flag-5'>中</b>的work<b class='flag-5'>模型</b>](https://file1.elecfans.com/web2/M00/A7/CB/wKgZomURKWuAE4irAAAiES4QRrA023.jpg)
RabbitMQ是什么
![<b class='flag-5'>RabbitMQ</b>是什么](https://file1.elecfans.com/web2/M00/A6/05/wKgaomURKf-AU2OUAAEy9TP0wxw707.jpg)
基于Docker Compose部署RabbitMQ的經(jīng)驗分享
![基于Docker Compose部署<b class='flag-5'>RabbitMQ</b>的經(jīng)驗分享](https://file1.elecfans.com/web2/M00/BA/8B/wKgZomWUxW2AcoWnAAAFkbxO78A320.png)
多態(tài)路由承載的內(nèi)容分發(fā)模型
![多態(tài)<b class='flag-5'>路由</b>承載的內(nèi)容分發(fā)<b class='flag-5'>模型</b>](https://file.elecfans.com/web2/M00/49/75/poYBAGKhwLeAVJohAAAVc7TtAEQ936.jpg)
什么情況下使用RabbitMQ或 Kafka
![什么情況下使用<b class='flag-5'>RabbitMQ</b>或 Kafka](https://file.elecfans.com/web2/M00/93/02/poYBAGP1fzOAJrivAABiVmXChpg745.jpg)
什么情況下使用RabbitMQ或 Kafka
![什么情況下使用<b class='flag-5'>RabbitMQ</b>或 Kafka](https://file.elecfans.com//web2/M00/94/19/pYYBAGP4Ky2AAxSvAAByOO0W5vk752.jpg)
評論