Class: Qwirk::Adapter::InMemory::Worker
- Inherits: Object
  - Object
  - Qwirk::Adapter::InMemory::Worker
- Defined in: lib/qwirk/adapter/in_memory/worker.rb
Instance Attribute Summary
- #stopped ⇒ Object (readonly): Returns the value of attribute stopped.
Instance Method Summary
- #acknowledge_message(message) ⇒ Object
- #handle_failure(message, exception, fail_queue_name) ⇒ Object
- #initialize(name, marshaler, queue) ⇒ Worker (constructor): A new instance of Worker.
- #message_to_object(msg) ⇒ Object
- #ready_to_stop? ⇒ Boolean: If the worker_config has been commanded to stop, workers will continue processing messages until this returns true.
- #receive_message ⇒ Object
- #send_exception(original_message, e) ⇒ Object
- #send_response(original_message, marshaled_object) ⇒ Object
- #stop ⇒ Object
- #to_s ⇒ Object
Constructor Details
#initialize(name, marshaler, queue) ⇒ Worker
Returns a new instance of Worker.
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 8
def initialize(name, marshaler, queue)
  @name = name
  @marshaler = marshaler
  @queue = queue
end
Instance Attribute Details
#stopped ⇒ Object (readonly)
Returns the value of attribute stopped.
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 6
def stopped
  @stopped
end
Instance Method Details
#acknowledge_message(message) ⇒ Object
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 19
def acknowledge_message(message)
end
#handle_failure(message, exception, fail_queue_name) ⇒ Object
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 37
def handle_failure(message, exception, fail_queue_name)
  # TODO: Mode for persisting to flat file?
  Qwirk.logger.warn("Dropping message that failed: #{message}")
end
#message_to_object(msg) ⇒ Object
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 32
def message_to_object(msg)
  # The publisher has already unmarshaled the object to save hassle here.
  return msg
end
#ready_to_stop? ⇒ Boolean
If the worker_config has been commanded to stop, workers will continue processing messages until this returns true.
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 50
def ready_to_stop?
  @queue.stopped?
end
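The drain behaviour described above can be sketched as follows. This is a minimal illustration, not Qwirk's implementation: `StubQueue`, `DrainingWorker`, `request_stop`, and the `run` loop are hypothetical stand-ins, and the idea that the queue reports `stopped?` only after a stop was requested and all pending messages were read is an assumption.

```ruby
# Hypothetical stand-in for the in-memory queue; not part of Qwirk.
class StubQueue
  def initialize(messages)
    @messages = messages.dup
    @stop_requested = false
  end

  def request_stop
    @stop_requested = true
  end

  # Assumption: the queue counts as stopped only once a stop was requested
  # and every pending message has been read.
  def stopped?
    @stop_requested && @messages.empty?
  end

  def read
    @messages.shift
  end
end

# Hypothetical worker that mirrors only the ready_to_stop? delegation.
class DrainingWorker
  attr_reader :processed

  def initialize(queue)
    @queue = queue
    @processed = []
  end

  def ready_to_stop?
    @queue.stopped?
  end

  # Keep processing until the queue says it is safe to stop.
  def run
    until ready_to_stop?
      message = @queue.read
      @processed << message unless message.nil?
    end
  end
end

queue = StubQueue.new(%w[a b c])
queue.request_stop # stop is commanded before any message is handled
worker = DrainingWorker.new(queue)
worker.run
```

Under these assumptions the worker still handles all three pending messages after the stop request, and only then does `ready_to_stop?` return true.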
#receive_message ⇒ Object
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 14
def receive_message
  message, @reply_queue = @queue.read(self)
  return message
end
#send_exception(original_message, e) ⇒ Object
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 27
def send_exception(original_message, e)
  # TODO: I think exceptions should be recreated fully so no need for marshal/unmarshal?
  do_send_response(original_message, Qwirk::RemoteException.new(e))
end
#send_response(original_message, marshaled_object) ⇒ Object
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 22
def send_response(original_message, marshaled_object)
  # We unmarshal so our workers get consistent messages regardless of the adapter
  do_send_response(original_message, @marshaler.unmarshal(marshaled_object))
end
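To illustrate why the response is unmarshaled before it is handed on, here is a small self-contained sketch. Only the `send_response` body follows the listing above; `RubyMarshaler`, `ReplyingWorker`, and this version of `do_send_response` (which simply appends to an array) are hypothetical, since the page does not show how `do_send_response` is implemented.

```ruby
# Hypothetical marshaler built on Ruby's Marshal; only #unmarshal is
# a method name taken from the listing above.
class RubyMarshaler
  def marshal(object)
    Marshal.dump(object)
  end

  def unmarshal(data)
    Marshal.load(data)
  end
end

# Hypothetical worker showing the unmarshal-before-reply step.
class ReplyingWorker
  def initialize(marshaler, replies)
    @marshaler = marshaler
    @replies = replies
  end

  def send_response(original_message, marshaled_object)
    # The receiver gets a plain object, as it would with other adapters.
    do_send_response(original_message, @marshaler.unmarshal(marshaled_object))
  end

  private

  # Assumption: replies are collected in an array for this sketch.
  def do_send_response(original_message, object)
    @replies << [original_message, object]
  end
end

marshaler = RubyMarshaler.new
replies = []
worker = ReplyingWorker.new(marshaler, replies)
worker.send_response("request-1", marshaler.marshal({ status: "ok" }))
```

In this sketch the reply list ends up holding the original Ruby hash rather than its serialized bytes, which is the consistency the code comment refers to.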
#stop ⇒ Object
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 42
def stop
  return if @stopped
  @stopped = true
  Qwirk.logger.debug { "Stopping #{self}" }
  @queue.interrupt_read
end
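The guard clause makes `stop` idempotent: a second call returns early without logging or interrupting the queue again. The sketch below demonstrates this with stand-ins; `CountingQueue` and `StoppableWorker` are hypothetical, and a standard-library `Logger` writing to a `StringIO` is used in place of `Qwirk.logger`.

```ruby
require 'logger'
require 'stringio'

# Hypothetical queue that only counts interrupt_read calls.
class CountingQueue
  attr_reader :interrupts

  def initialize
    @interrupts = 0
  end

  def interrupt_read
    @interrupts += 1
  end
end

# Hypothetical worker reproducing the stop and to_s bodies shown above,
# with an injected logger instead of Qwirk.logger.
class StoppableWorker
  attr_reader :stopped

  def initialize(name, queue, logger)
    @name = name
    @queue = queue
    @logger = logger
  end

  def stop
    return if @stopped
    @stopped = true
    @logger.debug { "Stopping #{self}" }
    @queue.interrupt_read
  end

  def to_s
    "#{@name} (InMemory)"
  end
end

log = StringIO.new
queue = CountingQueue.new
worker = StoppableWorker.new("worker-1", queue, Logger.new(log))
worker.stop
worker.stop # second call is a no-op
```

Here the queue is interrupted once and a single "Stopping" line is logged, even though `stop` is called twice.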
#to_s ⇒ Object
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 54
def to_s
  "#{@name} (InMemory)"
end