Class: Tengine::Core::Scheduler
Instance Method Summary
collapse
method_trace
safemode, #update_in_safe_mode
Constructor Details
#initialize(argv) ⇒ Scheduler
Returns a new instance of Scheduler.
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
# File 'lib/tengine/core/scheduler.rb', line 23
# Builds a scheduler from command-line style arguments.
#
# Generates a UUID for this instance, hands +argv+ to
# Tengine::Core::Config::Atd.parse, prepares the option hash that is later
# passed to Daemons.run_proc, and disables method tracing unless the parsed
# configuration has a truthy :verbose entry.
#
# Anything raised along the way is printed to standard output (class,
# message and backtrace) and then re-raised unchanged.
#
# @param argv arguments forwarded as-is to Tengine::Core::Config::Atd.parse
def initialize(argv)
  begin
    @uuid = UUID.new.generate
    @config = Tengine::Core::Config::Atd.parse(argv)

    # Assembled step by step and published only once complete, so a failure
    # part-way through leaves @daemonize_options unset.
    options = { :app_name => 'tengine_atd' }
    options[:ARGV] = [@config[:action]]
    options[:ontop] = !@config[:process][:daemon]
    options[:multiple] = true
    options[:dir_mode] = :normal
    options[:dir] = File.expand_path(@config[:process][:pid_dir])
    @daemonize_options = options

    Tengine::Core::MethodTraceable.disabled = !@config[:verbose]
  rescue Exception => error
    # Report only; the bare raise below re-raises the very same exception.
    puts "[#{error.class.name}] #{error.message}\n #{error.backtrace.join("\n ")}"
    raise
  end
end
|
Instance Method Details
#mark_schedule_done(sched) ⇒ Object
#pid ⇒ Object
45
46
47
|
# File 'lib/tengine/core/scheduler.rb', line 45
# Identifier of this process in the form "process:<MM_SERVER_NAME>/<pid>".
#
# The server name is read from the MM_SERVER_NAME environment variable and
# the number is the operating-system process id. The string is built once and
# cached in @pid.
#
# @return [String]
def pid
  return @pid if @pid

  @pid = format("process:%s/%d", ENV["MM_SERVER_NAME"], Process.pid)
end
|
#run(__file__) ⇒ Object
86
87
88
89
90
91
92
93
94
95
96
|
# File 'lib/tengine/core/scheduler.rb', line 86
# Dispatches on the configured action (@config[:action], converted to a symbol).
#
# * :start   - start_daemon
# * :stop    - stop_daemon
# * :restart - stop_daemon followed by start_daemon
#
# Any other action does nothing and yields nil.
#
# @param __file__ path forwarded to start_daemon / stop_daemon
# @return whatever the last daemon helper returned, or nil
def run(__file__)
  action = @config[:action].to_sym

  if action == :start
    start_daemon(__file__)
  elsif action == :stop
    stop_daemon(__file__)
  elsif action == :restart
    stop_daemon(__file__)
    start_daemon(__file__)
  end
end
|
#search_for_schedule ⇒ Object
77
78
79
80
81
82
83
84
|
# File 'lib/tengine/core/scheduler.rb', line 77
# Yields every schedule whose scheduled_at is not later than the current time
# and whose status equals Tengine::Core::Schedule::SCHEDULED.
#
# Iteration is delegated to #each_next_tick on the query result.
# NOTE(review): presumably this spreads the iteration over reactor ticks -
# confirm against the definition of each_next_tick.
#
# @yield [sched] one matching schedule at a time
# @return whatever each_next_tick returns
def search_for_schedule
  due = Tengine::Core::Schedule.where(
    :scheduled_at.lte => Time.now,
    :status => Tengine::Core::Schedule::SCHEDULED
  )
  due.each_next_tick { |sched| yield sched }
end
|
#send_last_event ⇒ Object
49
50
51
52
|
# File 'lib/tengine/core/scheduler.rb', line 49
# Fires the "finished.process.atd.tengine" event through #sender and then
# stops the sender.
#
# The event is keyed by this instance's UUID, names #pid as both source and
# sender, is stamped with the current time and uses level :info.
#
# @return whatever sender.stop returns
def send_last_event
  sender.fire(
    "finished.process.atd.tengine",
    :key => @uuid,
    :source_name => pid,
    :sender_name => pid,
    :occurred_at => Time.now,
    :level_key => :info,
    :keep_connection => true
  )
  sender.stop
end
|
#send_periodic_event ⇒ Object
54
55
56
|
# File 'lib/tengine/core/scheduler.rb', line 54
# Fires the "atd.heartbeat.tengine" event through #sender.
#
# The event is keyed by this instance's UUID, names #pid as both source and
# sender, is stamped with the current time, uses level :debug and is sent
# with :retry_count => 0.
#
# @return whatever sender.fire returns
def send_periodic_event
  sender.fire(
    "atd.heartbeat.tengine",
    :key => @uuid,
    :source_name => pid,
    :sender_name => pid,
    :occurred_at => Time.now,
    :level_key => :debug,
    :keep_connection => true,
    :retry_count => 0
  )
