Class: EstormMessageProcessor::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/estorm-message-processor/base.rb

Constant Summary collapse

@@mt_id =

MT id counter.

0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#channelObject (readonly)

Returns the value of attribute channel.



7
8
9
# File 'lib/estorm-message-processor/base.rb', line 7

def channel
  @channel
end

#connObject (readonly)

Returns the value of attribute conn.



7
8
9
# File 'lib/estorm-message-processor/base.rb', line 7

def conn
  @conn
end

#consumerObject (readonly)

Returns the value of attribute consumer.



7
8
9
# File 'lib/estorm-message-processor/base.rb', line 7

def consumer
  @consumer
end

#queueObject (readonly)

Returns the value of attribute queue.



7
8
9
# File 'lib/estorm-message-processor/base.rb', line 7

def queue
  @queue
end

Class Method Details

.loggerObject



10
11
12
# File 'lib/estorm-message-processor/base.rb', line 10

def Base.logger
  @@logger
end

.logger=(logger) ⇒ Object



14
15
16
# File 'lib/estorm-message-processor/base.rb', line 14

def Base.logger=(logger)
  @@logger = logger
end

Instance Method Details

#loggerObject



18
19
20
# File 'lib/estorm-message-processor/base.rb', line 18

def logger
  @@logger
end

#queue_creation(config) ⇒ Object



69
70
71
72
73
74
75
76
77
# File 'lib/estorm-message-processor/base.rb', line 69

def queue_creation(config)
   setup_bunny_communications(config[:url],config[:connecturlflag],config[:queuename])
   #@consumer=EstormMessageProcessor::Consumer.new(@channel, @queue, config[:consumer_name], true, false, config)
   @consumer=EstormMessageProcessor::Consumer.new(@channel, @queue)

   @consumer.logger=logger
   raise "consumer creation problem" if @consumer==nil
   queue_mgmt(config)
end

#queue_mgmt(config) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/estorm-message-processor/base.rb', line 52

def queue_mgmt(config)
   msg= "[*] Waiting for messages in #{@queue.name}.  blocking is #{config[:blocking]}"
   logger.info msg
   count=0
   @channel.prefetch(1)  if config[:prefetch_one] # set quality of service to only delivery one message at a time....
   @msg_count,consumer_count = @consumer.queue_statistics  # just to get the stats before entering hte queue
#  @queue.subscribe(:block => config[:blocking]) do |delivery_info, properties, body|
  @consumer.target(@msg_count,config[:exit_when_done]) if config[:exit_when_done]
  @consumer.on_delivery() do |delivery_info, , payload|
     @consumer.process_messages(delivery_info,,payload)
     msg=   "ON DELIVERY: #{@consumer.count}: messages processed"
     logger.info msg
     @channel.close if @consumer.cancelled? 
    
    # @consumer.cancel if msg_count==0 && config[:exit_when_empty]
    end 
end

#setup_bunny_communications(url, flag, queuename) ⇒ Object



28
29
30
31
32
33
34
35
36
# File 'lib/estorm-message-processor/base.rb', line 28

def setup_bunny_communications(url,flag,queuename)
  @client=EstormMessageProcessor::Client.new
  @conn,@channel=@client.setup_bunny(url,flag)
  raise "connection problem with #{@client.inspect}" if @conn==nil
  @channel   = @conn.create_channel
  @queue   = @channel.queue(queuename)
  msg= "set up active MQ on #{queuename}"
  logger.info msg
end

#start(config) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/estorm-message-processor/base.rb', line 79

def start(config)
  msg= "Connecting to bunny environment #{config.inspect}"
  logger.info msg
  config[:exit_when_done]=false if  config[:exit_when_done]==nil
  queue_creation(config)
  
  # the block flag shuts down the thread. the timeout values says whether to unsubscriber
  #need to set ack to true to manage the qos parameter
 # retval= @queue.subscribe_with(@consumer,:ack => true, :block => config[:blocking], :timeout => config[:timeout])
 #  retval= @queue.subscribe_with(@consumer,:ack => true, :block => config[:blocking])
 retval ="[did not subscribe as msg count = 0]"
 retval= @queue.subscribe_with(@consumer, :block => config[:blocking])  if !config[:exit_when_done] or @msg_count >0
 # loop do   
      #should loop forever if blocking... otherwise needs  a loop
  #   sleep 1
 #  end
  msg= "Ending======about to tear_down_bunny [retval: #{retval}]...."
  logger.info msg
  tear_down_bunny
    
end

#tear_down_bunnyObject



38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/estorm-message-processor/base.rb', line 38

def tear_down_bunny
   if @conn!=nil && @conn.open? && @channel!=nil && @channel.open?
     sleep 1
     @consumer.cancel if @consumer!=nil && !@consumer.cancelled?
     sleep 1
   #  @queue.unsubscribe
   
   #  sleep 0.5
   end
   @conn.close if @conn != nil && @conn.open?
   msg= "closing bunny"
   logger.info msg
end