Class: Qwirk::Adapter::Inline::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/qwirk/adapter/inline/publisher.rb

Defined Under Namespace

Classes: MyResponseHandle

Instance Method Summary collapse

Constructor Details

#initialize(adapter_factory, queue_name, topic_name, options, response_options) ⇒ Publisher

Returns a new instance of Publisher.



21
22
23
# File 'lib/qwirk/adapter/inline/publisher.rb', line 21

def initialize(adapter_factory, queue_name, topic_name, options, response_options)
  @adapter_factory, @queue_name, @topic_name, @options, @response_options = adapter_factory, queue_name, topic_name, options, response_options
end

Instance Method Details

#create_fail_producer_consumer_pair(task_id, marshaler) ⇒ Object



70
71
72
# File 'lib/qwirk/adapter/inline/publisher.rb', line 70

def create_fail_producer_consumer_pair(task_id, marshaler)
  # TBD
end

#create_producer_consumer_pair(task_id, marshaler) ⇒ Object

See Qwirk::Publisher#create_producer_consumer_pair for the requirements for this method



66
67
68
# File 'lib/qwirk/adapter/inline/publisher.rb', line 66

def create_producer_consumer_pair(task_id, marshaler)
  # TBD
end

#default_marshal_symObject



25
26
27
# File 'lib/qwirk/adapter/inline/publisher.rb', line 25

def default_marshal_sym
  :none
end

#publish(marshaled_object, marshaler, task_id, props) ⇒ Object

Publish the given object and return the reply_queue as the adapter_info.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/qwirk/adapter/inline/publisher.rb', line 30

def publish(marshaled_object, marshaler, task_id, props)
  response_handle = @response_options ? MyResponseHandle.new : nil
  # Since we're inline, we'll just unmarshal the object so there is less info to carry around
  object = marshaler.unmarshal(marshaled_object)
  if manager = @adapter_factory.manager
    @message_handled = false
    manager.worker_configs.each do |worker_config|
      if worker_config.active
        if @queue_name && @queue_name == worker_config.queue_name
          run_worker(worker_config, object, response_handle)
          @message_handled = true
          break
        elsif @topic_name && @topic_name == worker_config.topic_name
          run_worker(worker_config, object, response_handle)
          @message_handled = true
        end
      end
    end
    if !@message_handled && !@no_message_handled_warning
      Qwirk.logger.warn("Publish message #{object.inspect} being dropped as no Qwirk worker has been configured to handle it")
      @no_message_handled_warning = true
    end
  elsif !@no_manager_warning
    Qwirk.logger.warn("Publish message #{object.inspect} being dropped as no Qwirk manager has been configured for #{@adapter_factory.key}")
    @no_manager_warning = true
  end
  return response_handle
end

#with_response(response_handle) {|response_handle| ... } ⇒ Object

See Qwirk::PublishHandle#read_response for the requirements for this method.

Yields:

  • (response_handle)


60
61
62
63
# File 'lib/qwirk/adapter/inline/publisher.rb', line 60

def with_response(response_handle, &block)
  raise "Could not find response_handle" unless response_handle
  yield response_handle
end