Class: Pique::Agent

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/pique/agent.rb

Overview

Collect jobs from Sidekiq and send them to api.monit.run

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.run!Object



8
9
10
11
# File 'lib/pique/agent.rb', line 8

def self.run!
  instance.run unless instance.running?
  instance
end

Instance Method Details

#can_run?Boolean

Returns:

  • (Boolean)


25
26
27
# File 'lib/pique/agent.rb', line 25

def can_run?
  ENV['PIQUE_API_URL'].present?
end

#dumpObject



59
60
61
62
63
# File 'lib/pique/agent.rb', line 59

def dump
  logger.info "Sidekiq Monit Dump Start"
  logger.info JSON.dump(@jobs)
  logger.info "Sidekiq Monit Dump End"
end

#push(worker, job) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/pique/agent.rb', line 33

def push(worker, job)
  @jobs << {
    retry: job['retry'],
    account_id: 1,
    worker: job['class'],
    job_id: job['jid'],
    batch_id: job['bid'],
    arguments: JSON.dump(job['args'] || []),
    queue_name: job['queue'],
    created_at: Time.at(job['created_at']).utc.iso8601(6),
    scheduled_at: job['at'] ? Time.at(job['at']).utc.iso8601(6) : nil,
    enqueued_at: job['enqueued_at'] ? Time.at(job['enqueued_at']).utc.iso8601(6) : nil,
    tags: job['tags'],
    timestamp: Time.now.utc.iso8601(6)
  }
end

#reportObject



65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/pique/agent.rb', line 65

def report
  jobs = @jobs.dup
  @jobs.clear
  return unless jobs.any?

  logger.info "Reporting #{jobs.count} jobs"

  request = Net::HTTP::Post.new(@uri.request_uri)
  request.body = jobs.map { |job| JSON.dump(job) }.join("\n")

  response = @http.request(request)
  logger.info "Reported #{jobs.count} jobs | Response: #{response.code} #{response.message} | #{response.body}"
end

#runObject



13
14
15
16
17
18
19
20
21
22
23
# File 'lib/pique/agent.rb', line 13

def run
  logger.info "Starting Pique Agent"
  return if running?
  @jobs = []
  @running = true
  @uri = URI(ENV['PIQUE_API_URL'])
  @http = Net::HTTP.new(@uri.host, @uri.port)
  @http.use_ssl = true
  run_loop
  logger.info "Pique Agent Started"
end

#run_loopObject



50
51
52
53
54
55
56
57
# File 'lib/pique/agent.rb', line 50

def run_loop
  @_thread = Thread.new do
    loop do
      report
      sleep 5 - (rand * 5) # Randomize sleep time
    end
  end
end

#running?Boolean

Returns:

  • (Boolean)


29
30
31
# File 'lib/pique/agent.rb', line 29

def running?
  @running
end