Class: Qwirk::Adapter::InMemory::Publisher
- Inherits: Object
  - Object
    - Qwirk::Adapter::InMemory::Publisher
- Defined in: lib/qwirk/adapter/in_memory/publisher.rb
Defined Under Namespace
Classes: MyTaskConsumer, MyTaskProducer
Instance Method Summary
- #create_fail_producer_consumer_pair(task_id, marshaler) ⇒ Object
- #create_producer_consumer_pair(task_id, marshaler) ⇒ Object
  See Qwirk::Publisher#create_producer_consumer_pair for the requirements for this method.
- #default_marshal_sym ⇒ Object
- #initialize(adapter_factory, queue_name, topic_name, options, response_options) ⇒ Publisher (constructor)
  A new instance of Publisher.
- #publish(marshaled_object, marshaler, task_id, props) ⇒ Object
  Publish the given object and return the reply_queue as the adapter_info.
- #with_response(reply_queue) {|reply_queue| ... } ⇒ Object
  See Qwirk::PublishHandle#read_response for the requirements for this method.
Constructor Details
#initialize(adapter_factory, queue_name, topic_name, options, response_options) ⇒ Publisher
Returns a new instance of Publisher.
    # File 'lib/qwirk/adapter/in_memory/publisher.rb', line 6
    def initialize(adapter_factory, queue_name, topic_name, options, response_options)
      @queue_name, @topic_name, @options, @response_options = queue_name, topic_name, options, response_options
      @queue = Factory.get_publisher_queue(queue_name, topic_name)
    end
Instance Method Details
#create_fail_producer_consumer_pair(task_id, marshaler) ⇒ Object
    # File 'lib/qwirk/adapter/in_memory/publisher.rb', line 43
    def create_fail_producer_consumer_pair(task_id, marshaler)
      consumer_queue = Queue.new("#{@queue}Fail:#{task_id}")
      # TODO: Unlimited queue or some form of exception on maximum
      consumer_queue.max_size = -1
      producer = MyTaskProducer.new(@queue, consumer_queue, marshaler, {})
      consumer = MyTaskConsumer.new(@queue, consumer_queue)
      return producer, consumer
    end
#create_producer_consumer_pair(task_id, marshaler) ⇒ Object
See Qwirk::Publisher#create_producer_consumer_pair for the requirements for this method.

    # File 'lib/qwirk/adapter/in_memory/publisher.rb', line 35
    def create_producer_consumer_pair(task_id, marshaler)
      consumer_queue = Queue.new("#{@queue}:#{task_id}")
      consumer_queue.max_size = @response_options[:queue_max_size] || 100
      producer = MyTaskProducer.new(@queue, consumer_queue, marshaler, @response_options)
      consumer = MyTaskConsumer.new(@queue, consumer_queue)
      return producer, consumer
    end
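The consumer queue size above comes from `@response_options[:queue_max_size] || 100`. A minimal sketch of that fallback follows, assuming `response_options` is a plain Hash; the helper name `response_queue_max_size` is hypothetical and is not part of Qwirk.

```ruby
# Hypothetical helper (not part of Qwirk) that isolates the fallback
# expression used in create_producer_consumer_pair.
def response_queue_max_size(response_options)
  # A missing (nil) or false :queue_max_size falls back to 100.
  response_options[:queue_max_size] || 100
end

default_size  = response_queue_max_size({})
explicit_size = response_queue_max_size({ queue_max_size: 25 })

puts default_size   # 100
puts explicit_size  # 25
```

Because `nil` does not respond to `[]`, the same expression would raise a NoMethodError if the response options were nil, so this sketch only covers the Hash case.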
#default_marshal_sym ⇒ Object
    # File 'lib/qwirk/adapter/in_memory/publisher.rb', line 11
    def default_marshal_sym
      :none
    end
#publish(marshaled_object, marshaler, task_id, props) ⇒ Object
Publish the given object and return the reply_queue as the adapter_info.
    # File 'lib/qwirk/adapter/in_memory/publisher.rb', line 16
    def publish(marshaled_object, marshaler, task_id, props)
      # Since we're in-memory, we'll just unmarshal the object so there is less info to carry around
      object = marshaler.unmarshal(marshaled_object)
      reply_queue = nil
      if @response_options
        reply_queue = ReplyQueue.new("#{@queue}:#{object.to_s}")
      end
      @queue.write([object, reply_queue])
      # Return the object to get sent to with_response below.
      return reply_queue
    end
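The flow of `publish` can be traced with a small stand-alone sketch. Every class here (`SketchQueue`, `SketchReplyQueue`, `IdentityMarshaler`, `SketchPublisher`) is a hypothetical stand-in written for illustration; the real Qwirk `Queue`, `ReplyQueue` and marshaler classes may behave differently.

```ruby
# Hypothetical stand-ins; none of these classes are part of Qwirk.
class SketchQueue
  attr_reader :items

  def initialize(name)
    @name = name
    @items = []
  end

  # Record each written entry so it can be inspected afterwards.
  def write(item)
    @items << item
  end

  # Used when the queue is interpolated into the reply queue name.
  def to_s
    @name
  end
end

SketchReplyQueue = Struct.new(:name)

# Assumed marshaler that hands the object back unchanged.
class IdentityMarshaler
  def unmarshal(object)
    object
  end
end

class SketchPublisher
  def initialize(queue, response_options)
    @queue = queue
    @response_options = response_options
  end

  # Mirrors the listing: unmarshal, optionally create a reply queue,
  # write the [object, reply_queue] pair, and return the reply queue.
  def publish(marshaled_object, marshaler, task_id, props)
    object = marshaler.unmarshal(marshaled_object)
    reply_queue = nil
    reply_queue = SketchReplyQueue.new("#{@queue}:#{object}") if @response_options
    @queue.write([object, reply_queue])
    reply_queue
  end
end

queue = SketchQueue.new("orders")
marshaler = IdentityMarshaler.new

# An empty Hash is truthy in Ruby, so a reply queue is created here.
with_reply = SketchPublisher.new(queue, {}).publish("job-1", marshaler, nil, {})
# With nil response options no reply queue is created.
without_reply = SketchPublisher.new(queue, nil).publish("job-2", marshaler, nil, {})
```

In this sketch `with_reply.name` is `"orders:job-1"` and `without_reply` is `nil`, which matches the listing's behavior of returning `nil` when no response options are configured.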
#with_response(reply_queue) {|reply_queue| ... } ⇒ Object
See Qwirk::PublishHandle#read_response for the requirements for this method.
    # File 'lib/qwirk/adapter/in_memory/publisher.rb', line 29
    def with_response(reply_queue, &block)
      raise "Could not find reply_queue for #{@queue}" unless reply_queue
      yield reply_queue
    end
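The guard-then-yield shape of `with_response` can be exercised in isolation. The sketch below assumes a free-standing method, `with_response_sketch`, which is hypothetical and takes the queue name as an argument instead of reading `@queue`.

```ruby
# Hypothetical free-standing version of the guard-then-yield pattern.
def with_response_sketch(reply_queue, queue_name)
  # A bare raise with a String raises RuntimeError.
  raise "Could not find reply_queue for #{queue_name}" unless reply_queue
  yield reply_queue
end

# With a reply queue present, the block receives it and its value is returned.
result = with_response_sketch(:reply, "orders") { |rq| "got #{rq}" }

# With nil (for example, publish ran without response options), the guard raises.
error_message =
  begin
    with_response_sketch(nil, "orders") { |rq| rq }
  rescue RuntimeError => e
    e.message
  end

puts result         # got reply
puts error_message  # Could not find reply_queue for orders
```

A caller that publishes without response options would therefore need to avoid `with_response` or rescue the error.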