Module: SQS::Transport

Defined in:
lib/sqs/transport.rb

Overview

Include the SQS::Transport mixin in a class to add SQS enqueue and dequeue capabilities.

After including, call set_queue_name to specify which SQS queue will receive messages.

There are two ways to define the data that will be included in queued messages:

  • call include_in_sqs_message with an array of attributes to include

  • implement to_sqs_message and return an :attr => value hash

validate_message can be implemented to validate message data before it is pushed to SQS. If this method raises an exception, the message will not be queued.

As an example:

class MyMessage
  include SQS::Transport

  set_queue_name :widgets

  attr_accessor :color, :size
  include_in_sqs_message :color, :size

  def validate_message
    raise "Must have both size and color" if color.nil? || size.nil?
  end

end

creates a class that would then allow:

message = MyMessage.new
message.color = 'red'
message.size = 'xtrasmall'
message.queue!

at which point a :size => trasmall, :color => red message would be pushed onto the “widgets” SQS queue

A client using the same class could then call:

incoming = MyMessage.receive
puts incoming.color # "red"
puts incoming.size # "xtrasmall"
incoming.dequeue!  # message is removed from SQS queue

Note that by default JSON serializtion is used, so some type semanitics are not preseved. For example, if you set incoming.color to {:shade => :red} the data would be received as a string ({'shade' => 'red'}) The serialization method is customizable by overriding serialize on SQS::Message.

SQS::Transport stamps the enqueue and dequeue times, which can be used to calculate time in queue

puts incoming.enqueued_at # Mon Jul 06 22:16:15 UTC 2009
puts incoming.dequeued_at # Mon Jul 06 22:16:25 UTC 2009
puts incoming.time_in_queue # 10

In addition, the class includes a peek method that can be used to inspect queued messages without removing them from the queue.

Defined Under Namespace

Modules: ClassMethods

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#sqs_messageObject

Returns the associated SQS::Message instance



150
151
152
153
# File 'lib/sqs/transport.rb', line 150

def sqs_message
  @sqs_message ||= SQS::Message.new(self.class.sqs_queue_name)
  return @sqs_message
end

Class Method Details

.included(base) ⇒ Object

:nodoc:



60
61
62
# File 'lib/sqs/transport.rb', line 60

def self.included(base) # :nodoc:
  base.extend ClassMethods
end

Instance Method Details

#dequeue!Object

Removes a retrieved instance from the SQS queue.



145
146
147
# File 'lib/sqs/transport.rb', line 145

def dequeue!
 sqs_message.dequeue!
end

#dequeued_atObject

Returns the time the message was dequeued



166
167
168
# File 'lib/sqs/transport.rb', line 166

def dequeued_at
  sqs_message.dequeued_at
end

#enqueued_atObject

Returns the time the message was queued



161
162
163
# File 'lib/sqs/transport.rb', line 161

def enqueued_at
  sqs_message.enqueued_at
end

#queue!Object

Validates the message, then pushes it onto the associated queue. If validation raises an exception, the message will not be queued and the exception will bubble up.



138
139
140
141
142
# File 'lib/sqs/transport.rb', line 138

def queue!
  validate_message
  sqs_message.contents = to_sqs_message
  sqs_message.queue!
end

#sqs_message_idObject

Returns the Amazon id for the SQS message.



156
157
158
# File 'lib/sqs/transport.rb', line 156

def sqs_message_id
  sqs_message.id
end

#time_in_queueObject

Returns the amount of time a message spent in the queue, in seconds.



171
172
173
# File 'lib/sqs/transport.rb', line 171

def time_in_queue
  sqs_message.time_in_queue
end

#to_sqs_messageObject

Returns a hash that will be serialized and sent to SQS. By default, returns a {'attr' => 'value'} hash containing all attributes defined by the include_in_sqs_message call.

Override this to return a custom => ‘value’ hash for more control over serialization.



127
128
129
# File 'lib/sqs/transport.rb', line 127

def to_sqs_message
  Hash[*self.class.attrs_for_sqs_message.collect {|a| [a, self.send(a)]}.flatten]
end

#validate_messageObject

Implement validate_message to check message data before it is sent to SQS. If this method raises an exception, message will not be queued.



133
134
# File 'lib/sqs/transport.rb', line 133

def validate_message
end