Class: Qwirk::Adapter::Inline::Publisher
- Inherits: Object
  - Object
  - Qwirk::Adapter::Inline::Publisher
- Defined in: lib/qwirk/adapter/inline/publisher.rb
Defined Under Namespace
Classes: MyResponseHandle
Instance Method Summary
- #create_fail_producer_consumer_pair(task_id, marshaler) ⇒ Object
- #create_producer_consumer_pair(task_id, marshaler) ⇒ Object
  See Qwirk::Publisher#create_producer_consumer_pair for the requirements for this method.
- #default_marshal_sym ⇒ Object
- #initialize(adapter_factory, queue_name, topic_name, options, response_options) ⇒ Publisher (constructor)
  A new instance of Publisher.
- #publish(marshaled_object, marshaler, task_id, props) ⇒ Object
  Publish the given object and return the reply_queue as the adapter_info.
- #with_response(response_handle) {|response_handle| ... } ⇒ Object
  See Qwirk::PublishHandle#read_response for the requirements for this method.
Constructor Details
#initialize(adapter_factory, queue_name, topic_name, options, response_options) ⇒ Publisher
Returns a new instance of Publisher.
    # File 'lib/qwirk/adapter/inline/publisher.rb', line 21
    def initialize(adapter_factory, queue_name, topic_name, options, response_options)
      @adapter_factory, @queue_name, @topic_name, @options, @response_options = adapter_factory, queue_name, topic_name, options, response_options
    end
Instance Method Details
#create_fail_producer_consumer_pair(task_id, marshaler) ⇒ Object
    # File 'lib/qwirk/adapter/inline/publisher.rb', line 70
    def create_fail_producer_consumer_pair(task_id, marshaler)
      # TBD
    end
#create_producer_consumer_pair(task_id, marshaler) ⇒ Object
See Qwirk::Publisher#create_producer_consumer_pair for the requirements for this method.
    # File 'lib/qwirk/adapter/inline/publisher.rb', line 66
    def create_producer_consumer_pair(task_id, marshaler)
      # TBD
    end
#default_marshal_sym ⇒ Object
    # File 'lib/qwirk/adapter/inline/publisher.rb', line 25
    def default_marshal_sym
      :none
    end
#publish(marshaled_object, marshaler, task_id, props) ⇒ Object
Publish the given object and return the reply_queue as the adapter_info.
    # File 'lib/qwirk/adapter/inline/publisher.rb', line 30
    def publish(marshaled_object, marshaler, task_id, props)
      response_handle = @response_options ? MyResponseHandle.new : nil
      # Since we're inline, we'll just unmarshal the object so there is less info to carry around
      object = marshaler.unmarshal(marshaled_object)
      if manager = @adapter_factory.manager
        @message_handled = false
        manager.worker_configs.each do |worker_config|
          if worker_config.active
            if @queue_name && @queue_name == worker_config.queue_name
              run_worker(worker_config, object, response_handle)
              @message_handled = true
              break
            elsif @topic_name && @topic_name == worker_config.topic_name
              run_worker(worker_config, object, response_handle)
              @message_handled = true
            end
          end
        end
        if !@message_handled && !@no_message_handled_warning
          Qwirk.logger.warn("Publish message #{object.inspect} being dropped as no Qwirk worker has been configured to handle it")
          @no_message_handled_warning = true
        end
      elsif !@no_manager_warning
        Qwirk.logger.warn("Publish message #{object.inspect} being dropped as no Qwirk manager has been configured for #{@adapter_factory.key}")
        @no_manager_warning = true
      end
      return response_handle
    end
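The queue/topic branching in #publish can be illustrated in isolation. The following is a minimal sketch, not Qwirk code: WorkerConfig, CONFIGS, and matching_workers are hypothetical stand-ins that only mirror the loop shown above (a queue match stops at the first active worker, a topic match reaches every active subscriber).

```ruby
# Hypothetical stand-in for a worker config; not the real Qwirk class.
WorkerConfig = Struct.new(:name, :queue_name, :topic_name, :active)

# Returns the names of the configs that would receive a message,
# following the same branching as the loop in #publish.
def matching_workers(worker_configs, queue_name: nil, topic_name: nil)
  matched = []
  worker_configs.each do |config|
    next unless config.active
    if queue_name && queue_name == config.queue_name
      matched << config.name
      break # queue semantics: only the first matching worker runs
    elsif topic_name && topic_name == config.topic_name
      matched << config.name # topic semantics: every matching worker runs
    end
  end
  matched
end

# Illustrative data only.
CONFIGS = [
  WorkerConfig.new('a', 'orders', nil, true),
  WorkerConfig.new('b', 'orders', nil, true),
  WorkerConfig.new('c', nil, 'events', true),
  WorkerConfig.new('d', nil, 'events', false), # inactive, so skipped
  WorkerConfig.new('e', nil, 'events', true)
].freeze

matching_workers(CONFIGS, queue_name: 'orders')  # => ["a"]
matching_workers(CONFIGS, topic_name: 'events')  # => ["c", "e"]
matching_workers(CONFIGS, queue_name: 'missing') # => []
```

An empty result corresponds to the case where #publish logs its "being dropped" warning, which the real method emits only once per publisher instance.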
#with_response(response_handle) {|response_handle| ... } ⇒ Object
See Qwirk::PublishHandle#read_response for the requirements for this method.
    # File 'lib/qwirk/adapter/inline/publisher.rb', line 60
    def with_response(response_handle, &block)
      raise "Could not find response_handle" unless response_handle
      yield response_handle
    end
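Because #publish returns nil as the response handle when no response_options were given, the guard in #with_response raises in that case. A minimal sketch of that behavior follows, using a hypothetical SketchPublisher class that copies only the guard-and-yield logic and has no connection to a real Qwirk adapter.

```ruby
# Hypothetical stand-in that mirrors only the guard in #with_response.
class SketchPublisher
  def with_response(response_handle)
    raise "Could not find response_handle" unless response_handle
    yield response_handle
  end
end

publisher = SketchPublisher.new

# With a handle, the block is called and its value is returned.
result = publisher.with_response(:handle) { |h| "got #{h}" }

# With nil (the value #publish returns when no response is expected),
# a RuntimeError is raised before the block runs.
begin
  publisher.with_response(nil) { |h| h }
rescue RuntimeError => e
  error_message = e.message
end
```

Callers that may publish without response options would therefore need to check the handle, or rescue the error, before reading a response.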