Class: CI::Queue::Redis::Monitor
- Inherits:
-
Object
- Object
- CI::Queue::Redis::Monitor
- Defined in:
- lib/ci/queue/redis/monitor.rb
Constant Summary collapse
- DEV_SCRIPTS_ROOT =
::File.expand_path('../../../../../../redis', __FILE__)
- RELEASE_SCRIPTS_ROOT =
::File.expand_path('../../redis', __FILE__)
- HEADER =
'L'
- HEADER_SIZE =
[0].pack(HEADER).bytesize
Instance Method Summary collapse
- #eval_script(script, *args) ⇒ Object
-
#initialize(pipe, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key) ⇒ Monitor
constructor
A new instance of Monitor.
- #load_script(script) ⇒ Object
- #monitor ⇒ Object
- #process_messages(io) ⇒ Object
- #process_tick!(id:) ⇒ Object
- #read_message(io) ⇒ Object
- #read_script(name) ⇒ Object
- #soft_signal(sig) ⇒ Object
- #wait_for_events(ios) ⇒ Object
Constructor Details
#initialize(pipe, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key) ⇒ Monitor
Returns a new instance of Monitor.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/ci/queue/redis/monitor.rb', line 16
#
# Sets up a monitor: records the Redis key names, opens a Redis connection,
# creates an internal pipe used to wake the event loop, and installs
# TERM/INT/USR1 handlers that forward to #soft_signal.
#
# @param pipe [IO] stored as @pipe; it is read by the event loop
# @param logger [#debug, #info] stored as @logger
# @param redis_url [String] passed as `url:` to ::Redis.new
# @param zset_key [String] stored; later passed as a key to the heartbeat script
# @param processed_key [String] stored; later passed as a key to the heartbeat script
# @param owners_key [String] stored; later passed as a key to the heartbeat script
# @param worker_queue_key [String] stored; later passed as a key to the heartbeat script
def initialize(pipe, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key)
  @pipe = pipe
  @logger = logger
  @zset_key, @processed_key = zset_key, processed_key
  @owners_key, @worker_queue_key = owners_key, worker_queue_key

  # NOTE(review): the list is presumably one retry delay per reconnect
  # attempt -- confirm against the redis client version in use.
  @redis = ::Redis.new(
    url: redis_url,
    reconnect_attempts: [0, 0, 0.1, 0.5, 1, 3, 5]
  )

  @shutdown = false
  @queue = []
  @deadlines = {}

  # Signal handlers write to this pipe so that IO.select wakes up.
  reader, writer = IO.pipe
  writer.sync = true
  @self_pipe_reader = reader
  @self_pipe_writer = writer

  [:TERM, :INT, :USR1].each do |signal_name|
    Signal.trap(signal_name) { soft_signal(signal_name) }
  end
end
Instance Method Details
#eval_script(script, *args) ⇒ Object
49 50 51 |
# File 'lib/ci/queue/redis/monitor.rb', line 49
#
# Runs a named script through Redis EVALSHA.
#
# @param script [Symbol, String] script name, resolved to a SHA by #load_script
# @param args forwarded unchanged to `@redis.evalsha`
# @return [Object] whatever `@redis.evalsha` returns
def eval_script(script, *args)
  sha = load_script(script)
  @redis.evalsha(sha, *args)
end
#load_script(script) ⇒ Object
53 54 55 56 |
# File 'lib/ci/queue/redis/monitor.rb', line 53
#
# Loads a script into Redis once and memoises the value returned by
# `@redis.script(:load, ...)` per script name.
#
# @param script [Symbol, String] script name handed to #read_script
# @return [Object] the cached result of `@redis.script(:load, source)`
def load_script(script)
  cache = (@scripts_cache ||= {})
  cached = cache[script]
  return cached if cached

  cache[script] = @redis.script(:load, read_script(script))
end
#monitor ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/ci/queue/redis/monitor.rb', line 105
#
# Main loop. Until @shutdown becomes truthy it drains the queued signals and
# then waits for activity on the self-pipe and the message pipe.
#
# @return [Object] the final value of @shutdown (0 after INT/TERM)
# @raise [RuntimeError] when a queued signal is neither :INT nor :TERM
def monitor
  @logger.debug("Starting monitor")
  watched = [@self_pipe_reader, @pipe]

  until @shutdown
    while (pending = @queue.shift)
      unless [:INT, :TERM].include?(pending)
        raise "Unknown signal: #{pending.inspect}"
      end

      @logger.debug("Received #{pending}, exiting")
      # 0 is truthy in Ruby, so this both ends the outer loop and is the return value.
      @shutdown = 0
      break
    end

    wait_for_events(watched)
  end

  @logger.debug('Done')
  @shutdown
