什么是AMQP
RabbitMQ:(Rabbit,兔子)由erlang语言开发,基于AMQP协议,在erlang语言特性的加持下,RabbitMQ稳定性要比其他的MQ产品好一些,而且erlang语言本身是面向高并发的编程的语言,所以RabbitMQ速度也非常快。且它基于AMQP协议,对分布式、微服务更友好。
安装
使用Docker安装,安装请前往:
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
使用docker-compose安装,配置文件如下:
version: '3' services: rabbitmq: image: rabbitmq:3-management ports: - 5672:5672 - 15672:15672
- 端口15672,web端
- 端口5672,API端,写代码发布或获取消息时,需要监听此端口
启动命令
docker-compose up -d
Web端
本地访问localhost:15672
登录名:guest,密码:guest
登录成功如下图:
![notion image](https://www.notion.so/image/https%3A%2F%2Ffile.notion.so%2Ff%2Ff%2Fd54b0f25-9426-4b83-9c51-2e6165a030b2%2F1a94c783-76fc-4501-ac10-3878a61e61b2%2FUntitled.png%3Fid%3D46814f2e-0621-48b0-a1e4-7940a72229f3%26table%3Dblock%26spaceId%3Dd54b0f25-9426-4b83-9c51-2e6165a030b2%26expirationTimestamp%3D1722052800000%26signature%3DOCuj3wJTU5DjuOJymtEZp17BOxlHohrngpPVQneknsw?table=block&id=46814f2e-0621-48b0-a1e4-7940a72229f3&cache=v2)
概念
Virtual Host
Virtual Host类似mysql中的Database,用来区分每个用户所管理的数据区域。
![notion image](https://www.notion.so/image/https%3A%2F%2Ffile.notion.so%2Ff%2Ff%2Fd54b0f25-9426-4b83-9c51-2e6165a030b2%2F69c8c14a-9d6b-4787-b590-fb4033e96b81%2FUntitled.png%3Fid%3Dd92fc0bd-d359-4169-a5f3-0e033b73025d%26table%3Dblock%26spaceId%3Dd54b0f25-9426-4b83-9c51-2e6165a030b2%26expirationTimestamp%3D1722052800000%26signature%3DtBcxdZDLchc3jpx67vVmApwnI6qBp6Yuw3VIq5ZjZ6o?table=block&id=d92fc0bd-d359-4169-a5f3-0e033b73025d&cache=v2)
常见消息应用
Hello World
![notion image](https://www.notion.so/image/https%3A%2F%2Ffile.notion.so%2Ff%2Ff%2Fd54b0f25-9426-4b83-9c51-2e6165a030b2%2Fcc1757eb-34db-49f3-872a-4de15416b136%2FUntitled.png%3Fid%3Da377644b-3f99-40be-9f0c-a23634ca1ff3%26table%3Dblock%26spaceId%3Dd54b0f25-9426-4b83-9c51-2e6165a030b2%26expirationTimestamp%3D1722052800000%26signature%3D5z0acRWOrdGUW0K0HUCqMNBS8Sc3V_FNmX4qs6UzJqk?table=block&id=a377644b-3f99-40be-9f0c-a23634ca1ff3&cache=v2)
- P(producer):生产者,发送消息的服务
- C(consumer):消费者,接收消息的服务,只允许一个消费者接受
- Queue:队列,存储消息
Work queues
![notion image](https://www.notion.so/image/https%3A%2F%2Ffile.notion.so%2Ff%2Ff%2Fd54b0f25-9426-4b83-9c51-2e6165a030b2%2F9f9ad870-e438-4c4c-a004-5934f695c16c%2FUntitled.png%3Fid%3D627b1541-05f8-4762-8de7-b1c0437f8a0b%26table%3Dblock%26spaceId%3Dd54b0f25-9426-4b83-9c51-2e6165a030b2%26expirationTimestamp%3D1722052800000%26signature%3DDaygrAOYqlEjA2CkvaL31UN55oiAYrCtcGBg9t_RgTw?table=block&id=627b1541-05f8-4762-8de7-b1c0437f8a0b&cache=v2)
只是在Hello World 方式下,增加了多个消费者。消费者如果都在运行时,生产者的消息会以Round-robin(循环)的方式被消费者接受。如果只有一个消费者,就是简单的Hello World方式。
Go完整代码:“Work queues” Example
Publish/Subscribe
![notion image](https://www.notion.so/image/https%3A%2F%2Ffile.notion.so%2Ff%2Ff%2Fd54b0f25-9426-4b83-9c51-2e6165a030b2%2F0b1cdbec-3455-4eed-ad63-ab14ddba24af%2FUntitled.png%3Fid%3D1b6be8d4-ec7e-407d-b135-80d108dc828c%26table%3Dblock%26spaceId%3Dd54b0f25-9426-4b83-9c51-2e6165a030b2%26expirationTimestamp%3D1722052800000%26signature%3D1TMNd3xO2lqYuXxBkg-qzfelPsVWGaayPiXtzSYAyNA?table=block&id=1b6be8d4-ec7e-407d-b135-80d108dc828c&cache=v2)
以上的消息是发送到 Queue,而这次是将消息发送到 Exchange (交换机),交换机的类型有 direct, topic, headers and fanout。Exchange 会将与它匹配的 Queue 广播的发送出。
本次先对 fanout 进行讲解,先创建 fanout 类型的 Exchange,使用了Go代码,后面会有完整的代码链接:
err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( "logs", // exchange "", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })
绑定
![notion image](https://www.notion.so/image/https%3A%2F%2Ffile.notion.so%2Ff%2Ff%2Fd54b0f25-9426-4b83-9c51-2e6165a030b2%2Fb800315e-83f8-4928-b133-7961e96965a7%2FUntitled.png%3Fid%3D89000646-a994-448b-b905-90ffa5de4208%26table%3Dblock%26spaceId%3Dd54b0f25-9426-4b83-9c51-2e6165a030b2%26expirationTimestamp%3D1722052800000%26signature%3D3Z6dlw7NOoTj1b6jPD14DpLn1brJcAQ4KC-pChkblV8?table=block&id=89000646-a994-448b-b905-90ffa5de4208&cache=v2)
对于 fanout,routing key 为空字符串,此时如果没有消费者消费消息时,那发出去的消息会丢弃掉。现在看看如果绑定将 Exchange 与 Queue 绑定。
err = ch.QueueBind( q.Name, // queue name "", // routing key "logs", // exchange false, nil, )
上面设置的 routing key 为空,此时如何绑定 Queue ,可以先去看看临时队列,看完了后再看看详细代码。
Go完整代码:fanout example
Routing
有选择性的接受哪类消息,设置 Exchange 类型为 direct。消费端只能接收到启动后生产端发送来的消息,历史消息不能接收到,和 fanout 类型一样的。
![notion image](https://www.notion.so/image/https%3A%2F%2Ffile.notion.so%2Ff%2Ff%2Fd54b0f25-9426-4b83-9c51-2e6165a030b2%2Fc3f3d1a9-400b-414c-8b49-0fc50b379338%2FUntitled.png%3Fid%3Da5280ac2-daeb-42c9-be46-858d074a4a38%26table%3Dblock%26spaceId%3Dd54b0f25-9426-4b83-9c51-2e6165a030b2%26expirationTimestamp%3D1722052800000%26signature%3DzFbr7t0PiK1hJYcpk5F54_l_4vNbdY_M-Bzg7zb-Fjw?table=block&id=a5280ac2-daeb-42c9-be46-858d074a4a38&cache=v2)
消费端只需要关注它所要接受的 Routing key (路由键) 无需关注队列名称的是什么。如上图:可以定义 Routing key 为 error、info、error、warning,至于将消息发送到哪个队列,则不需要关心。
生产端:
err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) utils.FailOnError(err, "Failed to declare an exchange") body := utils.BodyFrom(os.Args) err = ch.Publish( "logs_direct", // exchange severityFrom(os.Args), // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })
消费端:
err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_direct", // exchange false, nil)
Go完整代码:routing direct example
Topic
Exchange 设置为 Topic,消费端可以支持 Routing key 的模糊匹配。在设置 Topic 时,必须以英文句号分割词。例如:a.b.c。设置 Routing key 时,最大支持255字节。
消费端模糊匹配支持 * 和 #:
- * 匹配一个词
- # 匹配多个词
![notion image](https://www.notion.so/image/https%3A%2F%2Ffile.notion.so%2Ff%2Ff%2Fd54b0f25-9426-4b83-9c51-2e6165a030b2%2F6795c42f-1e6d-492c-bda2-99d3472075bb%2FUntitled.png%3Fid%3Daa337563-4a8f-43bc-be50-55947aa6f2fc%26table%3Dblock%26spaceId%3Dd54b0f25-9426-4b83-9c51-2e6165a030b2%26expirationTimestamp%3D1722052800000%26signature%3DoVLkSup8CpYSHbxhSZuKdTU5aO22ug4QxhU0KmUXZyg?table=block&id=aa337563-4a8f-43bc-be50-55947aa6f2fc&cache=v2)
Go完整代码:topic exchange example
RPC/Publisher Confirms
现在不研究,自我感觉用的比较少,等用到了再说
临时队列
之前我们学的队列,在进行发送消息时,必须明确指定队列的名称。但对于 Temporary queues (临时队列),有如下特点:
- 不需要设置队列名称
- 每次重新连接时,都自动创建新的队列名称
- 当消费者都断开连接时,队列会自动删除
- 当前连接才能消费自动创建的队列消息
通过Go代码举例:
q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when unused true, // exclusive false, // no-wait nil, // arguments )
设置 exclusive 为 true 时,自动创建的队列名称类似:amq.gen-JzTY20BRgKO-HjmUJj0wLg 这样的名称。