end
|
#send_scheduled_event(sched) ⇒ Object
58
59
60
61
|
# File 'lib/tengine/core/scheduler.rb', line 58
# Logs that a schedule's time has come and fires its event through #sender.
#
# The fired event takes its type, key (sched._id), source name and properties
# from +sched+; the sender name is #pid, the time is the current time and the
# level is :info.
#
# @param sched object responding to scheduled_at, event_type_name,
#   source_name, _id and properties
# @return whatever sender.fire returns
def send_scheduled_event(sched)
  message = "Scheduled time (#{sched.scheduled_at}) has come. Now firing #{sched.event_type_name} for #{sched.source_name}"
  Tengine.logger.info(message)

  sender.fire(
    sched.event_type_name,
    :key => sched._id,
    :source_name => sched.source_name,
    :sender_name => pid,
    :occurred_at => Time.now,
    :level_key => :info,
    :keep_connection => true,
    :properties => sched.properties
  )
end
|
#sender ⇒ Object
41
42
43
|
# File 'lib/tengine/core/scheduler.rb', line 41
# Lazily built event sender.
#
# On first use, wraps a Tengine::Mq::Suite created from @config[:event_queue]
# in a Tengine::Event::Sender and caches it in @sender; later calls return the
# cached object.
#
# @return [Tengine::Event::Sender]
def sender
  return @sender if @sender

  suite = Tengine::Mq::Suite.new(@config[:event_queue])
  @sender = Tengine::Event::Sender.new(suite)
end
|
#shutdown ⇒ Object
113
114
115
116
117
118
|
# File 'lib/tengine/core/scheduler.rb', line 113
# Inside EM.run: cancels the heartbeat timer stored in @periodic (if one was
# set) and then sends the final event via #send_last_event.
#
# @return whatever EM.run returns
def shutdown
  EM.run do
    heartbeat = @periodic
    EM.cancel_timer(heartbeat) if heartbeat
    send_last_event
  end
end
|
#start ⇒ Object
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
# File 'lib/tengine/core/scheduler.rb', line 120
# Main loop of the scheduler process.
#
# Sets up loggers from the configuration, loads @config[:db] into Mongoid,
# requires the amqp library and points both the AMQP session logger and the
# Mongoid logger at Tengine.logger. Then, inside EM.run and once the sender
# reports a connection:
#
# * a 1-second periodic timer (kept in @invalidate) looks up due schedules,
#   fires each one and marks it done;
# * if the configured heartbeat interval is greater than zero, a second
#   periodic timer (kept in @periodic) fires the heartbeat event.
def start
  @config.setup_loggers
  Mongoid.configure { |mongoid| mongoid.send(:load_configuration, @config[:db]) }

  require 'amqp'
  shared_logger = Tengine.logger
  AMQP::Session.logger = shared_logger
  Mongoid.logger = shared_logger

  EM.run do
    sender.wait_for_connection do
      @invalidate = EM.add_periodic_timer(1) do
        search_for_schedule do |sched|
          send_scheduled_event(sched)
          mark_schedule_done(sched)
        end
      end

      # to_i always yields an Integer, so only the sign needs checking.
      interval = @config[:heartbeat][:atd][:interval].to_i
      @periodic = EM.add_periodic_timer(interval) { send_periodic_event } if interval > 0
    end
  end
end
|
#start_daemon(__file__) ⇒ Object
98
99
100
101
102
103
104
105
106
|
# File 'lib/tengine/core/scheduler.rb', line 98
# Launches the scheduler through Daemons.run_proc.
#
# The daemon is named after the basename of +__file__+ and configured with
# @daemonize_options (which already carries the expanded pid directory, see
# #initialize). The working directory at call time is captured up front and
# restored inside the daemonized block before #start runs.
# NOTE(review): presumably needed because daemonizing changes the working
# directory - confirm against the Daemons documentation.
#
# @param __file__ path whose basename becomes the daemon's name
# @return whatever Daemons.run_proc returns
def start_daemon(__file__)
  fname = File.basename(__file__)
  cwd = Dir.getwd
  Daemons.run_proc(fname, @daemonize_options) do
    Dir.chdir(cwd) { start }
  end
end
|
#stop_daemon(__file__) ⇒ Object
108
109
110
111
|
# File 'lib/tengine/core/scheduler.rb', line 108
# Invokes Daemons.run_proc for the daemon named after the basename of
# +__file__+, passing @daemonize_options and no block.
# NOTE(review): the action actually performed is presumably decided by the
# :ARGV entry of @daemonize_options - confirm against the Daemons documentation.
#
# @param __file__ path whose basename identifies the daemon
# @return whatever Daemons.run_proc returns
def stop_daemon(__file__)
  Daemons.run_proc(File.basename(__file__), @daemonize_options)
end
|