Class: Skiplock::Worker
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Skiplock::Worker
- Defined in:
- lib/skiplock/worker.rb
Class Method Summary collapse
Instance Method Summary collapse
Class Method Details
.cleanup(hostname = nil) ⇒ Object
6 7 8 9 10 11 12 13 |
# File 'lib/skiplock/worker.rb', line 6 def self.cleanup(hostname = nil) delete_ids = [] self.where(namespace: Skiplock.namespace, hostname: hostname || Socket.gethostname).each do |worker| sid = Process.getsid(worker.pid) rescue nil delete_ids << worker.id if worker.sid != sid || worker.updated_at < 10.minutes.ago end self.where(id: delete_ids).delete_all if delete_ids.count > 0 end |
.generate(capacity:, hostname:, master: true) ⇒ Object
15 16 17 18 19 |
# File 'lib/skiplock/worker.rb', line 15 def self.generate(capacity:, hostname:, master: true) worker = self.create!(pid: Process.pid, sid: Process.getsid(), master: master, hostname: hostname, capacity: capacity, namespace: Skiplock.namespace) rescue worker = self.create!(pid: Process.pid, sid: Process.getsid(), master: false, hostname: hostname, capacity: capacity, namespace: Skiplock.namespace) end |
Instance Method Details
#shutdown ⇒ Object
21 22 23 24 25 26 |
# File 'lib/skiplock/worker.rb', line 21 def shutdown @running = false @executor.shutdown @executor.kill unless @executor.wait_for_termination(@config[:graceful_shutdown]) Skiplock.logger.info "[Skiplock] Shutdown of #{self.master ? 'master' : 'cluster'} worker#{(' ' + @num.to_s) if @num > 0 && @config[:workers] > 2} (PID: #{self.pid}) was completed." end |
#start(worker_num: 0, **config) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/skiplock/worker.rb', line 28 def start(worker_num: 0, **config) @num = worker_num @config = config @pg_config = ActiveRecord::Base.connection.raw_connection.conninfo_hash.compact @namespace_query = Skiplock.namespace.nil? ? "namespace IS NULL" : "namespace = '#{Skiplock.namespace}'" @queues_order_query = @config[:queues].map { |q,v| "WHEN queue_name = '#{q}' THEN #{v}" }.join(' ') if @config[:queues].is_a?(Hash) && @config[:queues].count > 0 @running = true @map = ::PG::TypeMapByOid.new @map.add_coder(::PG::TextDecoder::Boolean.new(oid: 16, name: 'bool')) @map.add_coder(::PG::TextDecoder::Integer.new(oid: 20, name: 'int8')) @map.add_coder(::PG::TextDecoder::Integer.new(oid: 21, name: 'int2')) @map.add_coder(::PG::TextDecoder::Integer.new(oid: 23, name: 'int4')) @map.add_coder(::PG::TextDecoder::TimestampUtc.new(oid: 1114, name: 'timestamp')) @map.add_coder(::PG::TextDecoder::String.new(oid: 2950, name: 'uuid')) @map.add_coder(::PG::TextDecoder::JSON.new(oid: 3802, name: 'jsonb')) @executor = Concurrent::ThreadPoolExecutor.new(min_threads: @config[:min_threads] + 1, max_threads: @config[:max_threads] + 1, max_queue: @config[:max_threads] + 1, idletime: 60, auto_terminate: false, fallback_policy: :abort) @executor.post { run } if @config[:standalone] Process.setproctitle("skiplock: #{self.master ? 'master' : 'cluster'} worker#{(' ' + @num.to_s) if @num > 0 && @config[:workers] > 2} [#{Rails.application.class.name.deconstantize.downcase}:#{Rails.env}]") ActiveRecord::Base.connection.throw_away! end end |