Class: EstormMessageProcessor::Base
- Inherits: Object
- Inheritance chain: Object → EstormMessageProcessor::Base
- Defined in:
- lib/estorm-message-processor/base.rb
Constant Summary collapse
- @@mt_id = 0
  MT id counter.
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#conn ⇒ Object
readonly
Returns the value of attribute conn.
-
#consumer ⇒ Object
readonly
Returns the value of attribute consumer.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Class Method Summary collapse
- .logger ⇒ Object
- .logger=(logger) ⇒ Object
Instance Method Summary collapse
- #logger ⇒ Object
- #queue_creation(config) ⇒ Object
- #queue_mgmt(config) ⇒ Object
- #setup_bunny_communications(url, flag, queuename) ⇒ Object
- #start(config) ⇒ Object
- #tear_down_bunny ⇒ Object
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
7 8 9 |
# File 'lib/estorm-message-processor/base.rb', line 7
#
# Read-only accessor for the channel.
#
# @return [Object] the current value of the @channel instance variable
def channel
  @channel
end
#conn ⇒ Object (readonly)
Returns the value of attribute conn.
7 8 9 |
# File 'lib/estorm-message-processor/base.rb', line 7
#
# Read-only accessor for the connection.
#
# @return [Object] the current value of the @conn instance variable
def conn
  @conn
end
#consumer ⇒ Object (readonly)
Returns the value of attribute consumer.
7 8 9 |
# File 'lib/estorm-message-processor/base.rb', line 7
#
# Read-only accessor for the consumer.
#
# @return [Object] the current value of the @consumer instance variable
def consumer
  @consumer
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
7 8 9 |
# File 'lib/estorm-message-processor/base.rb', line 7
#
# Read-only accessor for the queue.
#
# @return [Object] the current value of the @queue instance variable
def queue
  @queue
end
Class Method Details
.logger ⇒ Object
10 11 12 |
# File 'lib/estorm-message-processor/base.rb', line 10
#
# Class-level reader for the logger stored in the @@logger class variable.
#
# NOTE(review): no initial assignment of @@logger is visible in this listing;
# presumably the logger= writer must be called first -- confirm.
#
# @return [Object] the value of @@logger
def self.logger
  @@logger
end
.logger=(logger) ⇒ Object
14 15 16 |
# File 'lib/estorm-message-processor/base.rb', line 14
#
# Class-level writer: stores the given logger in the @@logger class variable.
#
# @param logger [Object] the logger to store
# @return [Object] the logger that was assigned
def self.logger=(logger)
  @@logger = logger
end
Instance Method Details
#logger ⇒ Object
18 19 20 |
# File 'lib/estorm-message-processor/base.rb', line 18
#
# Instance-level reader for the logger stored in the @@logger class variable.
#
# @return [Object] the value of @@logger
def logger
  @@logger
end
#queue_creation(config) ⇒ Object
69 70 71 72 73 74 75 76 77 |
# File 'lib/estorm-message-processor/base.rb', line 69
#
# Sets up the connection/channel/queue, builds the consumer for that queue,
# gives it the logger, and then hands over to queue_mgmt.
#
# @param config [Hash] reads :url, :connecturlflag and :queuename here; the
#   whole hash is then passed on to queue_mgmt
# @return [Object] whatever queue_mgmt returns
# @raise [RuntimeError] "consumer creation problem" if no consumer was built
def queue_creation(config)
  setup_bunny_communications(config[:url], config[:connecturlflag], config[:queuename])

  # Earlier variant kept for reference:
  # @consumer=EstormMessageProcessor::Consumer.new(@channel, @queue, config[:consumer_name], true, false, config)
  @consumer = EstormMessageProcessor::Consumer.new(@channel, @queue)

  # Guard before the first use of @consumer; checking after `logger=` could
  # never fire because a nil consumer would already have raised NoMethodError.
  raise "consumer creation problem" if @consumer.nil?

  @consumer.logger = logger
  queue_mgmt(config)
