Module: Quredis
- Defined in:
- lib/quredis.rb,
lib/quredis/cli.rb,
lib/quredis/web.rb,
lib/quredis/admin.rb,
lib/quredis/version.rb
Defined Under Namespace
Constant Summary collapse
- Version =
VERSION = '0.5.2'
Instance Attribute Summary collapse
-
#escape ⇒ Object
readonly
Returns the value of attribute escape.
-
#ingress ⇒ Object
readonly
Returns the value of attribute ingress.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#redis ⇒ Object
readonly
Returns the value of attribute redis.
-
#transit ⇒ Object
readonly
Returns the value of attribute transit.
Instance Method Summary collapse
- #host ⇒ Object
- #pid ⇒ Object
- #pipe ⇒ Object
- #quredis(name, options = {}, &block) ⇒ Object
-
#recover ⇒ Object
Not intended to run if we are running clustered service.
- #register ⇒ Object
- #start ⇒ Object
- #worker_id ⇒ Object
Instance Attribute Details
#escape ⇒ Object (readonly)
Returns the value of attribute escape.
6 7 8 |
# File 'lib/quredis.rb', line 6 def escape @escape end |
#ingress ⇒ Object (readonly)
Returns the value of attribute ingress.
6 7 8 |
# File 'lib/quredis.rb', line 6 def ingress @ingress end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
8 9 10 |
# File 'lib/quredis.rb', line 8 def logger @logger end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
6 7 8 |
# File 'lib/quredis.rb', line 6 def name @name end |
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
8 9 10 |
# File 'lib/quredis.rb', line 8 def redis @redis end |
#transit ⇒ Object (readonly)
Returns the value of attribute transit.
6 7 8 |
# File 'lib/quredis.rb', line 6 def transit @transit end |
Instance Method Details
#host ⇒ Object
28 29 30 |
# File 'lib/quredis.rb', line 28 def host `hostname`.strip end |
#pid ⇒ Object
32 33 34 |
# File 'lib/quredis.rb', line 32 def pid Process.pid end |
#pipe ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/quredis.rb', line 94 def pipe element = nil begin element = redis.brpoplpush(ingress, transit, @redis_timeout || 0) if element begin logger.debug("dequeued #{element}") @block.call element rescue StandardError, Timeout::Error => e logger.error e replies = redis.multi do |multi| multi.lrem(transit, -1, element) multi.lpush(escape, element) end element = nil if replies ensure redis.lrem(transit, -1, element) if element end else on_timeout if respond_to? :on_timeout end rescue Errno::EINTR => e raise e rescue StandardError, Timeout::Error => e logger.error e end end |
#quredis(name, options = {}, &block) ⇒ Object
10 11 12 13 14 15 16 17 18 |
# File 'lib/quredis.rb', line 10 def quredis(name, = {}, &block) @name = name @ingress = .fetch(:ingress, "ingress:#{name}") @transit = .fetch(:transit, "transit:#{name}") @escape = .fetch(:escape, "escape:#{name}") @block = block @redis_timeout = .fetch(:redis_timeout, 0) @enable_worker_info = .fetch(:enable_worker_info, false) end |
#recover ⇒ Object
Not intended to run if we are running clustered service
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/quredis.rb', line 77 def recover loop do element = redis.rpoplpush(transit, escape) if element begin logger.debug("recovering #{element}") @block.call element redis.lrem(escape, -1, element) rescue Exception => e logger.error e end else break end end end |
#register ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/quredis.rb', line 40 def register redis.multi do |multi| multi.set("quredis:queue:#{name}", { :name => name, :ingress => ingress, :transit => transit, :escape => escape }.to_json) multi.zadd('quredis:queues', Time.now.to_i, name) if @enable_worker_info multi.hmset("quredis:worker:#{worker_id}", *{ :worker_id => worker_id, :host => host, :pid => pid, :messages => 0, :errors => 0, :start_time => Time.now.to_i }.to_a.flatten) multi.sadd("quredis:queue:#{@name}:workers", worker_id) multi.zadd('quredis:workers', Time.now.to_i, worker_id) end end end |
#start ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/quredis.rb', line 64 def start connect unless redis register recover logger.info "Quredis starting..." logger.info "#{name} Queues: ingress -> #{ingress} transit -> #{transit} escape -> #{escape}" loop do connect unless redis pipe end end |
#worker_id ⇒ Object
36 37 38 |
# File 'lib/quredis.rb', line 36 def worker_id @worker_id ||= "#{host}:#{pid}:#{@name}" end |