Class: Qwirk::Adapter::JMS::Worker
- Inherits: Object
- Inheritance chain: Object → Qwirk::Adapter::JMS::Worker
- Defined in:
- lib/qwirk/adapter/jms/worker.rb
Instance Method Summary
- #acknowledge_message(msg) ⇒ Object
- #handle_failure(message, exception, fail_queue_name) ⇒ Object
-
#initialize(worker_config) ⇒ Worker
constructor
A new instance of Worker.
- #message_to_object(msg) ⇒ Object
- #ready_to_stop? ⇒ Boolean
- #receive_message ⇒ Object
- #send_exception(original_message, e) ⇒ Object
- #send_response(original_message, marshaled_object) ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(worker_config) ⇒ Worker
Returns a new instance of Worker.
6 7 8 9 10 11 12 13 14 |
# File 'lib/qwirk/adapter/jms/worker.rb', line 6
#
# Builds a worker from its configuration: keeps the config and the values
# read from it, creates a session on the config's connection, creates a
# consumer for the config's destination, and starts the session.
#
# @param worker_config [Object] must respond to +name+, +marshaler+,
#   +marshal_sym+, +connection+ and +destination+
def initialize(worker_config)
  @worker_config = worker_config
  @name          = worker_config.name
  @marshaler     = worker_config.marshaler
  @marshal_sym   = worker_config.marshal_sym
  @session       = worker_config.connection.create_session
  @consumer      = @session.consumer(worker_config.destination)
  # The consumer is created before the session is started.
  @session.start
end
Instance Method Details
#acknowledge_message(msg) ⇒ Object
23 24 25 |
# File 'lib/qwirk/adapter/jms/worker.rb', line 23
#
# Acknowledges a received message by calling +acknowledge+ on it.
#
# @param msg [#acknowledge] the message to acknowledge
# @return [Object] whatever +msg.acknowledge+ returns
def acknowledge_message(msg)
  msg.acknowledge
end
#handle_failure(message, exception, fail_queue_name) ⇒ Object
43 44 45 46 47 48 49 |
# File 'lib/qwirk/adapter/jms/worker.rb', line 43
#
# Forwards a failed message, unchanged, to the named fail queue using a
# producer obtained from this worker's session.
#
# @param message [Object] the message to forward
# @param exception [Exception] currently unused by the live code (see TODO)
# @param fail_queue_name [String] passed to the session as +:queue_name+
# @return [Object] whatever +@session.producer+ returns
def handle_failure(message, exception, fail_queue_name)
  @session.producer(:queue_name => fail_queue_name) do |producer|
    # TODO: Can't add attribute to read-only message?
    #message['qwirk:exception'] = Qwirk::RemoteException.new(e).to_hash.to_yaml
    producer.send(message)
  end
end
#message_to_object(msg) ⇒ Object
38 39 40 41 |
# File 'lib/qwirk/adapter/jms/worker.rb', line 38
#
# Unmarshals a message's data using the marshal strategy named by the
# message's 'qwirk:marshal' entry, falling back to +:ruby+ when that entry
# is nil or false.
#
# @param msg [Object] must respond to +[]+ and +data+
# @return [Object] the result of the strategy's +unmarshal+
def message_to_object(msg)
  marshaler = Qwirk::MarshalStrategy.find(msg['qwirk:marshal'] || :ruby)
  marshaler.unmarshal(msg.data)
end
#ready_to_stop? ⇒ Boolean
60 61 62 |
# File 'lib/qwirk/adapter/jms/worker.rb', line 60
#
# Reports whether this worker may be stopped; this implementation always
# answers true.
#
# @return [Boolean] always +true+
def ready_to_stop?
  true
end
#receive_message ⇒ Object
16 17 18 19 20 21 |
# File 'lib/qwirk/adapter/jms/worker.rb', line 16
#
# Receives the next message from this worker's consumer. Any exception
# raised by the receive is logged as a warning and swallowed.
#
# @return [Object, nil] the result of +@consumer.receive+, or +nil+ if the
#   receive raised
def receive_message
  @consumer.receive
rescue Exception => e
  # NOTE(review): rescues Exception rather than StandardError; presumably
  # intentional so that any provider-level failure is caught -- confirm.
  Qwirk.logger.warn "Error during receive: #{e.message}"
  nil
end
#send_exception(original_message, e) ⇒ Object
31 32 33 34 35 36 |
# File 'lib/qwirk/adapter/jms/worker.rb', line 31
#
# Replies to a message with a string describing an exception. The reply is
# sent through +do_send_response+ with the +:string+ marshal strategy, and
# the marshaled remote exception is stored on the reply under
# 'qwirk:exception'.
#
# @param original_message [Object] the message being replied to
# @param e [Exception] the exception to report
# @return [Object] whatever +do_send_response+ returns
def send_exception(original_message, e)
  # The string marshaler is looked up once and reused on later calls.
  @string_marshaler ||= MarshalStrategy.find(:string)
  do_send_response(:string, @string_marshaler, original_message, "Exception: #{e.message}") do |reply_message|
    reply_message['qwirk:exception'] = Qwirk::RemoteException.new(e).marshal
  end
end
#send_response(original_message, marshaled_object) ⇒ Object
27 28 29 |
# File 'lib/qwirk/adapter/jms/worker.rb', line 27
#
# Replies to a message with an already-marshaled object, delegating to
# +do_send_response+ with this worker's configured marshal symbol and
# marshaler.
#
# @param original_message [Object] the message being replied to
# @param marshaled_object [Object] the reply payload, already marshaled
# @return [Object] whatever +do_send_response+ returns
def send_response(original_message, marshaled_object)
  do_send_response(@marshal_sym, @marshaler, original_message, marshaled_object)
end
#stop ⇒ Object
51 52 53 54 55 56 57 58 |
# File 'lib/qwirk/adapter/jms/worker.rb', line 51
#
# Stops the worker: logs the shutdown, closes the consumer and then the
# session (each only if set), and marks the worker as stopped. Calling it
# again after a successful stop does nothing.
#
# @return [true, nil] +true+ after stopping, +nil+ if already stopped
def stop
  return if @stopped
  Qwirk.logger.info "Stopping JMS worker #{@name}"
  # Don't clobber the session before a reply
  @consumer.close if @consumer
  @session.close if @session
  @stopped = true
end