end
#queue_mgmt(config) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/estorm-message-processor/base.rb', line 52
#
# Prepares the consumer before subscription: logs a banner, optionally limits
# prefetch to one message, records the queue's message count in @msg_count,
# optionally sets the consumer's target, and installs the delivery handler.
#
# @param config [Hash] reads :blocking (log text only), :prefetch_one and
#   :exit_when_done
# @return [Object] whatever @consumer.on_delivery returns
def queue_mgmt(config)
  logger.info "[*] Waiting for messages in #{@queue.name}. blocking is #{config[:blocking]}"

  # Original note: set quality of service to only deliver one message at a time.
  @channel.prefetch(1) if config[:prefetch_one]

  # Stats are taken before entering the queue; only the message count is kept.
  @msg_count, _consumer_count = @consumer.queue_statistics

  # Earlier variant kept for reference:
  # @queue.subscribe(:block => config[:blocking]) do |delivery_info, properties, body|
  @consumer.target(@msg_count, config[:exit_when_done]) if config[:exit_when_done]

  # NOTE(review): this listing had lost the second block parameter and the
  # name of the consumer method called below. `metadata` and `process_messages`
  # are reconstructions -- confirm against lib/estorm-message-processor/base.rb.
  @consumer.on_delivery do |delivery_info, metadata, payload|
    @consumer.process_messages(delivery_info, metadata, payload)
    logger.info "ON DELIVERY: #{@consumer.count}: messages processed"
    @channel.close if @consumer.cancelled?
    # Earlier variant kept for reference:
    # @consumer.cancel if msg_count==0 && config[:exit_when_empty]
  end
end
#setup_bunny_communications(url, flag, queuename) ⇒ Object
28 29 30 31 32 33 34 35 36 |
# File 'lib/estorm-message-processor/base.rb', line 28
#
# Creates the client, obtains a connection from it, opens a channel on that
# connection and declares the named queue. Results are stored in @client,
# @conn, @channel and @queue.
#
# @param url [Object] passed through to Client#setup_bunny
# @param flag [Object] passed through to Client#setup_bunny
# @param queuename [String] name of the queue to declare on the channel
# @return [Object] the result of the final logger.info call
# @raise [RuntimeError] "connection problem with ..." when no connection is returned
def setup_bunny_communications(url, flag, queuename)
  @client = EstormMessageProcessor::Client.new
  @conn, @channel = @client.setup_bunny(url, flag)
  raise "connection problem with #{@client.inspect}" if @conn.nil?

  # NOTE(review): the channel returned by setup_bunny is replaced here by a
  # freshly created one. If the client's channel is otherwise unused this may
  # leave a spare channel open -- confirm against Client#setup_bunny.
  @channel = @conn.create_channel
  @queue = @channel.queue(queuename)
  logger.info "set up active MQ on #{queuename}"
end
#start(config) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/estorm-message-processor/base.rb', line 79
#
# Entry point: builds the queue and consumer, subscribes the consumer to the
# queue when appropriate, and always tears the connection down afterwards.
#
# Subscription is skipped only when config[:exit_when_done] is truthy and
# @msg_count (set during queue_creation) is not greater than zero.
#
# @param config [Hash] :exit_when_done is defaulted to false in place when
#   nil; :blocking is passed to subscribe_with; the hash is also handed to
#   queue_creation
# @return [Object] the result of the final "Ending" logger.info call
def start(config)
  logger.info "Connecting to bunny environment #{config.inspect}"
  config[:exit_when_done] = false if config[:exit_when_done].nil?
  queue_creation(config)

  # Original author's notes: the block flag shuts down the thread; ack must be
  # set to true to manage the qos parameter. Earlier variants kept for reference:
  # retval= @queue.subscribe_with(@consumer,:ack => true, :block => config[:blocking], :timeout => config[:timeout])
  # retval= @queue.subscribe_with(@consumer,:ack => true, :block => config[:blocking])
  retval = "[did not subscribe as msg count = 0]"
  if !config[:exit_when_done] || @msg_count > 0
    retval = @queue.subscribe_with(@consumer, :block => config[:blocking])
  end

  logger.info "Ending======about to tear_down_bunny [retval: #{retval}]...."
ensure
  # Runs on the error path too, so a failure during setup or subscription
  # does not leave the connection open.
  tear_down_bunny
end
#tear_down_bunny ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/estorm-message-processor/base.rb', line 38
#
# Shuts the messaging session down: when both the connection and the channel
# are open, cancels the consumer (if present and not already cancelled) with
# a one-second pause before and after; then closes the connection if it is
# still open and logs "closing bunny".
#
# Safe to call when @conn, @channel or @consumer were never set.
#
# @return [Object] the result of the final logger.info call
def tear_down_bunny
  if @conn && @conn.open? && @channel && @channel.open?
    sleep 1
    @consumer.cancel if @consumer && !@consumer.cancelled?
    sleep 1
    # Earlier variant kept for reference:
    # @queue.unsubscribe
    # sleep 0.5
  end

  # Re-check open? here: the state may have changed during the block above.
  @conn.close if @conn && @conn.open?
  logger.info "closing bunny"
end