Module: Quredis

Defined in:
lib/quredis.rb,
lib/quredis/cli.rb,
lib/quredis/web.rb,
lib/quredis/admin.rb,
lib/quredis/version.rb

Defined Under Namespace

Classes: Admin, CLI, Web

Constant Summary collapse

Version =
VERSION = '0.5.2'

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#escapeObject (readonly)

Returns the value of attribute escape.



6
7
8
# File 'lib/quredis.rb', line 6

def escape
  @escape
end

#ingressObject (readonly)

Returns the value of attribute ingress.



6
7
8
# File 'lib/quredis.rb', line 6

def ingress
  @ingress
end

#loggerObject (readonly)

Returns the value of attribute logger.



8
9
10
# File 'lib/quredis.rb', line 8

def logger
  @logger
end

#nameObject (readonly)

Returns the value of attribute name.



6
7
8
# File 'lib/quredis.rb', line 6

def name
  @name
end

#redisObject (readonly)

Returns the value of attribute redis.



8
9
10
# File 'lib/quredis.rb', line 8

def redis
  @redis
end

#transitObject (readonly)

Returns the value of attribute transit.



6
7
8
# File 'lib/quredis.rb', line 6

def transit
  @transit
end

Instance Method Details

#hostObject



28
29
30
# File 'lib/quredis.rb', line 28

def host
  `hostname`.strip
end

#pidObject



32
33
34
# File 'lib/quredis.rb', line 32

def pid
  Process.pid
end

#pipeObject



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/quredis.rb', line 94

def pipe
  element = nil
  begin
    element = redis.brpoplpush(ingress, transit, @redis_timeout || 0)
    if element
      begin
        logger.debug("dequeued #{element}")
        @block.call element
      rescue StandardError, Timeout::Error => e
        logger.error e
        replies = redis.multi do |multi|
          multi.lrem(transit, -1, element)
          multi.lpush(escape, element)
        end
        element = nil if replies
      ensure
        redis.lrem(transit, -1, element) if element
      end
    else
      on_timeout if respond_to? :on_timeout
    end
  rescue Errno::EINTR => e
    raise e
  rescue StandardError, Timeout::Error => e
    logger.error e
  end
end

#quredis(name, options = {}, &block) ⇒ Object



10
11
12
13
14
15
16
17
18
# File 'lib/quredis.rb', line 10

def quredis(name, options = {}, &block)
  @name = name
  @ingress = options.fetch(:ingress, "ingress:#{name}")
  @transit = options.fetch(:transit, "transit:#{name}")
  @escape =  options.fetch(:escape, "escape:#{name}")
  @block = block
  @redis_timeout = options.fetch(:redis_timeout, 0)
  @enable_worker_info = options.fetch(:enable_worker_info, false)
end

#recoverObject

Not intended to run if we are running clustered service



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/quredis.rb', line 77

def recover
  loop do
    element = redis.rpoplpush(transit, escape)
    if element
      begin
        logger.debug("recovering #{element}")
        @block.call element
        redis.lrem(escape, -1, element)
      rescue Exception => e
        logger.error e
      end
    else
      break
    end
  end
end

#registerObject



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/quredis.rb', line 40

def register
  redis.multi do |multi|
    multi.set("quredis:queue:#{name}", {
      :name => name,
      :ingress => ingress,
      :transit => transit,
      :escape => escape
    }.to_json)
    multi.zadd('quredis:queues', Time.now.to_i, name)
      if @enable_worker_info
      multi.hmset("quredis:worker:#{worker_id}", *{
        :worker_id => worker_id,
        :host => host,
        :pid => pid,
        :messages => 0,
        :errors => 0,
        :start_time => Time.now.to_i
      }.to_a.flatten)
      multi.sadd("quredis:queue:#{@name}:workers", worker_id)
      multi.zadd('quredis:workers', Time.now.to_i, worker_id)
    end
  end
end

#startObject



64
65
66
67
68
69
70
71
72
73
74
# File 'lib/quredis.rb', line 64

def start
  connect unless redis
  register
  recover
  logger.info "Quredis starting..."
  logger.info "#{name} Queues: ingress -> #{ingress} transit -> #{transit} escape -> #{escape}"
  loop do
    connect unless redis
    pipe
  end
end

#worker_idObject



36
37
38
# File 'lib/quredis.rb', line 36

def worker_id
  @worker_id ||= "#{host}:#{pid}:#{@name}"
end