Class: Tengine::Core::Scheduler

Inherits:
Object
  • Object
show all
Extended by:
MethodTraceable
Includes:
SafeUpdatable
Defined in:
lib/tengine/core/scheduler.rb

Instance Method Summary

Methods included from MethodTraceable

#method_trace

Methods included from SafeUpdatable

#safemode, #update_in_safe_mode

Constructor Details

#initialize(argv) ⇒ Scheduler

Returns a new instance of Scheduler.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/tengine/core/scheduler.rb', line 23

# Builds a scheduler from command-line arguments.
#
# Generates a UUID for this instance, parses +argv+ with
# Tengine::Core::Config::Atd, prepares the option hash that #start_daemon and
# #stop_daemon hand to Daemons.run_proc, and disables method tracing unless
# the +:verbose+ setting is truthy.
#
# Any exception raised while doing so is printed to standard output (class
# name, message and indented backtrace) and then re-raised unchanged.
#
# @param argv [Array<String>] passed as-is to the config parser
#   (presumably the process's command-line arguments — confirm with caller)
def initialize(argv)
  @uuid = UUID.new.generate
  @config = Tengine::Core::Config::Atd.parse(argv)
  @daemonize_options = {
    app_name: 'tengine_atd',
    ARGV: [@config[:action]],
    # Stay in the foreground unless the config asks for a daemon.
    ontop: !@config[:process][:daemon],
    multiple: true,
    dir_mode: :normal,
    dir: File.expand_path(@config[:process][:pid_dir])
  }

  Tengine::Core::MethodTraceable.disabled = !@config[:verbose]
rescue Exception => e
  # NOTE(review): rescuing Exception also catches SystemExit and Interrupt;
  # they are reported here and then propagate because of the bare `raise`.
  puts "[#{e.class.name}] #{e.message}\n  #{e.backtrace.join("\n  ")}"
  raise
end

Instance Method Details

#mark_schedule_done(sched) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/tengine/core/scheduler.rb', line 63

# Flips a schedule entry from SCHEDULED to FIRED.
#
# Several atd processes may be running on several machines, in which case it
# is very likely that more than one of them tries to update the same entry at
# the same time. That situation must not result in an error, so the update is
# a conditional bulk update: it only matches the entry while its status is
# still SCHEDULED.
#
# @param sched [#id] the schedule entry that has just been fired
# @return [Object] whatever +update_all+ returns
def mark_schedule_done(sched)
  schedule = Tengine::Core::Schedule
  still_pending = schedule.with(
    safe: safemode(schedule.collection)
  ).where(
    _id: sched.id,
    status: schedule::SCHEDULED
  )
  still_pending.update_all(status: schedule::FIRED)
end

#pid ⇒ Object



45
46
47
# File 'lib/tengine/core/scheduler.rb', line 45

# Identifier of this process, in the form "process:<MM_SERVER_NAME>/<pid>".
#
# The value is computed once from the MM_SERVER_NAME environment variable and
# the operating-system process id, then cached in @pid.
#
# @return [String]
def pid
  return @pid if @pid

  server_name = ENV["MM_SERVER_NAME"]
  @pid = format("process:%s/%d", server_name, Process.pid)
end

#run(__file__) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
# File 'lib/tengine/core/scheduler.rb', line 86

# Dispatches on the configured action.
#
# * +:start+   - starts the daemon
# * +:stop+    - stops the daemon
# * +:restart+ - stops, then starts the daemon
#
# Any other action does nothing and returns nil.
#
# @param __file__ [String] path of the launching script, forwarded to
#   #start_daemon / #stop_daemon
# @return [Object, nil] the result of the last daemon call made, if any
def run(__file__)
  action = @config[:action].to_sym
  return start_daemon(__file__) if action == :start
  return stop_daemon(__file__) if action == :stop
  return unless action == :restart

  stop_daemon(__file__)
  start_daemon(__file__)
end

#search_for_schedule ⇒ Object



77
78
79
80
81
82
83
84
# File 'lib/tengine/core/scheduler.rb', line 77

# Looks up schedule entries that are due and yields each one to the caller's
# block.
#
# An entry is due when its +scheduled_at+ is not later than the current time
# and its status is still SCHEDULED. Iteration goes through the criteria's
# +each_next_tick+.
#
# @yieldparam entry [Tengine::Core::Schedule] a due schedule entry
# @return [Object] whatever +each_next_tick+ returns
def search_for_schedule
  schedule = Tengine::Core::Schedule
  due = schedule.where(
    :scheduled_at.lte => Time.now,
    :status => schedule::SCHEDULED
  )
  due.each_next_tick { |entry| yield entry }
end

#send_last_event ⇒ Object



49
50
51
52
# File 'lib/tengine/core/scheduler.rb', line 49

# Fires the final "finished.process.atd.tengine" event for this process and
# then stops the sender.
#
# The event is keyed by this instance's UUID, names #pid as both source and
# sender, and is sent at :info level.
#
# @return [Object] whatever +sender.stop+ returns
def send_last_event
  sender.fire(
    "finished.process.atd.tengine",
    key: @uuid,
    source_name: pid,
    sender_name: pid,
    occurred_at: Time.now,
    level_key: :info,
    keep_connection: true
  )
  sender.stop
end

#send_periodic_event ⇒ Object



54
55
56
# File 'lib/tengine/core/scheduler.rb', line 54

