什么是AMQP
RabbitMQ:(Rabbit,兔子)由erlang语言开发,基于AMQP协议,在erlang语言特性的加持下,RabbitMQ稳定性要比其他的MQ产品好一些,而且erlang语言本身是面向高并发的编程的语言,所以RabbitMQ速度也非常快。且它基于AMQP协议,对分布式、微服务更友好。
安装
使用Docker安装,安装请前往:
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
使用docker-compose安装,配置文件如下:
version: '3' services: rabbitmq: image: rabbitmq:3-management ports: - 5672:5672 - 15672:15672
- 端口15672,web端
- 端口5672,API端,写代码发布或获取消息时,需要监听此端口
启动命令
docker-compose up -d
Web端
本地访问localhost:15672
登录名:guest,密码:guest
登录成功如下图:
概念
Virtual Host
Virtual Host类似mysql中的Database,用来区分每个用户所管理的数据区域。
常见消息应用
Hello World
- P(producer):生产者,发送消息的服务
- C(consumer):消费者,接收消息的服务,只允许一个消费者接受
- Queue:队列,存储消息
Work queues
只是在Hello World 方式下,增加了多个消费者。消费者如果都在运行时,生产者的消息会以Round-robin(循环)的方式被消费者接受。如果只有一个消费者,就是简单的Hello World方式。
Go完整代码:“Work queues” Example
Publish/Subscribe
以上的消息是发送到 Queue,而这次是将消息发送到 Exchange (交换机),交换机的类型有 direct, topic, headers and fanout。Exchange 会将与它匹配的 Queue 广播的发送出。
本次先对 fanout 进行讲解,先创建 fanout 类型的 Exchange,使用了Go代码,后面会有完整的代码链接:
err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( "logs", // exchange "", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })
绑定
对于 fanout,routing key 为空字符串,此时如果没有消费者消费消息时,那发出去的消息会丢弃掉。现在看看如果绑定将 Exchange 与 Queue 绑定。
err = ch.QueueBind( q.Name, // queue name "", // routing key "logs", // exchange false, nil, )
上面设置的 routing key 为空,此时如何绑定 Queue ,可以先去看看临时队列,看完了后再看看详细代码。
Go完整代码:fanout example
Routing
有选择性的接受哪类消息,设置 Exchange 类型为 direct。消费端只能接收到启动后生产端发送来的消息,历史消息不能接收到,和 fanout 类型一样的。
消费端只需要关注它所要接受的 Routing key (路由键) 无需关注队列名称的是什么。如上图:可以定义 Routing key 为 error、info、error、warning,至于将消息发送到哪个队列,则不需要关心。
生产端:
err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) utils.FailOnError(err, "Failed to declare an exchange") body := utils.BodyFrom(os.Args) err = ch.Publish( "logs_direct", // exchange severityFrom(os.Args), // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })
消费端:
err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_direct", // exchange false, nil)
Go完整代码:routing direct example
Topic
Exchange 设置为 Topic,消费端可以支持 Routing key 的模糊匹配。在设置 Topic 时,必须以英文句号分割词。例如:a.b.c。设置 Routing key 时,最大支持255字节。
消费端模糊匹配支持 * 和 #:
- * 匹配一个词
- # 匹配多个词
Go完整代码:topic exchange example
RPC/Publisher Confirms
现在不研究,自我感觉用的比较少,等用到了再说
临时队列
之前我们学的队列,在进行发送消息时,必须明确指定队列的名称。但对于 Temporary queues (临时队列),有如下特点:
- 不需要设置队列名称
- 每次重新连接时,都自动创建新的队列名称
- 当消费者都断开连接时,队列会自动删除
- 当前连接才能消费自动创建的队列消息
通过Go代码举例:
q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when unused true, // exclusive false, // no-wait nil, // arguments )
设置 exclusive 为 true 时,自动创建的队列名称类似:amq.gen-JzTY20BRgKO-HjmUJj0wLg 这样的名称。