Class: CI::Queue::Redis::Base

Inherits:
Object
  • Object
show all
Includes:
Common
Defined in:
lib/ci/queue/redis/base.rb

Direct Known Subclasses

Supervisor, Worker

Constant Summary collapse

# Ten minutes expressed in seconds. #expired? adds this on top of
# config.redis_ttl before comparing against the current time.
TEN_MINUTES =
60 * 10
# Frozen list of exception classes treated as connection failures.
# NOTE(review): presumably consumed by #rescue_connection_errors from Common — confirm there.
CONNECTION_ERRORS =
[
  ::Redis::BaseConnectionError,
  ::SocketError, # https://github.com/redis/redis-rb/pull/631
].freeze

Instance Attribute Summary

Attributes included from Common

#config

Instance Method Summary collapse

Methods included from Common

#distributed?, #flaky?, #release!, #report_failure!, #report_success!, #rescue_connection_errors, #retrying?

Constructor Details

#initialize(redis_url, config) ⇒ Base

Returns a new instance of Base.



14
15
16
17
18
# File 'lib/ci/queue/redis/base.rb', line 14

# Stores the queue configuration and builds a Redis client for the given URL.
#
# @param redis_url [String] URL handed to ::Redis.new as the `url:` option
# @param config [Object] queue configuration, exposed to the rest of the class via @config
def initialize(redis_url, config)
  @config = config
  @redis_url = redis_url
  @redis = ::Redis.new(url: @redis_url)
end

Instance Method Details

#created_at=(timestamp) ⇒ Object



33
34
35
# File 'lib/ci/queue/redis/base.rb', line 33

# Stores the queue creation timestamp under the 'created-at' key.
# Written with SETNX, so the value is only set when the key does not exist yet.
#
# @param timestamp [Object] value to record as the creation time
def created_at=(timestamp)
  created_at_key = key('created-at')
  redis.setnx(created_at_key, timestamp)
end

#exhausted? ⇒ Boolean

Returns:

  • (Boolean)


20
21
22
# File 'lib/ci/queue/redis/base.rb', line 20

# Tells whether the queue has been populated and has nothing left in it.
#
# @return [Boolean] true only when the queue is initialized and #size is zero
def exhausted?
  return false unless queue_initialized?

  size == 0
end

#expired? ⇒ Boolean

Returns:

  • (Boolean)


24
25
26
27
28
29
30
31
# File 'lib/ci/queue/redis/base.rb', line 24

# Tells whether the queue data should be considered expired.
#
# The stored 'created-at' value plus config.redis_ttl plus TEN_MINUTES is
# compared against the current time.
#
# @return [Boolean] true when that deadline has passed or no creation time is stored
def expired?
  stored = redis.get(key('created-at'))
  # Without a creation timestamp left in Redis, the queue is assumed to be expired.
  return true unless stored

  deadline = stored.to_f + config.redis_ttl + TEN_MINUTES
  deadline < Time.now.to_f
end

#increment_test_failed ⇒ Object



82
83
84
# File 'lib/ci/queue/redis/base.rb', line 82

# Increments the counter stored under the 'test_failed_count' key.
#
# @return [Object] whatever the Redis client returns for INCR
def increment_test_failed
  counter_key = key('test_failed_count')
  redis.incr(counter_key)
end

#max_test_failed? ⇒ Boolean

Returns:

  • (Boolean)


90
91
92
93
94
# File 'lib/ci/queue/redis/base.rb', line 90

# Tells whether the failure counter has reached the configured limit.
#
# @return [Boolean] false when config.max_test_failed is nil; otherwise
#   whether #test_failed is greater than or equal to that limit
def max_test_failed?
  limit = config.max_test_failed
  return false if limit.nil?

  test_failed >= limit
end

#progress ⇒ Object



51
52
53
# File 'lib/ci/queue/redis/base.rb', line 51

# Difference between `total` and the current #size.
#
# @return [Object] result of `total - size`
def progress
  overall = total
  remaining = size
  overall - remaining
end

#queue_initialized? ⇒ Boolean

Returns:

  • (Boolean)


71
72
73
74
75
76
# File 'lib/ci/queue/redis/base.rb', line 71

# Tells whether the master status is 'ready' or 'finished'.
#
# A true answer is memoized in @queue_initialized. A false answer is not,
# so master_status is read again on the next call.
#
# @return [Boolean]
def queue_initialized?
  @queue_initialized ||= %w[ready finished].include?(master_status)
end

#queue_initializing? ⇒ Boolean

Returns:

  • (Boolean)


78
79
80
# File 'lib/ci/queue/redis/base.rb', line 78

# Tells whether the master status is currently 'setup'.
#
# @return [Boolean]
def queue_initializing?
  status = master_status
  status == 'setup'
end

#size ⇒ Object



37
38
39
40
41
42
# File 'lib/ci/queue/redis/base.rb', line 37

# Sum of the length of the 'queue' list and the cardinality of the 'running'
# sorted set. Both are read inside a single MULTI block.
#
# @return [Object] the two counts folded together with `+`
def size
  counts = redis.multi do |pipeline|
    pipeline.llen(key('queue'))
    pipeline.zcard(key('running'))
  end
  counts.reduce(:+)
end

#test_failed ⇒ Object



86
87
88
# File 'lib/ci/queue/redis/base.rb', line 86

# Reads the value stored under the 'test_failed_count' key.
#
# @return [Integer] the stored value converted with #to_i (0 when nothing is stored)
def test_failed
  raw_count = redis.get(key('test_failed_count'))
  raw_count.to_i
end

#to_a ⇒ Object



44
45
46
47
48
49
# File 'lib/ci/queue/redis/base.rb', line 44

# Lists the entries currently in the 'queue' list and the 'running' sorted
# set, resolved through `index`.
#
# Both collections are read in one MULTI block. Their members are flattened
# into a single list, reversed, and each one is looked up with `index.fetch`.
#
# @return [Array] the values `index` holds for those members
def to_a
  queued_and_running = redis.multi do |pipeline|
    pipeline.lrange(key('queue'), 0, -1)
    pipeline.zrange(key('running'), 0, -1)
  end
  ids = queued_and_running.flatten.reverse
  ids.map { |id| index.fetch(id) }
end

#wait_for_master(timeout: 30) ⇒ Object

Raises:

  • (LostMaster)


55
56
57
58
59
60
61
62
63
64
65
# File 'lib/ci/queue/redis/base.rb', line 55

# Blocks until the queue is reported as initialized.
#
# Returns immediately when `master?` is true. Otherwise it checks
# #queue_initialized? up to `timeout * 10 + 1` times, sleeping 0.1 seconds
# after each unsuccessful check.
#
# @param timeout [Numeric] approximate number of seconds to keep polling
# @return [true] once this is the master or the queue is initialized
# @raise [LostMaster] when the queue is still not initialized after all attempts
def wait_for_master(timeout: 30)
  return true if master?

  attempts = (timeout * 10 + 1).to_i
  attempts.times do
    return true if queue_initialized?

    sleep 0.1
  end

  raise LostMaster, "The master worker is still `#{master_status}` after #{timeout} seconds waiting."
end

#workers_count ⇒ Object



67
68
69
# File 'lib/ci/queue/redis/base.rb', line 67

# Cardinality of the set stored under the 'workers' key.
#
# @return [Object] whatever the Redis client returns for SCARD
def workers_count
  workers_key = key('workers')
  redis.scard(workers_key)
end