# Fires one "atd.heartbeat.tengine" event.
#
# The event is keyed by this instance's UUID, names #pid as both source and
# sender, is sent at :debug level and asks for no retries (retry_count 0).
#
# @return [Object] whatever +sender.fire+ returns
def send_periodic_event
  sender.fire(
    "atd.heartbeat.tengine",
    key: @uuid,
    source_name: pid,
    sender_name: pid,
    occurred_at: Time.now,
    level_key: :debug,
    keep_connection: true,
    retry_count: 0
  )
end

#send_scheduled_event(sched) ⇒ Object



58
59
60
61
# File 'lib/tengine/core/scheduler.rb', line 58

# Logs that a schedule entry has come due and fires the event it describes.
#
# The fired event takes its type, source name and properties from +sched+,
# is keyed by the entry's +_id+, names #pid as sender and is sent at :info
# level.
#
# @param sched [#scheduled_at, #event_type_name, #source_name, #_id, #properties]
#   the due schedule entry
# @return [Object] whatever +sender.fire+ returns
def send_scheduled_event(sched)
  Tengine.logger.info(
    "Scheduled time (#{sched.scheduled_at}) has come.  " \
    "Now firing #{sched.event_type_name} for #{sched.source_name}"
  )
  sender.fire(
    sched.event_type_name,
    key: sched._id,
    source_name: sched.source_name,
    sender_name: pid,
    occurred_at: Time.now,
    level_key: :info,
    keep_connection: true,
    properties: sched.properties
  )
end

#sender ⇒ Object



41
42
43
# File 'lib/tengine/core/scheduler.rb', line 41

# The event sender used by this scheduler.
#
# Created on first use from a Tengine::Mq::Suite configured with the
# +:event_queue+ section of the config, then cached in @sender.
#
# @return [Tengine::Event::Sender]
def sender
  return @sender if @sender

  @sender = Tengine::Event::Sender.new(
    Tengine::Mq::Suite.new(@config[:event_queue])
  )
end

#shutdown ⇒ Object



113
114
115
116
117
118
# File 'lib/tengine/core/scheduler.rb', line 113

# Shuts the scheduler down from inside an EventMachine run block: cancels the
# heartbeat timer (@periodic) when one was set up, then sends the final event
# via #send_last_event.
#
# NOTE(review): the schedule-polling timer stored in @invalidate by #start is
# not cancelled here — confirm that this is intended.
def shutdown
  EM.run do
    heartbeat_timer = @periodic
    EM.cancel_timer(heartbeat_timer) if heartbeat_timer
    send_last_event
  end
end

#start ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/tengine/core/scheduler.rb', line 120

# Runs the scheduler's main loop.
#
# Sets up loggers and the Mongoid configuration from the parsed config, routes
# Mongoid and AMQP logging to the Tengine logger, then enters the EventMachine
# reactor. Once the sender reports a connection, two periodic timers are
# installed:
#
# * @invalidate - every second, fires each due schedule entry and marks it
#   as done;
# * @periodic   - a heartbeat, only when the configured heartbeat interval is
#   greater than zero.
def start
  @config.setup_loggers

  # load_configuration is invoked through `send`, presumably because it is
  # not public on the Mongoid config object — TODO confirm.
  Mongoid.configure { |mongoid| mongoid.send(:load_configuration, @config[:db]) }

  # Loaded here rather than at file level, as in the original code.
  require 'amqp'
  Mongoid.logger = AMQP::Session.logger = Tengine.logger

  EM.run do
    sender.wait_for_connection do
      # Poll for due schedules once per second (hard-coded interval).
      @invalidate = EM.add_periodic_timer(1) do
        search_for_schedule do |due|
          send_scheduled_event(due)
          mark_schedule_done(due)
        end
      end

      # to_i always yields an Integer, so a missing interval becomes 0 and
      # disables the heartbeat.
      heartbeat_interval = @config[:heartbeat][:atd][:interval].to_i
      if heartbeat_interval > 0
        @periodic = EM.add_periodic_timer(heartbeat_interval) { send_periodic_event }
      end
    end
  end
end

#start_daemon(__file__) ⇒ Object



98
99
100
101
102
103
104
105
106
# File 'lib/tengine/core/scheduler.rb', line 98

# Launches the scheduler through Daemons.run_proc, using the options prepared
# in #initialize, and runs #start inside the spawned process.
#
# The working directory at call time is captured and re-entered around #start
# (presumably because Daemons changes the working directory when it
# daemonizes — confirm against the Daemons documentation).
#
# @param __file__ [String] path of the launching script; its basename is used
#   as the application name passed to Daemons.run_proc
# @return [Object] whatever Daemons.run_proc returns
def start_daemon(__file__)
  script_name = File.basename(__file__)
  launch_dir = Dir.getwd
  # The pid directory is already part of @daemonize_options (:dir), so it is
  # not looked up again here.
  Daemons.run_proc(script_name, @daemonize_options) do
    Dir.chdir(launch_dir) { start }
  end
end

#stop_daemon(__file__) ⇒ Object



108
109
110
111
# File 'lib/tengine/core/scheduler.rb', line 108

# Invokes Daemons.run_proc without a block, using the options prepared in
# #initialize (whose :ARGV entry carries the configured action).
#
# @param __file__ [String] path of the launching script; its basename is used
#   as the application name passed to Daemons.run_proc
# @return [Object] whatever Daemons.run_proc returns
def stop_daemon(__file__)
  script_name = File.basename(__file__)
  Daemons.run_proc(script_name, @daemonize_options)
end