Class: Qwirk::Publisher
- Inherits: Object
- Includes: Rumx::Bean
- Defined in: lib/qwirk/publisher.rb
Instance Attribute Summary
-
#impl ⇒ Object
readonly
attr_reader :producer_options, :persistent, :reply_queue.
-
#marshaler ⇒ Object
readonly
attr_reader :producer_options, :persistent, :reply_queue.
-
#response_options ⇒ Object
readonly
attr_reader :producer_options, :persistent, :reply_queue.
Instance Method Summary
- #create_fail_queue_consumer(fail_queue_name = nil) ⇒ Object
-
#create_producer_consumer_pair(task) ⇒ Object
Creates a producer/consumer pair for writing and reading responses for a given task.
- #default_fail_queue_name ⇒ Object
-
#initialize(adapter_factory, options) ⇒ Publisher
constructor
Creates a publisher for a queue or topic. One of :queue_name or :topic_name must be specified; the optional settings (:time_to_live, :persistent, :marshal, :response) are described under Constructor Details.
-
#publish(object, props = {}) ⇒ Object
Publishes the given object to the configured queue or topic and returns a PublishHandle.
- #to_s ⇒ Object
Constructor Details
#initialize(adapter_factory, options) ⇒ Publisher
Parameters:
One of the following must be specified:
  :queue_name => String: Name of the Queue to publish to
  :topic_name => String: Name of the Topic to publish to
Optional:
  :time_to_live => expiration time in ms for the message (JMS)
  :persistent => true or false (defaults to false) (JMS)
  :marshal => Symbol: One of :ruby, :string, :json, :bson, :yaml or any registered type (see Qwirk::MarshalStrategy), defaults to :ruby
  :response => if true or a hash of response options, a temporary reply queue will be set up for handling responses
    :time_to_live => expiration time in ms for the response message(s) (JMS)
    :persistent => true or false for the response message(s); set to false if you don't want timed-out messages ending up in the DLQ (defaults to true unless time_to_live is set)
# File 'lib/qwirk/publisher.rb', line 22

def initialize(adapter_factory, options)
  options = options.dup
  @queue_name = options.delete(:queue_name)
  @topic_name = options.delete(:topic_name)
  raise "One of :queue_name or :topic_name must be given in #{self.class.name}" if !@queue_name && !@topic_name
  @response_options = options.delete(:response)
  # response_options should only be a hash or the values true or false
  @response_options = {} if @response_options && !@response_options.kind_of?(Hash)
  @tasks = {}
  @impl = adapter_factory.create_publisher_impl(@queue_name, @topic_name, options, @response_options)
  marshal_sym = options[:marshal] || :ruby
  @marshaler = Qwirk::MarshalStrategy.find(marshal_sym)
end
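The option handling in the constructor can be tried in isolation. The following is a minimal sketch, assuming a hypothetical helper named normalize_publisher_options that is not part of Qwirk. It copies the same steps (duplicate the hash, pull out the destination names, fail when neither is given, coerce a non-Hash :response to an empty hash) without needing an adapter factory. The shape of the returned hash is an illustration only.

```ruby
# Hypothetical stand-alone helper mirroring the option handling in
# Publisher#initialize. It is NOT part of Qwirk.
def normalize_publisher_options(options)
  options = options.dup # never mutate the caller's hash
  queue_name = options.delete(:queue_name)
  topic_name = options.delete(:topic_name)
  raise 'One of :queue_name or :topic_name must be given' if !queue_name && !topic_name

  response = options.delete(:response)
  # Any truthy value that is not a Hash (for example true) becomes an empty hash.
  response = {} if response && !response.is_a?(Hash)

  {
    queue_name: queue_name,
    topic_name: topic_name,
    response_options: response,
    marshal: options[:marshal] || :ruby, # same default as the constructor
    remaining_options: options           # what would be handed to the adapter
  }
end

opts = { queue_name: 'orders', persistent: true, response: true }
result = normalize_publisher_options(opts)
p result[:response_options] # {}
p result[:marshal]          # :ruby
p opts.key?(:queue_name)    # true, the caller's hash is untouched
```

Because the hash is duplicated first, callers can reuse one options hash for several publishers without the deletions leaking between them.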
Instance Attribute Details
#impl ⇒ Object (readonly)
attr_reader :producer_options, :persistent, :reply_queue
# File 'lib/qwirk/publisher.rb', line 7

def impl
  @impl
end
#marshaler ⇒ Object (readonly)
attr_reader :producer_options, :persistent, :reply_queue
# File 'lib/qwirk/publisher.rb', line 7

def marshaler
  @marshaler
end
#response_options ⇒ Object (readonly)
attr_reader :producer_options, :persistent, :reply_queue
# File 'lib/qwirk/publisher.rb', line 7

def response_options
  @response_options
end
Instance Method Details
#create_fail_queue_consumer(fail_queue_name = nil) ⇒ Object
# File 'lib/qwirk/publisher.rb', line 63

def create_fail_queue_consumer(fail_queue_name=nil)
  fail_queue_name || default_fail_queue_name
end
#create_producer_consumer_pair(task) ⇒ Object
Creates a producer/consumer pair for writing and reading responses for a given task. Returns the pair as [producer, consumer]. The producer publishes objects specifically for the task. The consumer is an object that responds to receive, which returns [message_id, response object], and to acknowledge_message, which acknowledges the last message read. It should also respond to stop, which interrupts any pending receive calls and causes them to return nil.
# File 'lib/qwirk/publisher.rb', line 50

def create_producer_consumer_pair(task)
  @tasks[task.task_id] = task
  @impl.create_producer_consumer_pair(task.task_id, @marshaler)
end
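The consumer contract described for this method (receive, acknowledge_message, stop) can be illustrated with an in-memory stand-in. This is only a sketch: InMemoryConsumer, its deliver test hook, and the acknowledged list are hypothetical names invented here, not Qwirk classes or methods, and a real adapter would talk to a broker rather than a local queue.

```ruby
require 'thread'

# Hypothetical stand-in, NOT a Qwirk class. It only demonstrates the three
# methods that a consumer is expected to respond to.
class InMemoryConsumer
  STOP = Object.new # sentinel pushed by #stop to wake a blocked #receive

  attr_reader :acknowledged

  def initialize
    @queue = Queue.new
    @acknowledged = []
    @last_message_id = nil
  end

  # Test hook (an assumption of this sketch): simulate a response arriving.
  def deliver(message_id, response)
    @queue << [message_id, response]
  end

  # Blocks until a response arrives and returns [message_id, response],
  # or returns nil once #stop has been called.
  def receive
    item = @queue.pop
    return nil if item.equal?(STOP)

    @last_message_id = item.first
    item
  end

  # Acknowledges the last message returned by #receive.
  def acknowledge_message
    @acknowledged << @last_message_id if @last_message_id
  end

  # Wakes one pending #receive, which then returns nil.
  def stop
    @queue << STOP
  end
end

consumer = InMemoryConsumer.new
consumer.deliver('msg-1', { status: 'ok' })
consumer.stop

while (pair = consumer.receive)
  message_id, response = pair
  puts "#{message_id}: #{response[:status]}" # prints "msg-1: ok"
  consumer.acknowledge_message
end
```

Returning nil from receive after stop gives the reading loop a simple exit condition, which is why the loop above needs no separate "running" flag.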
#default_fail_queue_name ⇒ Object
# File 'lib/qwirk/publisher.rb', line 59

def default_fail_queue_name
  Qwirk.fail_queue_name(@queue_name || @topic_name)
end
#publish(object, props = {}) ⇒ Object
Publishes the given object to the configured queue or topic and returns a PublishHandle.
# File 'lib/qwirk/publisher.rb', line 39

def publish(object, props={})
  start = Time.now
  marshaled_object = @marshaler.marshal(object)
  adapter_info = @impl.publish(marshaled_object, @marshaler, nil, props)
  return PublishHandle.new(self, adapter_info, start)
end
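The publish flow (record a start time, marshal the object, hand the marshaled form to the adapter) can be sketched with test doubles. Everything named below is hypothetical and not part of Qwirk: JsonMarshaler, RecordingImpl, and publish_sketch exist only for this illustration, and the adapter info returned by RecordingImpl is a placeholder. Only the argument order of the publish call is taken from the source shown for this method.

```ruby
require 'json'

# Hypothetical marshaler double; real marshalers come from Qwirk::MarshalStrategy.
class JsonMarshaler
  def marshal(object)
    JSON.generate(object)
  end
end

# Hypothetical adapter double that records what it is asked to send.
class RecordingImpl
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Same argument order as the @impl.publish call in the source; the third
  # argument is always nil there, so it is ignored here.
  def publish(marshaled_object, _marshaler, _unused, props)
    @sent << [marshaled_object, props]
    { message_number: @sent.size } # placeholder for adapter-specific info
  end
end

# Mirrors the three steps of Publisher#publish, returning the pieces that
# the real method would wrap in a PublishHandle.
def publish_sketch(impl, marshaler, object, props = {})
  start = Time.now
  marshaled_object = marshaler.marshal(object)
  adapter_info = impl.publish(marshaled_object, marshaler, nil, props)
  [adapter_info, start]
end

impl = RecordingImpl.new
info, started_at = publish_sketch(impl, JsonMarshaler.new, { 'id' => 7 }, priority: 4)
p impl.sent.first # ["{\"id\":7}", {:priority=>4}] on Ruby < 3.4; newer versions print {priority: 4}
```

Capturing the start time before marshaling means any elapsed-time measurement made later from the handle includes serialization cost, not only the broker round trip.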
#to_s ⇒ Object
# File 'lib/qwirk/publisher.rb', line 55

def to_s
  "#{self.class.name}:#{@queue_name || @topic_name}"
end