Flushing a queue using AMQP, Rabbit, and Ruby

Posted by jeffshantz on Stack Overflow See other posts from Stack Overflow or by jeffshantz
Published on 2010-04-15T01:49:15Z Indexed on 2010/04/15 1:53 UTC
Read the original article Hit count: 469

Filed under:
|
|

I'm developing a system in Ruby that is going to make use of RabbitMQ to send messages to a queue as it does some work. I am using:

  • Ruby 1.9.1 Stable
  • RabbitMQ 1.7.2
  • AMQP gem v0.6.7 (http://github.com/tmm1/amqp)

Most of the examples I have seen on this gem have their publish calls in an EM.add_periodic_timer block. This doesn't work for what I suspect is a vast majority of use cases, and certainly not for mine. I need to publish a message as I complete some work, so putting a publish statement in an add_periodic_timer block doesn't suffice.

So, I'm trying to figure out how to publish a few messages to a queue, and then "flush" it, so that any messages I've published are then delivered to my subscribers.

To give you an idea of what I mean, consider the following publisher code:

#!/usr/bin/ruby

require 'rubygems'
require 'mq'

MESSAGES = ["hello","goodbye","test"]

AMQP.start do

  queue = MQ.queue('testq')
  messages_published = 0

  while (messages_published < 50)

    if (rand() < 0.4)
      message = MESSAGES[rand(MESSAGES.size)]
      puts "#{Time.now.to_s}: Publishing: #{message}"
      queue.publish(message)
      messages_published += 1
    end

    sleep(0.1)

  end

  AMQP.stop do
    EM.stop
  end

end

So, this code simply loops, publishing a message with 40% probability on each iteration of the loop, and then sleeps for 0.1 seconds. It does this until 50 messages have been published, and then stops AMQP. Of course, this is just a proof of concept.

Now, my subscriber code:

#!/usr/bin/ruby

require 'rubygems'
require 'mq'

AMQP.start do

  queue = MQ.queue('testq') 
  queue.subscribe do |header, msg|
    puts "#{Time.now.to_s}: Received #{msg}"
  end

end

So, we just subscribe to the queue, and for each message received, we print it out.

Great, except that the subscriber only receives all 50 messages when the publisher calls AMQP.stop.

Here's the output from my publisher. It has been truncated in the middle for brevity:

$ ruby publisher.rb 
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:42 -0400: Publishing: hello
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:43 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: test
2010-04-14 21:45:45 -0400: Publishing: test
.
.
.
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: goodbye

Next, the output from my subscriber:

$ ruby consumer.rb
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received hello
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
.
.
.
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye

If you note the timestamps in the output, the subscriber only receives all of the messages once the publisher has stopped AMQP and exited.

So, being an AMQP newb, how can I get my messages to deliver immediately? I tried putting AMQP.start and AMQP.stop in the body of the while loop of the publisher, but then only the first message gets delivered -- though strangely, if I turn on logging, there are no error messages reported by the server and the messages do get sent to the queue, but never get received by the subscriber.

Suggestions would be much appreciated. Thanks for reading.

© Stack Overflow or respective owner

Related posts about ruby

Related posts about amqp