Class: Skiplock::Worker

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/skiplock/worker.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.cleanup(hostname = nil) ⇒ Object



6
7
8
9
10
11
12
13
# File 'lib/skiplock/worker.rb', line 6

# Deletes stale worker records for one host within the current namespace.
#
# A record is treated as stale when the session id stored on it differs from
# the session id currently reported for its pid (a failed lookup counts as
# +nil+), or when it has not been updated within the last 10 minutes.
#
# @param hostname [String, nil] host whose records are inspected; falls back
#   to +Socket.gethostname+ when nil
# @return [Object, nil] whatever +delete_all+ returns, or nil when nothing
#   qualified for deletion
def self.cleanup(hostname = nil)
  candidates = where(namespace: Skiplock.namespace, hostname: hostname || Socket.gethostname)
  stale = candidates.select do |worker|
    # Any StandardError from the lookup (e.g. the process is gone) means
    # "no live session id" rather than a failure of the cleanup itself.
    live_sid = begin
      Process.getsid(worker.pid)
    rescue StandardError
      nil
    end
    worker.sid != live_sid || worker.updated_at < 10.minutes.ago
  end
  stale_ids = stale.map(&:id)
  where(id: stale_ids).delete_all unless stale_ids.empty?
end

.generate(capacity:, hostname:, master: true) ⇒ Object



15
16
17
18
19
# File 'lib/skiplock/worker.rb', line 15

# Creates the worker record describing the current process.
#
# The record carries the current pid and session id, the given hostname and
# capacity, and the current Skiplock namespace. If the first +create!+ raises
# a StandardError, a second attempt is made with +master+ forced to false.
# NOTE(review): presumably the first attempt fails when a master record
# already exists — confirm against the table constraints.
#
# @param capacity [Object] value stored in the record's +capacity+ attribute
# @param hostname [Object] value stored in the record's +hostname+ attribute
# @param master [Boolean] whether to first try registering as the master
# @return [Object] the record returned by +create!+
# @raise [StandardError] whatever the second +create!+ raises if it fails too
def self.generate(capacity:, hostname:, master: true)
  # Built lazily so each attempt re-reads pid, sid and namespace.
  attributes_for = lambda do |as_master|
    {
      pid: Process.pid,
      sid: Process.getsid,
      master: as_master,
      hostname: hostname,
      capacity: capacity,
      namespace: Skiplock.namespace
    }
  end

  begin
    create!(**attributes_for.call(master))
  rescue StandardError
    create!(**attributes_for.call(false))
  end
end

Instance Method Details

#shutdown ⇒ Object



21
22
23
24
25
26
# File 'lib/skiplock/worker.rb', line 21

# Stops this worker and logs that the shutdown finished.
#
# Clears the running flag, asks the executor to shut down, and waits for it
# to terminate using +@config[:graceful_shutdown]+ as the timeout
# (presumably seconds — confirm against the executor's documentation). If the
# executor has not terminated by then it is killed.
#
# @return [Object] whatever the logger's +info+ call returns
def shutdown
  @running = false
  @executor.shutdown
  terminated = @executor.wait_for_termination(@config[:graceful_shutdown])
  @executor.kill unless terminated

  role = self.master ? 'master' : 'cluster'
  # The worker number is only shown for numbered workers when more than two
  # workers are configured.
  number = @num > 0 && @config[:workers] > 2 ? " #{@num}" : ''
  Skiplock.logger.info "[Skiplock] Shutdown of #{role} worker#{number} (PID: #{self.pid}) was completed."
end

#start(worker_num: 0, **config) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/skiplock/worker.rb', line 28

# Prepares this worker's runtime state and posts +run+ to a new thread pool.
#
# Stores the worker number and configuration, captures the connection
# settings of the current ActiveRecord connection, builds the SQL fragments
# used for namespace filtering and queue ordering, registers result decoders
# on a PG type map, then creates the executor and posts +run+ (defined
# elsewhere) to it. In standalone mode it also sets the process title and
# discards the current ActiveRecord connection.
#
# The namespace and queue names are escaped with the connection's +quote+
# before being placed in the SQL fragments, so a value containing a single
# quote can neither break nor alter the statement. Values without special
# characters produce the same text as plain single-quoting.
#
# @param worker_num [Integer] number of this worker; shown in the process
#   title only when positive and more than two workers are configured
# @param config [Hash] settings; this method reads :queues, :min_threads,
#   :max_threads, :standalone and :workers
# @return [Object, nil] the result of +throw_away!+ in standalone mode,
#   otherwise nil
def start(worker_num: 0, **config)
  @num = worker_num
  @config = config
  connection = ActiveRecord::Base.connection
  @pg_config = connection.raw_connection.conninfo_hash.compact

  namespace = Skiplock.namespace
  @namespace_query = namespace.nil? ? 'namespace IS NULL' : "namespace = #{connection.quote(namespace.to_s)}"

  queues = @config[:queues]
  if queues.is_a?(Hash) && !queues.empty?
    # NOTE(review): the priority is interpolated as-is; presumably it is
    # always numeric — confirm against the configuration loader.
    @queues_order_query = queues.map do |queue_name, priority|
      "WHEN queue_name = #{connection.quote(queue_name.to_s)} THEN #{priority}"
    end.join(' ')
  end

  @running = true

  # Decoder class, type OID and type name for each registered column type.
  @map = ::PG::TypeMapByOid.new
  [
    [::PG::TextDecoder::Boolean, 16, 'bool'],
    [::PG::TextDecoder::Integer, 20, 'int8'],
    [::PG::TextDecoder::Integer, 21, 'int2'],
    [::PG::TextDecoder::Integer, 23, 'int4'],
    [::PG::TextDecoder::TimestampUtc, 1114, 'timestamp'],
    [::PG::TextDecoder::String, 2950, 'uuid'],
    [::PG::TextDecoder::JSON, 3802, 'jsonb']
  ].each do |decoder, oid, type_name|
    @map.add_coder(decoder.new(oid: oid, name: type_name))
  end

  # One thread beyond the configured limits: the pool also hosts the +run+
  # task posted below.
  @executor = Concurrent::ThreadPoolExecutor.new(
    min_threads: @config[:min_threads] + 1,
    max_threads: @config[:max_threads] + 1,
    max_queue: @config[:max_threads] + 1,
    idletime: 60,
    auto_terminate: false,
    fallback_policy: :abort
  )
  @executor.post { run }

  if @config[:standalone]
    Process.setproctitle("skiplock: #{self.master ? 'master' : 'cluster'} worker#{(' ' + @num.to_s) if @num > 0 && @config[:workers] > 2} [#{Rails.application.class.name.deconstantize.downcase}:#{Rails.env}]")
    ActiveRecord::Base.connection.throw_away!
  end
end