Class: Pique::Agent
Overview
Collect jobs from Sidekiq and send them to api.monit.run
Class Method Summary collapse
- .run! ⇒ Object
Instance Method Summary collapse
- #can_run? ⇒ Boolean
- #dump ⇒ Object
- #push(worker, job) ⇒ Object
- #report ⇒ Object
- #run ⇒ Object
- #run_loop ⇒ Object
- #running? ⇒ Boolean
Class Method Details
.run! ⇒ Object
8 9 10 11 |
# File 'lib/pique/agent.rb', line 8
# Starts the agent returned by +instance+ unless it already reports itself
# as running, then hands that agent back to the caller.
#
# @return [Object] whatever +instance+ returns
def self.run!
  already_running = instance.running?
  instance.run unless already_running
  instance
end
Instance Method Details
#can_run? ⇒ Boolean
25 26 27 |
# File 'lib/pique/agent.rb', line 25
# Tells whether an API endpoint has been configured for the agent.
#
# The check is done with the standard library only, so it does not depend on
# ActiveSupport's +present?+ being loaded in the host process. A missing,
# empty or whitespace-only PIQUE_API_URL counts as "not configured".
#
# @return [Boolean] true when ENV['PIQUE_API_URL'] holds a non-blank value
def can_run?
  !ENV['PIQUE_API_URL'].to_s.match?(/\A[[:space:]]*\z/)
end
#dump ⇒ Object
59 60 61 62 63 |
# File 'lib/pique/agent.rb', line 59
# Writes the current contents of the job buffer to the log at info level,
# serialized as a single JSON document and bracketed by start/end marker
# lines. The buffer itself is left untouched.
#
# @return [Object] the result of the final logger call
def dump
  logger.info("Sidekiq Monit Dump Start")
  payload = JSON.dump(@jobs)
  logger.info(payload)
  logger.info("Sidekiq Monit Dump End")
end
#push(worker, job) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/pique/agent.rb', line 33
# Builds a report record from a job payload and appends it to the in-memory
# buffer that #report later sends.
#
# All three epoch fields ('created_at', 'at', 'enqueued_at') are optional:
# a missing value is recorded as nil instead of raising from Time.at.
#
# @param worker [Object] not used by this method
# @param job [Hash] job payload with string keys ('class', 'jid', 'bid',
#   'args', 'queue', 'retry', 'tags', 'created_at', 'at', 'enqueued_at')
# @return [Array<Hash>] the buffer, including the record just added
def push(worker, job)
  # Epoch seconds -> UTC ISO 8601 string with microseconds; nil stays nil.
  stamp = ->(epoch) { epoch ? Time.at(epoch).utc.iso8601(6) : nil }

  @jobs << {
    retry: job['retry'],
    account_id: 1, # NOTE(review): fixed value in the original — confirm intent
    worker: job['class'],
    job_id: job['jid'],
    batch_id: job['bid'],
    arguments: JSON.dump(job['args'] || []),
    queue_name: job['queue'],
    created_at: stamp.call(job['created_at']),
    scheduled_at: stamp.call(job['at']),
    enqueued_at: stamp.call(job['enqueued_at']),
    tags: job['tags'],
    timestamp: Time.now.utc.iso8601(6)
  }
end
#report ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/pique/agent.rb', line 65
# Sends every buffered job to the configured endpoint in one POST request
# whose body is newline-delimited JSON (one object per line), and logs the
# outcome. Does nothing when the buffer is empty.
#
# Errors raised by the HTTP client are not rescued here and propagate to the
# caller.
#
# @return [Object, nil] nil when there was nothing to send, otherwise the
#   result of the final logger call
def report
  # Drain with a single call instead of dup + clear: a job appended between
  # the copy and the clear would be discarded without ever being sent.
  jobs = @jobs.shift(@jobs.size)
  return if jobs.empty?

  logger.info "Reporting #{jobs.count} jobs"

  request = Net::HTTP::Post.new(@uri.request_uri)
  request.body = jobs.map { |job| JSON.dump(job) }.join("\n")

  response = @http.request(request)

  logger.info "Reported #{jobs.count} jobs | Response: #{response.code} #{response.message} | #{response.body}"
end
#run ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/pique/agent.rb', line 13
# Initializes the agent (job buffer, endpoint URI, HTTP client) and starts
# the reporting loop. Returns immediately if the agent is already running.
#
# TLS is enabled only when PIQUE_API_URL uses the https scheme, so a plain
# http endpoint can be used as well. If setup raises (for example because
# PIQUE_API_URL is unset), the running flag is cleared again before the
# error is re-raised, so a later call can retry.
#
# @return [Object, nil] nil when already running, otherwise the result of
#   the final logger call
# @raise [StandardError] whatever URI parsing, client setup or #run_loop raise
def run
  logger.info "Starting Pique Agent"
  return if running?

  @jobs = []
  @running = true

  begin
    @uri = URI(ENV['PIQUE_API_URL'])
    @http = Net::HTTP.new(@uri.host, @uri.port)
    @http.use_ssl = (@uri.scheme == 'https')
    run_loop
  rescue StandardError
    # Do not leave the agent marked as running when it never started.
    @running = false
    raise
  end

  logger.info "Pique Agent Started"
end
#run_loop ⇒ Object
50 51 52 53 54 55 56 57 |
# File 'lib/pique/agent.rb', line 50
# Spawns the background thread that calls #report over and over, pausing a
# random interval of up to five seconds between calls.
#
# A failure inside #report (any StandardError) is logged and the loop carries
# on; without this, a single failed request would end the thread for good
# while the agent still reports itself as running.
#
# @return [Thread] the reporting thread (also kept in @_thread)
def run_loop
  @_thread = Thread.new do
    loop do
      begin
        report
      rescue StandardError => e
        logger.error "Pique Agent report failed: #{e.class}: #{e.message}"
      end

      sleep 5 - (rand * 5) # Randomize sleep time
    end
  end
end
#running? ⇒ Boolean
29 30 31 |
# File 'lib/pique/agent.rb', line 29
# Tells whether the agent has been marked as running.
#
# Always answers with a real boolean: when @running has not been assigned
# yet, the result is false rather than nil.
#
# @return [Boolean] true when @running is truthy, false otherwise
def running?
  @running ? true : false
end