Class: Qwirk::Publisher

Inherits:
Object
  • Object
show all
Includes:
Rumx::Bean
Defined in:
lib/qwirk/publisher.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(adapter_factory, options) ⇒ Publisher

Parameters:

One of the following must be specified
  :queue_name            => String: Name of the Queue to publish to
  :topic_name            => String: Name of the Topic to publish to
Optional:
  :time_to_live          => expiration time in ms for the message (JMS)
  :persistent            => true or false (defaults to false) (JMS)
  :marshal               => Symbol: One of :ruby, :string, :json, :bson, :yaml or any registered types (See Qwirk::MarshalStrategy), defaults to :ruby
  :response              => if true or a hash of response options, a temporary reply queue will be set up for handling responses
    :time_to_live        => expiration time in ms for the response message(s) (JMS)
    :persistent          => true or false for the response message(s), set to false if you don't want timed out messages ending up in the DLQ (defaults to true unless time_to_live is set)


22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/qwirk/publisher.rb', line 22

# Builds a publisher bound to a queue or a topic.
#
# @param adapter_factory [#create_publisher_impl] factory asked to build the
#   adapter-specific publisher implementation
# @param options [Hash] publisher options; the caller's hash is not modified
#   because a shallow copy is taken before keys are removed
# @option options [String] :queue_name name of the queue to publish to
# @option options [String] :topic_name name of the topic to publish to
# @option options [true, false, Hash] :response any truthy non-Hash value is
#   normalized to an empty Hash; a Hash, nil or false is kept as given
# @option options [Symbol] :marshal marshal strategy to look up (defaults to :ruby)
# @raise [RuntimeError] if neither :queue_name nor :topic_name is supplied
def initialize(adapter_factory, options)
  opts = options.dup
  @queue_name = opts.delete(:queue_name)
  @topic_name = opts.delete(:topic_name)
  unless @queue_name || @topic_name
    raise "One of :queue_name or :topic_name must be given in #{self.class.name}"
  end

  # Only a Hash, nil or false passes through untouched; anything else truthy
  # (typically +true+) means "responses wanted, no specific options".
  requested_response = opts.delete(:response)
  @response_options =
    case requested_response
    when nil, false, Hash then requested_response
    else {}
    end

  @tasks = {}
  # The remaining options (still including :marshal) are handed to the adapter.
  @impl = adapter_factory.create_publisher_impl(@queue_name, @topic_name, opts, @response_options)
  # :marshal is read only after the adapter has seen the options.
  @marshaler = Qwirk::MarshalStrategy.find(opts[:marshal] || :ruby)
end

Instance Attribute Details

#implObject (readonly)

Returns the adapter-specific publisher implementation obtained from the adapter factory's create_publisher_impl call in the constructor.



7
8
9
# File 'lib/qwirk/publisher.rb', line 7

# Read-only accessor for the adapter-specific publisher implementation.
#
# @return [Object, nil] the value stored in @impl (nil if it was never set)
def impl; @impl; end

#marshalerObject (readonly)

Returns the marshal strategy looked up via Qwirk::MarshalStrategy.find from the :marshal option (defaults to :ruby).



7
8
9
# File 'lib/qwirk/publisher.rb', line 7

# Read-only accessor for the marshal strategy used by this publisher.
#
# @return [Object, nil] the value stored in @marshaler (nil if it was never set)
def marshaler; @marshaler; end

#response_optionsObject (readonly)

Returns the normalized :response option: a Hash of response options when responses were requested, otherwise nil or false.



7
8
9
# File 'lib/qwirk/publisher.rb', line 7

# Read-only accessor for the normalized :response option.
#
# @return [Hash, false, nil] the value stored in @response_options
def response_options; @response_options; end

Instance Method Details

#create_fail_queue_consumer(fail_queue_name = nil) ⇒ Object



63
64
65
# File 'lib/qwirk/publisher.rb', line 63

# Resolves the fail-queue name to use: the caller's value when one is given,
# otherwise this publisher's default fail-queue name.
#
# NOTE(review): despite the method name, the visible body only returns the
# resolved name; no consumer is constructed here.
#
# @param fail_queue_name [String, nil] explicit fail-queue name, if any
# @return [Object] the given name, or the result of #default_fail_queue_name
def create_fail_queue_consumer(fail_queue_name=nil)
  return fail_queue_name if fail_queue_name

  default_fail_queue_name
end

#create_producer_consumer_pair(task) ⇒ Object

Creates a producer/consumer pair for writing and reading responses for a given task. It returns a pair of [producer, consumer]. The producer will publish objects specifically for the task. The consumer is an object that responds to `receive`, which returns a [message_id, response object] pair, and to `acknowledge_message`, which acknowledges the last message read. It should also respond to `stop`, which interrupts any pending `receive` calls, causing them to return nil.



50
51
52
53
# File 'lib/qwirk/publisher.rb', line 50

# Registers the task under its task id and asks the adapter implementation
# for a producer/consumer pair dedicated to that task.
#
# @param task [#task_id] task whose responses will flow through the pair
# @return [Object] whatever the implementation's create_producer_consumer_pair
#   returns for the task id and this publisher's marshaler
def create_producer_consumer_pair(task)
  # Remember the task so it can be found again by its id.
  @tasks.store(task.task_id, task)
  @impl.create_producer_consumer_pair(task.task_id, @marshaler)
end

#default_fail_queue_nameObject



59
60
61
# File 'lib/qwirk/publisher.rb', line 59

# Derives the default fail-queue name from this publisher's destination,
# preferring the queue name over the topic name.
#
# @return [Object] the result of Qwirk.fail_queue_name for the destination
def default_fail_queue_name
  destination = @queue_name || @topic_name
  Qwirk.fail_queue_name(destination)
end

#publish(object, props = {}) ⇒ Object

Publish the given object to the address.



39
40
41
42
43
44
# File 'lib/qwirk/publisher.rb', line 39

# Marshals the given object, publishes it through the adapter implementation
# and returns a handle for the publication.
#
# @param object [Object] payload to marshal and publish
# @param props [Hash] properties forwarded unchanged to the implementation
# @return [PublishHandle] handle built from this publisher, the adapter's
#   return value and the time captured before marshaling began
def publish(object, props={})
  started_at = Time.now
  payload = @marshaler.marshal(object)
  # The third argument is always nil on this path.
  # NOTE(review): presumably a task id slot — confirm against the adapter.
  publish_info = @impl.publish(payload, @marshaler, nil, props)
  PublishHandle.new(self, publish_info, started_at)
end

#to_sObject



55
56
57
# File 'lib/qwirk/publisher.rb', line 55

# Human-readable label: the class name, a colon, then the queue name if
# present, otherwise the topic name.
#
# @return [String]
def to_s
  destination = @queue_name || @topic_name
  "#{self.class.name}:#{destination}"
end