RabbitMQ Notes
Installing RabbitMQ with Docker
```shell
docker run \
  -d --name myrabbitmq \
  -p 5672:5672 -p 15672:15672 \
  -v rabbitmqData:/var/lib/rabbitmq \
  -e RABBITMQ_DEFAULT_USER=username \
  -e RABBITMQ_DEFAULT_PASS=password \
  rabbitmq:management
```
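Basic models
Basic producer/consumer model
- One-to-one
- The producer publishes a message to a queue; the consumer takes the message from the queue and processes it.
Producer code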
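```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test; // JUnit 4 assumed; JUnit 5 uses org.junit.jupiter.api.Test

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider {

    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // Configure the connection to the broker
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("123.56.2.36");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/ems");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // queueDeclare(queue, durable, exclusive, autoDelete, arguments)
        channel.queueDeclare("hello", true, false, false, null);

        // Publish to the default exchange (""); the routing key is the queue name
        channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitmq".getBytes());

        channel.close();
        connection.close();
    }
}
```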
Consumer code
```java
import com.rabbitmq.client.*;
import org.junit.Test; // JUnit 4 assumed; JUnit 5 uses org.junit.jupiter.api.Test

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer {

    @Test
    public void testConsume() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("123.56.2.36");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/ems");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // The declaration must use the same parameters as the producer's
        channel.queueDeclare("hello", true, false, false, null);

        // Second argument: autoAck = true. The channel and connection are left
        // open on purpose so the callback can keep receiving messages.
        // Note: basicConsume returns immediately, so the callback only fires
        // while the test's JVM and connection are still alive.
        channel.basicConsume("hello", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        });
    }
}
```
Extracting the connection setup into a utility class
```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQConnectionUtil {

    // The factory is configured once and reused for every connection
    private static final ConnectionFactory connectionFactory = new ConnectionFactory();

    static {
        connectionFactory.setHost("123.56.2.36");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/ems");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
    }

    // Returns a new connection, or null if the connection attempt fails
    public static Connection getConnection() {
        try {
            return connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    // Closes the channel first, then the connection; either may be null
    public static void closeConnectionAndChanel(Channel channel, Connection connection) {
        try {
            if (channel != null) channel.close();
            if (connection != null) connection.close();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
```
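Work queue model
Fanout model
- The producer sends messages to an exchange, and the exchange delivers them to consumers through queues
- There are multiple consumers; each consumer has its own queue, and every queue must be bound to the exchange
- The exchange sends each message to all of the queues bound to it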
```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare an exchange named "logs" of type fanout
        channel.exchangeDeclare("logs", "fanout");

        // A fanout exchange ignores the routing key, so it is left empty
        for (int i = 0; i < 20; i++) {
            channel.basicPublish("logs", "", null, "test".getBytes());
        }

        RabbitMQConnectionUtil.closeConnectionAndChanel(channel, connection);
    }
}
```
```java
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs", "fanout");
        // Server-named temporary queue, bound to the exchange
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, "logs", "");

        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer1:" + new String(body));
            }
        });
    }
}
```
```java
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs", "fanout");
        // Server-named temporary queue, bound to the exchange
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, "logs", "");

        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer2:" + new String(body));
            }
        });
    }
}
```
Every message published by the provider is delivered to both Consumer1 and Consumer2.
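To make the copy-to-every-queue behavior concrete without a running broker, here is a minimal in-memory sketch. It is not RabbitMQ code: the class `FanoutExchangeSketch` and the queue names are hypothetical, and the "exchange" is just a map of plain Java queues.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model of a fanout exchange (illustration only)
final class FanoutExchangeSketch {

    // queue name -> messages waiting in that queue
    private final Map<String, Queue<String>> queues = new LinkedHashMap<>();

    // Binding needs no routing key: a fanout exchange ignores it
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    // Every bound queue receives its own copy of the message
    void publish(String message) {
        for (Queue<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    Queue<String> queue(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutExchangeSketch exchange = new FanoutExchangeSketch();
        exchange.bind("consumer1-queue");
        exchange.bind("consumer2-queue");

        for (int i = 0; i < 3; i++) {
            exchange.publish("test");
        }

        System.out.println(exchange.queue("consumer1-queue").size()); // 3
        System.out.println(exchange.queue("consumer2-queue").size()); // 3
    }
}
```

Both queues end up with all three messages, which is why Consumer1 and Consumer2 each print every message in the real example.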
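Routing model (direct)
- A queue must specify a routingKey when it is bound to the exchange
- The sender must also specify a routingKey when it publishes a message to the exchange
- The exchange compares the routingKeys and delivers the message only to the queues whose binding key matches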
```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs_direct", "direct");
        // Only queues bound with this exact key receive the message
        String routingKey = "info";
        channel.basicPublish("logs_direct", routingKey, null,
                ("Message published by the direct model with routingKey: " + routingKey).getBytes());
        RabbitMQConnectionUtil.closeConnectionAndChanel(channel, connection);
    }
}
```
```java
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs_direct", "direct");
        String queueName = channel.queueDeclare().getQueue();
        // Consumer1 only receives messages published with routingKey "error"
        channel.queueBind(queueName, "logs_direct", "error");

        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer1:" + new String(body));
            }
        });
    }
}
```
```java
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs_direct", "direct");
        String queueName = channel.queueDeclare().getQueue();
        // One queue can be bound with several routingKeys
        channel.queueBind(queueName, "logs_direct", "info");
        channel.queueBind(queueName, "logs_direct", "error");

        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer2:" + new String(body));
            }
        });
    }
}
```
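With the bindings above (Consumer1 bound with error, Consumer2 bound with info and error), a message published with routingKey info should reach only Consumer2. The following in-memory sketch reproduces that lookup. It does not use the RabbitMQ client; the class `DirectExchangeSketch` and the queue names are hypothetical stand-ins for the server-named queues.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a direct exchange (illustration only)
final class DirectExchangeSketch {

    // routingKey -> names of the queues bound with exactly that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queueName, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queueName);
    }

    // Returns the queues that would receive a message with this routingKey
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("consumer1-queue", "error");
        exchange.bind("consumer2-queue", "info");
        exchange.bind("consumer2-queue", "error");

        System.out.println(exchange.route("info"));  // [consumer2-queue]
        System.out.println(exchange.route("error")); // [consumer1-queue, consumer2-queue]
        System.out.println(exchange.route("debug")); // []
    }
}
```

A key with no matching binding routes to no queue at all, so in this sketch a "debug" message is simply not delivered.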
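Routing model (topic)
- Similar to direct
- The difference is that a topic exchange lets a queue use wildcards in the routingKey it is bound with
- A routingKey in this model usually consists of several words separated by '.', for example message.info
- Wildcards: '*' matches exactly one word; '#' matches zero or more words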
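The wildcard rules can be sketched as a small standalone matcher. This is only an illustration of the matching semantics described in the list above, not how RabbitMQ implements it internally; the class `TopicMatcher` and its method names are hypothetical.

```java
// Hypothetical helper illustrating topic wildcard matching (not RabbitMQ code)
final class TopicMatcher {

    // Returns true if routingKey matches bindingPattern, where '*' stands
    // for exactly one word and '#' stands for zero or more words
    static boolean matches(String bindingPattern, String routingKey) {
        String[] pattern = bindingPattern.isEmpty() ? new String[0] : bindingPattern.split("\\.");
        String[] key = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(pattern, 0, key, 0);
    }

    private static boolean matches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            // Pattern exhausted: match only if the key is exhausted too
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words: try every possible length
            for (int next = k; next <= key.length; next++) {
                if (matches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            // A literal word or '*' needs a word to consume, but none is left
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(key[k])) {
            return matches(pattern, p + 1, key, k + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("user.*", "user.save"));     // true
        System.out.println(matches("*.save", "user.save"));     // true
        System.out.println(matches("user.*", "user.save.all")); // false
        System.out.println(matches("user.#", "user.save.all")); // true
        System.out.println(matches("user.#", "user"));          // true
    }
}
```

Under these rules a message with routingKey user.save matches both a queue bound with user.* and a queue bound with *.save, so both queues receive it.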
```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs_topic", "topic");
        // Matched against the wildcard patterns used by the bound queues
        String routingKey = "user.save";
        channel.basicPublish("logs_topic", routingKey, null,
                ("Message published by the topic model with routingKey: " + routingKey).getBytes());
        RabbitMQConnectionUtil.closeConnectionAndChanel(channel, connection);
    }
}
```
```java
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs_topic", "topic");
        String queueName = channel.queueDeclare().getQueue();
        // Matches two-word keys whose first word is "user"
        channel.queueBind(queueName, "logs_topic", "user.*");

        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer1:" + new String(body));
            }
        });
    }
}
```
```java
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs_topic", "topic");
        String queueName = channel.queueDeclare().getQueue();
        // Matches two-word keys whose second word is "save"
        channel.queueBind(queueName, "logs_topic", "*.save");

        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer2:" + new String(body));
            }
        });
    }
}
```
Integrating with Spring Boot
1. Add the dependency
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
```
2. Configure application.yml
```yaml
spring:
  rabbitmq:
    host: 123.56.2.36
    port: 5672
    username: admin
    password: admin
    virtual-host: /ems
```
The rest will be added later.