Class: Cron::Server

Inherits:
Object
  • Object
show all
Includes:
StandardModel
Defined in:
lib/app/jobs/cron/server.rb

Overview

Handle the coordination of which server should be running the cron jobs

Constant Summary collapse

STATE_PRIMARY =

Constants

'primary'
STATE_SECONDARY =
'secondary'
ALL_STATES =
[STATE_PRIMARY, STATE_SECONDARY].freeze

Class Method Summary collapse

Instance Method Summary collapse

Methods included from StandardModel

#audit_action, #auto_strip_attributes, #capture_user_info, #clear_cache, #created_by_display_name, #delete_and_log, #destroy_and_log, included, #last_modified_by_display_name, #log_change, #log_deletion, #remove_blank_secure_fields, #save_and_log, #save_and_log!, #secure_fields, #update, #update!, #update_and_log, #update_and_log!

Methods included from App47Logger

clean_params, #clean_params, delete_parameter_keys, #log_controller_error, log_debug, #log_debug, log_error, #log_error, log_exception, #log_message, log_message, #log_warn, log_warn, mask_parameter_keys, #update_flash_messages

Class Method Details

.find_or_create_serverObject

Find a record for this server



76
77
78
# File 'lib/app/jobs/cron/server.rb', line 76

def self.find_or_create_server
  Cron::Server.find_or_create_by!(host_name: Socket.gethostname, pid: Process.pid)
end

.primary_serverObject

Find a the current master



83
84
85
# File 'lib/app/jobs/cron/server.rb', line 83

def self.primary_server
  Cron::Server.where(state: STATE_PRIMARY).first
end

.warm_up_serverObject

Warm up a server on the next evaluation



90
91
92
93
94
# File 'lib/app/jobs/cron/server.rb', line 90

def self.warm_up_server
  return unless SystemConfiguration.aws_auto_scaling_configured?

  primary_server.auto_scale([primary_server.desired_server_count + 1, 10].min)
end

Instance Method Details

#active_countObject

Returns the count of active servers



275
276
277
# File 'lib/app/jobs/cron/server.rb', line 275

def active_count
  current_server_count
end

#alive?Boolean

Return true if I’ve reported in the last two minutes

Returns:

  • (Boolean)


139
140
141
# File 'lib/app/jobs/cron/server.rb', line 139

def alive?
  last_check_in_at >= 90.seconds.ago.utc
end

#auto_scale(desired_count = 0) ⇒ Object

Sets the desired and minimum number of EC2 instances to run



252
253
254
255
256
257
258
259
260
# File 'lib/app/jobs/cron/server.rb', line 252

def auto_scale(desired_count = 0)
  set desired_server_count: desired_count
  # Make sure we don't remove any workers with assigned jobs by accident
  return if desired_count.positive? && desired_count <= current_desired_capacity

  client.update_auto_scaling_group(auto_scaling_group_name: sys_config.aws_auto_scaling_group_name,
                                   min_size: desired_count,
                                   desired_capacity: desired_count)
end

#auto_scaling_groupObject

Returns the AutoScalingGroup associated with the account



186
187
188
189
# File 'lib/app/jobs/cron/server.rb', line 186

def auto_scaling_group
  filter = { auto_scaling_group_names: [sys_config.auto_scaling_group_name] }
  @auto_scaling_group ||= client.describe_auto_scaling_groups(filter).auto_scaling_groups.first
end

#become_primaryObject

Become primary, making others secondary



99
100
101
102
103
104
105
106
107
108
109
# File 'lib/app/jobs/cron/server.rb', line 99

def become_primary
  Cron::Server.each(&:become_secondary)
  # sleep a small amount of time to randomize a new primary
  sleep rand(1..15)
  # Check to see if another node already became primary
  primary = Cron::Server.primary_server
  return if primary.present? && primary.alive?

  # no one else is in, so become primary
  update! state: STATE_PRIMARY, last_check_in_at: Time.now.utc
end

#become_secondary(user = nil) ⇒ Object

Become secondary node



114
115
116
117
118
119
120
# File 'lib/app/jobs/cron/server.rb', line 114

def become_secondary(user = nil)
  if user.present?
    update_and_log! user, state: STATE_SECONDARY
  else
    update! state: STATE_SECONDARY
  end
end

#check_auto_scaleObject

Auto scale environment



160
161
162
163
164
165
166
167
168
# File 'lib/app/jobs/cron/server.rb', line 160

def check_auto_scale
  return unless SystemConfiguration.aws_auto_scaling_configured?

  if delayed_jobs_count.eql?(0)
    handle_zero_job_count
  else
    handle_auto_scale_jobs
  end
end

#check_inObject

Perform a check in for the server



153
154
155
# File 'lib/app/jobs/cron/server.rb', line 153

def check_in
  set last_check_in_at: Time.now.utc
