Class: LittleMonster::Core::Runner
- Inherits:
-
Object
- Object
- LittleMonster::Core::Runner
- Includes:
- Loggable
- Defined in:
- lib/little_monster/core/runner.rb
Instance Method Summary collapse
-
#initialize(params) ⇒ Runner
constructor
A new instance of Runner.
- #run ⇒ Object
- #send_heartbeat! ⇒ Object
Methods included from Loggable
Constructor Details
#initialize(params) ⇒ Runner
Returns a new instance of Runner.
5 6 7 8 9 10 11 12 |
# File 'lib/little_monster/core/runner.rb', line 5 def initialize(params) @params = params @thread_id = Thread.current.object_id @heartbeat_task = Concurrent::TimerTask.new(execution_interval: LittleMonster.heartbeat_execution_interval) do send_heartbeat! end end |
Instance Method Details
#run ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/little_monster/core/runner.rb', line 14 def run send_heartbeat! @heartbeat_task.execute unless LittleMonster.disable_requests? job = LittleMonster::Job::Factory.new(@params).build job.run unless job.nil? rescue JobNotFoundError => e logger.error "[id:#{@params[:id]}][type:job_not_found] [message:#{e..dump}] \n #{e.backtrace.to_a.join("\n\t")}" ensure @heartbeat_task.shutdown end |
#send_heartbeat! ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/little_monster/core/runner.rb', line 27 def send_heartbeat! return if LittleMonster.disable_requests? res = LittleMonster::API.put "/jobs/#{@params[:id]}/worker", body: { ip: Socket.gethostname, host: Socket.gethostname, pid: "#{Process.pid}-#{@thread_id}" } raise LittleMonster::JobAlreadyLockedError, "job [id:#{@params[:id]}] is already locked, discarding" if res.code == 401 res.success? end |