Class: Tengine::Core::HeartbeatWatcher
- Inherits:
-
Object
- Object
- Tengine::Core::HeartbeatWatcher
- Extended by:
- MethodTraceable
- Defined in:
- lib/tengine/core/heartbeat_watcher.rb
Instance Method Summary collapse
-
#initialize(argv) ⇒ HeartbeatWatcher
constructor
A new instance of HeartbeatWatcher.
- #pid ⇒ Object
- #run(__file__) ⇒ Object
- #search_for_invalid_heartbeat ⇒ Object
- #send_invalidate_event(type, e0) ⇒ Object
- #send_last_event ⇒ Object
- #send_periodic_event ⇒ Object
- #sender ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #start_daemon(__file__) ⇒ Object
- #stop_daemon(__file__) ⇒ Object
Methods included from MethodTraceable
Constructor Details
#initialize(argv) ⇒ HeartbeatWatcher
Returns a new instance of HeartbeatWatcher.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/tengine/core/heartbeat_watcher.rb', line 21 def initialize argv @uuid = UUID.new.generate @config = Tengine::Core::Config::HeartbeatWatcher.parse argv @daemonize_options = { :app_name => 'tengine_heartbeat_watcher', :ARGV => [@config[:action]], :ontop => !@config[:process][:daemon], :multiple => true, :dir_mode => :normal, :dir => File.(@config[:process][:pid_dir]), } Tengine::Core::MethodTraceable.disabled = !@config[:verbose] rescue Exception puts "[#{$!.class.name}] #{$!.}\n " << $!.backtrace.join("\n ") raise end |
Instance Method Details
#pid ⇒ Object
38 39 40 |
# File 'lib/tengine/core/heartbeat_watcher.rb', line 38 def pid @pid ||= sprintf "process:%s/%d", ENV["MM_SERVER_NAME"], Process.pid end |
#run(__file__) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/tengine/core/heartbeat_watcher.rb', line 82 def run(__file__) case @config[:action].to_sym when :start start_daemon(__file__) when :stop stop_daemon(__file__) when :restart stop_daemon(__file__) start_daemon(__file__) end end |
#search_for_invalid_heartbeat ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/tengine/core/heartbeat_watcher.rb', line 69 def search_for_invalid_heartbeat t = Time.now a = @config[:heartbeat].to_hash.each_pair.map do |e, h| Tengine::Core::Event.where( :event_type_name => "#{e}.heartbeat.tengine", :occurred_at.lte => t - h[:expire] ) end a.flatten.each_next_tick do |i| yield i if i end end |
#send_invalidate_event(type, e0) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/tengine/core/heartbeat_watcher.rb', line 55 def send_invalidate_event type, e0 obj = e0.as_document.symbolize_keys Tengine.logger.info "Heartbeat expiration detected! for #{e0.event_type_name} of #{e0.source_name}: last seen #{e0.occurred_at} (#{(Time.now - e0.occurred_at).to_f} secs before)" obj.delete :_id obj.delete :confirmed obj.delete :updated_at obj.delete :created_at obj.delete :lock_version obj[:event_type_name] = type obj[:level] = Tengine::Event::LEVELS_INV[:error] e1 = Tengine::Event.new obj sender.fire e1, :keep_connection => true end |
#send_last_event ⇒ Object
46 47 48 49 |
# File 'lib/tengine/core/heartbeat_watcher.rb', line 46 def send_last_event sender.fire "finished.process.hbw.tengine", :key => @uuid, :source_name => pid, :sender_name => pid, :occurred_at => Time.now, :level_key => :info, :keep_connection => true sender.stop end |
#send_periodic_event ⇒ Object
51 52 53 |
# File 'lib/tengine/core/heartbeat_watcher.rb', line 51 def send_periodic_event sender.fire "hbw.heartbeat.tengine", :key => @uuid, :source_name => pid, :sender_name => pid, :occurred_at => Time.now, :level_key => :debug, :keep_connection => true, :retry_count => 0 end |
#sender ⇒ Object
42 43 44 |
# File 'lib/tengine/core/heartbeat_watcher.rb', line 42 def sender @sender ||= Tengine::Event::Sender.new Tengine::Mq::Suite.new(@config[:event_queue]) end |
#shutdown ⇒ Object
108 109 110 111 112 113 114 |
# File 'lib/tengine/core/heartbeat_watcher.rb', line 108 def shutdown EM.run do EM.cancel_timer @invalidate if @invalidate EM.cancel_timer @periodic if @periodic send_last_event end end |
#start ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/tengine/core/heartbeat_watcher.rb', line 116 def start @config.setup_loggers Mongoid.configure do |c| c.send :load_configuration, @config[:db] end require 'amqp' Mongoid.logger = AMQP::Session.logger = Tengine.logger EM.run do sender.wait_for_connection do @invalidate = EM.add_periodic_timer 1 do # !!! MAGIC NUMBER search_for_invalid_heartbeat do |obj| type = case obj.event_type_name when /job|core|hbw|resourcew|atd/ then "expired.#$&.heartbeat.tengine" end EM.next_tick do send_invalidate_event type, obj end end end int = @config[:heartbeat][:hbw][:interval].to_i if int and int > 0 @periodic = EM.add_periodic_timer int do send_periodic_event end end end end end |
#start_daemon(__file__) ⇒ Object
94 95 96 97 98 99 100 101 |
# File 'lib/tengine/core/heartbeat_watcher.rb', line 94 def start_daemon(__file__) pdir = File. @config[:process][:pid_dir] fname = File.basename __file__ cwd = Dir.getwd Daemons.run_proc(fname, @daemonize_options) do Dir.chdir(cwd) { self.start } end end |
#stop_daemon(__file__) ⇒ Object
103 104 105 106 |
# File 'lib/tengine/core/heartbeat_watcher.rb', line 103 def stop_daemon(__file__) fname = File.basename __file__ Daemons.run_proc(fname, @daemonize_options) end |