end
#process_messages(io) ⇒ Object
79 80 81 82 83 84 85 |
# File 'lib/ci/queue/redis/monitor.rb', line 79
#
# Reads messages from +io+ until #read_message returns a falsy value and
# dispatches each one to the public method named "process_<type>".
#
# Each message is expected to be a two-element array `[type, kwargs]`; the
# kwargs hash has its keys symbolised in place and is splatted as keyword
# arguments.
#
# @param io [IO] the stream passed through to #read_message
# @return [nil]
def process_messages(io)
  while (message = read_message(io))
    type, kwargs = message
    kwargs.transform_keys!(&:to_sym)
    # NOTE(review): the method name is built from data read off the pipe;
    # public_send limits dispatch to public methods only.
    public_send("process_#{type}", **kwargs)
  end
end
#process_tick!(id:) ⇒ Object
39 40 41 42 43 44 45 46 47 |
# File 'lib/ci/queue/redis/monitor.rb', line 39
#
# Runs the :heartbeat script with the four stored keys and
# `[current time as float, id]` as arguments. Any StandardError is logged at
# info level rather than raised.
#
# @param id [Object] forwarded as the second script argument
# @return [Object] the script result, or the logger's return value on error
def process_tick!(id:)
  begin
    redis_keys = [@zset_key, @processed_key, @owners_key, @worker_queue_key]
    eval_script(:heartbeat, keys: redis_keys, argv: [Time.now.to_f, id])
  rescue => error
    @logger.info(error)
  end
end
#read_message(io) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/ci/queue/redis/monitor.rb', line 66
#
# Reads one length-prefixed JSON message from +io+ without blocking on the
# header. The header is HEADER_SIZE bytes packed with the HEADER directive
# and holds the byte length of the JSON payload that follows.
#
# @param io [IO] readable stream
# @return [Object] the parsed JSON payload when a message is available
# @return [nil] when no data is currently readable
# @return [false] when the stream is at EOF; @shutdown is set to 0 in that case
def read_message(io)
  case header = io.read_nonblock(HEADER_SIZE, exception: false)
  when :wait_readable
    # Nothing buffered right now; the caller's loop stops on nil.
    nil
  when nil
    # EOF: the writing end of the pipe has been closed.
    @logger.debug('Broken pipe, exiting')
    @shutdown = 0
    false
  else
    JSON.parse(io.read(header.unpack1(HEADER)))
  end
end
#read_script(name) ⇒ Object
58 59 60 61 62 |
# File 'lib/ci/queue/redis/monitor.rb', line 58
#
# Returns the source of "<name>.lua", looking in DEV_SCRIPTS_ROOT first and
# falling back to RELEASE_SCRIPTS_ROOT when the first read raises a
# SystemCallError (for example, the file does not exist).
#
# @param name [Symbol, String] script base name, without extension
# @return [String] file contents
# @raise [SystemCallError] when the fallback read fails as well
def read_script(name)
  filename = "#{name}.lua"
  begin
    ::File.read(::File.join(DEV_SCRIPTS_ROOT, filename))
  rescue SystemCallError
    ::File.read(::File.join(RELEASE_SCRIPTS_ROOT, filename))
  end
end
#soft_signal(sig) ⇒ Object
34 35 36 37 |
# File 'lib/ci/queue/redis/monitor.rb', line 34
#
# Records a received signal and wakes the event loop. The signal name is
# appended to @queue and a single byte is written to the internal pipe; the
# byte carries no information beyond making the pipe readable.
#
# @param sig [Symbol] signal name, e.g. :TERM
# @return [IO] the write end of the internal pipe
def soft_signal(sig)
  @queue.push(sig)
  @self_pipe_writer << '.'
end
#wait_for_events(ios) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/ci/queue/redis/monitor.rb', line 87
#
# Waits up to 10 seconds for any of +ios+ to become readable and handles
# each ready stream: the internal self-pipe is drained, and the message pipe
# is handed to #process_messages.
#
# @param ios [Array<IO>] streams to watch; expected to be @self_pipe_reader and @pipe
# @return [nil] when @shutdown is already set or the wait times out
# @return [Array<IO>] the readable streams that were handled
# @raise [RuntimeError] when a ready stream is neither of the two known pipes
def wait_for_events(ios)
  return if @shutdown

  return unless (ready = IO.select(ios, nil, nil, 10))

  ready[0].each do |io|
    case io
    when @self_pipe_reader
      io.read_nonblock(512, exception: false) # Just flush the pipe, the information is in the @queue
    when @pipe
      process_messages(@pipe)
    else
      @logger.debug("Unknown reader: #{io.inspect}")
      raise "Unknown reader: #{io.inspect}"
    end
  end
end