end

#clientObject

Returns the AWS AutoScaling Client



173
174
175
176
177
# File 'lib/app/jobs/cron/server.rb', line 173

def client
  @client ||= Aws::AutoScaling::Client.new(access_key_id: sys_config.aws_access_key_id,
                                           secret_access_key: sys_config.aws_secret_access_key,
                                           region: sys_config.aws_region)
end

#current_desired_capacityObject

Returns the current value of ‘desired capacity’ for the AutoScalingGroup



201
202
203
204
205
206
207
# File 'lib/app/jobs/cron/server.rb', line 201

def current_desired_capacity
  current = auto_scaling_group.desired_capacity
  set current_server_count: current
  current
rescue StandardError
  0
end

#dead?Boolean

Is the server dead, meaning is it not reporting within the last two minutes

Returns:

  • (Boolean)


146
147
148
# File 'lib/app/jobs/cron/server.rb', line 146

def dead?
  !alive?
end

#delayed_jobs_countObject

Returns a count of the Delayed Jobs in queue that have not failed



194
195
196
# File 'lib/app/jobs/cron/server.rb', line 194

def delayed_jobs_count
  @delayed_jobs_count ||= Delayed::Backend::Mongoid::Job.where(failed_at: nil).read(mode: :primary).count
end

#executeObject

Go through the logic once a minute



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/app/jobs/cron/server.rb', line 35

def execute
  if primary?
    run_cron_jobs
  else
    primary = Cron::Server.where(state: STATE_PRIMARY).first
    if primary.blank? || primary.dead?
      become_primary
      run_cron_jobs
    end
  end
  time_to_next_run
rescue StandardError => error
  App47Logger.log_error 'Unable to run cron server', error
  time_to_next_run
ensure
  check_in
end

#handle_auto_scale_jobsObject

Calls the ‘auto_scale’ method with a variable ‘desired_count’ based on how many jobs are running We don’t need any more workers if the job count is less than 1,000



222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/app/jobs/cron/server.rb', line 222

def handle_auto_scale_jobs
  return if delayed_jobs_count < 50

  case delayed_jobs_count
  when 50..250
    auto_scale(1)
  when 251..500
    auto_scale(2)
  when 501..1_000
    auto_scale(3)
  when 1_001..2_000
    auto_scale(4)
  when 2_001..3_999
    auto_scale(4)
  when 4_000..7_999
    auto_scale(5)
  when 8_000..10_999
    auto_scale(5)
  when 11_000..13_999
    auto_scale(6)
  when 14_000..17_999
    auto_scale(6)
  else
    auto_scale(7)
  end
end

#handle_zero_job_countObject

Calls the ‘auto_scale’ method with a ‘desired_count’ of 0 unless the capacity is already at 0



212
213
214
215
216
# File 'lib/app/jobs/cron/server.rb', line 212

def handle_zero_job_count
  return if current_desired_capacity.eql?(0)

  auto_scale
end

#high_landerObject

Look to make sure there is only one primary



265
266
267
268
269
270
# File 'lib/app/jobs/cron/server.rb', line 265

def high_lander
  return if secondary? # Don't need to check if not primary

  primary = Cron::Server.where(state: STATE_PRIMARY).first
  errors.add(:state, 'there can only be one primary') unless primary.blank? || primary.eql?(self)
end

#inactive_countObject

Returns the count of inactive servers



282
283
284
# File 'lib/app/jobs/cron/server.rb', line 282

def inactive_count
  desired_server_count
end

#primary?Boolean

Am I the primary server

Returns:

  • (Boolean)


125
126
127
# File 'lib/app/jobs/cron/server.rb', line 125

def primary?
  alive? && STATE_PRIMARY.eql?(state)
end

#run_cron_jobsObject



53
54
55
56
# File 'lib/app/jobs/cron/server.rb', line 53

def run_cron_jobs
  run_jobs
  check_auto_scale
end

#run_jobsObject

Run all cron tab jobs



61
62
63
64
# File 'lib/app/jobs/cron/server.rb', line 61

def run_jobs
  now = Time.now.utc
  Cron::Tab.all.each { |tab| tab.run if tab.time_to_run?(now) }
end

#secondary?Boolean

Am I a secondary server

Returns:

  • (Boolean)


132
133
134
# File 'lib/app/jobs/cron/server.rb', line 132

def secondary?
  STATE_SECONDARY.eql?(state)
end

#sys_configObject



179
180
181
# File 'lib/app/jobs/cron/server.rb', line 179

def sys_config
  @sys_config ||= SystemConfiguration.configuration
end

#time_to_next_runObject

Determine the next minute to run,



69
70
71
# File 'lib/app/jobs/cron/server.rb', line 69

def time_to_next_run
  60 - Time.now.utc.to_i % 60
end