Home

Hi, I'm Chareice.

很高兴见到你

RabbitMQJava算法计算机科学基础PythonRubyRails
12 Mar 2015

RabbitMQ 教程系列第一部分 Hello World

1. 简介

RabbitMQ是一个消息代理。从本质上说,它接受生产者(producers)的消息,然后把这些消息交付给消费者(consumers)。 在这之间,它可以根据你制定的规则来制定消息的路径,缓冲消息,持久化消息。

RabbitMQ,或者说信息传递,使用到的一些术语。

  • 生产(Producing)意味着发送,一个发送消息的程序,可以称作生产者(producer)。我们可以这么表示。

producer
  • 队列(Queue),是一个信箱的名称,它存在于RabbitMQ的内部。尽管消息流在RabbitMQ和你的应用程序之间穿过, 但是它们也可以只被存储在队列中。一个队列没有绑定任何的限制,它可以存储你希望的,尽可能多的信息 — 实际上 它是一个无限大小的缓冲区。许多不同的生产者可以发送消息到同一个队列中,不同的消费者也可以从同一个队列中读取消息。 一个队列可以被绘制为下面这张图,图形上面写队列的名字。

queue
  • 消费(Consuming)和接收是同一个意思。一个消费者是一个等待接收消息的应用程序。可以用如下图形表示。

consumer

需要注意的是,生产者、消费者和消息代理不需要在同一台机器上,实际上在应用中也是这个情况。

2. "Hello World"

在这一部分我们将使用Ruby编写两个小程序。一个生产者发送单条信息,一个消费者接收这条信息然后将其打印出来。 我们将掩盖一些 Bunny API的细节,全神贯注地做简单的事情。

如下面的图形所示,"P"是我们的生产者,"C"是我们的消费者。在中间的盒子是一个队列 — 一个RabbitMQ的消息缓冲区,代表消费者。

RabbitMQ使用AMQP 0.9.1 协议,这是一个开源的消息传递协议。RabbitMQ有多种不同的客户端实现,在这个教程中我们将使用Bunny。

2.1. 发送

sending

我们称我们的消息发送为 send.rb ,消息接收为 receive.rb。 发送者将连接上RabbitMQ,发送一条消息然后退出。

send.rb 中,首先我们需要引入 Bunny 库。

#!/usr/bin/env ruby
# encoding: utf-8

require "bunny"

接下来连接 RabbitMQ 服务器。

conn = Bunny.new
conn.start

连接抽象了socket连接,并且帮我们处理了协议层的对接。现在我们使用默认的设置连接本地机器上的消息代理服务器。

如果我们想要连接一个远程端口的消息代理,我们可以通过 :hostname 选项来指定不同的服务器地址。

conn = Bunny.new(:hostname => "rabbit.local")
conn.start

下面我们创建一个通道,通道是大多数API完成任务的地方。

ch = conn.create_channel

要发送消息,我们必须声明我们要发送的队列,这样我们就可以向队列中发送消息。

q = ch.queue('hello')
ch.default_exchange.publish("Hello World", :routing_key => q.name)
puts " [x] Send 'Hello World!'"

声明一个队列是幂等的,只有在该队列没有创建的时候才会被创建。消息的内容是字节数组,所以你可以编码任何你想要的。

最后,关闭连接。

conn.close

2.2. 接收

刚才是我们的生产者,发送消息。我们的接收者是要从RabbitMQ一直接收消息,所以不同于消息发送者,我们的接收者必须一直运行监听消息。

receiving

receive.rb 也和 send.rb 一样,要先引入 Bunny

#!/usr/bin/env ruby
# encoding: utf-8

require "bunny"

设置连接也是和 sender 一样。我们打开一个连接和一个通道,并且定义我们想要监听消息的队列名称,这里要注意的是监听的队列名称要和消息发布的一致。

conn = Bunny.new
conn.start

ch = conn.create_channel
q = ch.queue('hello')

需要注意的是,我们在这里声明了队列名称,因为也许我们的接受者会比发送者更早一步运行,我们必须要确保我们要消费的这个队列是存在的。

我们将要告诉服务器我们需要从这个队列里面接收消息,之后服务器就会异步地推送消息给我们的消费者,我们提供了一个将会在消息推送过来时运行的回调。

puts ' [*] Waiting for messages in #{q.name}. To exit press CTRL+C'
q.subscribe(:block => true) do |delivery_info, properties, body|
  puts ' [x] Received #{body}'

  delivery_info.consumer.cancel
end

Bunny::Queue#subscribe 方法使用 :block 参数来阻塞线程。

Load Comments