Class: Qwirk::Adapter::JMS::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/qwirk/adapter/jms/worker.rb

Instance Method Summary collapse

Constructor Details

#initialize(worker_config) ⇒ Worker

Returns a new instance of Worker.



6
7
8
9
10
11
12
13
14
# File 'lib/qwirk/adapter/jms/worker.rb', line 6

def initialize(worker_config)
  @worker_config = worker_config
  @name          = worker_config.name
  @marshaler     = worker_config.marshaler
  @marshal_sym   = worker_config.marshal_sym
  @session       = worker_config.connection.create_session
  @consumer      = @session.consumer(worker_config.destination)
  @session.start
end

Instance Method Details

#acknowledge_message(msg) ⇒ Object



23
24
25
# File 'lib/qwirk/adapter/jms/worker.rb', line 23

def acknowledge_message(msg)
  msg.acknowledge
end

#handle_failure(message, exception, fail_queue_name) ⇒ Object



43
44
45
46
47
48
49
# File 'lib/qwirk/adapter/jms/worker.rb', line 43

def handle_failure(message, exception, fail_queue_name)
  @session.producer(:queue_name => fail_queue_name) do |producer|
    # TODO: Can't add attribute to read-only message?
    #message['qwirk:exception'] = Qwirk::RemoteException.new(e).to_hash.to_yaml
    producer.send(message)
  end
end

#message_to_object(msg) ⇒ Object



38
39
40
41
# File 'lib/qwirk/adapter/jms/worker.rb', line 38

def message_to_object(msg)
  marshaler = Qwirk::MarshalStrategy.find(msg['qwirk:marshal'] || :ruby)
  return marshaler.unmarshal(msg.data)
end

#ready_to_stop?Boolean

Returns:

  • (Boolean)


60
61
62
# File 'lib/qwirk/adapter/jms/worker.rb', line 60

def ready_to_stop?
  true
end

#receive_messageObject



16
17
18
19
20
21
# File 'lib/qwirk/adapter/jms/worker.rb', line 16

def receive_message
  @consumer.receive
rescue Exception => e
  Qwirk.logger.warn "Error during receive: #{e.message}"
  return nil
end

#send_exception(original_message, e) ⇒ Object



31
32
33
34
35
36
# File 'lib/qwirk/adapter/jms/worker.rb', line 31

def send_exception(original_message, e)
  @string_marshaler ||= MarshalStrategy.find(:string)
  do_send_response(:string, @string_marshaler, original_message, "Exception: #{e.message}") do |reply_message|
    reply_message['qwirk:exception'] = Qwirk::RemoteException.new(e).marshal
  end
end

#send_response(original_message, marshaled_object) ⇒ Object



27
28
29
# File 'lib/qwirk/adapter/jms/worker.rb', line 27

def send_response(original_message, marshaled_object)
  do_send_response(@marshal_sym, @marshaler, original_message, marshaled_object)
end

#stopObject



51
52
53
54
55
56
57
58
# File 'lib/qwirk/adapter/jms/worker.rb', line 51

def stop
  return if @stopped
  Qwirk.logger.info "Stopping JMS worker #{@name}"
  # Don't clobber the session before a reply
  @consumer.close if @consumer
  @session.close if @session
  @stopped = true
end