Class: CI::Queue::Redis::Worker
- Inherits: Base
- Inheritance chain: Object → Base → CI::Queue::Redis::Worker
- Defined in:
- lib/ci/queue/redis/worker.rb
Constant Summary
Constants inherited
from Base
Base::CONNECTION_ERRORS, Base::TEN_MINUTES
Constants included
from Common
Common::CONNECTION_ERRORS
Instance Attribute Summary collapse
Attributes included from Common
#config
Instance Method Summary
collapse
Methods inherited from Base
#boot_heartbeat_process!, #created_at=, #custom_config, #custom_middlewares, #ensure_heartbeat_thread_alive!, #exhausted?, #expired?, #increment_test_failed, #max_test_failed?, #progress, #queue_initialized?, #queue_initializing?, #reconnect_attempts, #remaining, #running, #size, #stop_heartbeat!, #test_failed, #to_a, #wait_for_master, #with_heartbeat, #workers_count
Methods included from Common
#flaky?, #report_failure!, #report_success!, #rescue_connection_errors
Constructor Details
#initialize(redis, config) ⇒ Worker
Returns a new instance of Worker.
18
19
20
21
22
|
# File 'lib/ci/queue/redis/worker.rb', line 18
# Creates a worker with no reserved test and no pending shutdown request,
# then delegates to the superclass initializer.
#
# @param redis forwarded unchanged to the superclass initializer
# @param config forwarded unchanged to the superclass initializer
def initialize(redis, config)
  @shutdown_required = false
  @reserved_test = nil
  # Bare `super` forwards `redis` and `config` exactly as received.
  super
end
|
Instance Attribute Details
#total ⇒ Object
Returns the value of attribute total.
16
17
18
|
# File 'lib/ci/queue/redis/worker.rb', line 16
# Reader for the +@total+ instance variable.
#
# @return [Object, nil] the current value of +@total+ (nil when never assigned)
def total
  @total
end
|
Instance Method Details
#acknowledge(test) ⇒ Object
102
103
104
105
106
107
108
109
110
|
# File 'lib/ci/queue/redis/worker.rb', line 102
# Runs the +:acknowledge+ script for the given test.
#
# The test id is first passed to +raise_on_mismatching_test+ (defined
# elsewhere; presumably raises when the id is not the reserved one - confirm).
#
# @param test an object responding to +#id+
# @return [Boolean] true when the script result equals 1, false otherwise
def acknowledge(test)
  id = test.id
  raise_on_mismatching_test(id)

  script_keys = [key('running'), key('processed'), key('owners')]
  outcome = eval_script(:acknowledge, keys: script_keys, argv: [id])
  outcome == 1
end
|
#build ⇒ Object
94
95
96
|
# File 'lib/ci/queue/redis/worker.rb', line 94
# Returns the memoized build record for this worker, creating it on first
# use from this worker, its redis connection and its config.
#
# @return [CI::Queue::Redis::BuildRecord]
def build
  return @build if @build

  @build = CI::Queue::Redis::BuildRecord.new(self, redis, config)
end
|
#distributed? ⇒ Boolean
24
25
26
|
# File 'lib/ci/queue/redis/worker.rb', line 24
# Reports whether this queue implementation is distributed.
#
# @return [Boolean] always true for this worker
def distributed?
  true
end
|
#master? ⇒ Boolean
47
48
49
|
# File 'lib/ci/queue/redis/worker.rb', line 47
# Exposes the +@master+ flag.
#
# @return [Object, nil] the current value of +@master+; nil when it has
#   never been assigned (it is set outside the code visible here)
def master?
  @master
end
|
#poll ⇒ Object
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
# File 'lib/ci/queue/redis/worker.rb', line 51
# Reserves tests and yields them to the caller until polling should stop.
#
# After +wait_for_master+ returns, the loop keeps going until a shutdown is
# requested, any configured circuit breaker is open, the queue is exhausted,
# or the failure limit is reached. When nothing can be reserved it sleeps
# 50ms before trying again. Once the loop ends, the worker queue key and the
# processed key are given a TTL of +config.redis_ttl+ in a single pipeline.
#
# Errors listed in +CONNECTION_ERRORS+ raised anywhere in the method
# (including from the yielded block) are swallowed and nil is returned.
#
# @yield [test] the entry of +index+ matching the reserved id
# @return the result of +redis.pipelined+, or nil after a connection error
def poll
  wait_for_master

  until shutdown_required? ||
        config.circuit_breakers.any?(&:open?) ||
        exhausted? ||
        max_test_failed?
    reserved_id = reserve
    unless reserved_id
      # Nothing available right now; back off briefly before retrying.
      sleep 0.05
      next
    end

    yield index.fetch(reserved_id)
  end

  redis.pipelined do |pipe|
    pipe.expire(key('worker', worker_id, 'queue'), config.redis_ttl)
    pipe.expire(key('processed'), config.redis_ttl)
  end
rescue *CONNECTION_ERRORS
  nil
end
|
#populate(tests, random: Random.new) ⇒ Object
28
29
30
31
32
33
|
# File 'lib/ci/queue/redis/worker.rb', line 28
# Indexes the given tests by id and pushes their ids in shuffled order.
#
# @param tests a collection of objects responding to +#id+
# @param random [Random] randomness source handed to +Queue.shuffle+
# @return [self]
def populate(tests, random: Random.new)
  # Lookup table from test id to test object (later entries win on duplicate ids).
  @index = tests.each_with_object({}) { |test, by_id| by_id[test.id] = test }

  shuffled = Queue.shuffle(tests, random)
  push(shuffled.map(&:id))
  self
end
|
#populated? ⇒ Boolean
35
36
37
|
# File 'lib/ci/queue/redis/worker.rb', line 35
# Tells whether the +@index+ instance variable has been assigned at all.
# Any assignment counts, even to nil.
#
# @return [Boolean]
def populated?
  instance_variable_defined?(:@index)
end
|
#release! ⇒ Object
134
135
136
137
138
139
140
141
|
# File 'lib/ci/queue/redis/worker.rb', line 134
# Runs the +:release+ script against the running set, this worker's queue
# and the owners key, with no extra arguments.
#
# @return [nil] always; the script result is discarded
def release!
  script_keys = [key('running'), key('worker', worker_id, 'queue'), key('owners')]
  eval_script(:release, keys: script_keys, argv: [])
  nil
end
|
#report_worker_error(error) ⇒ Object
98
99
100
|
# File 'lib/ci/queue/redis/worker.rb', line 98
# Forwards a worker error to the build record.
#
# @param error the error to report
# @return whatever +build.report_worker_error+ returns
def report_worker_error(error)
  build.report_worker_error(error)
end
|
#requeue(test, offset: Redis.requeue_offset) ⇒ Object
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
# File 'lib/ci/queue/redis/worker.rb', line 112
# Attempts to put a test back on the queue via the +:requeue+ script.
#
# The script is only run when both the per-test limit (+config.max_requeues+)
# and the global limit (+config.global_max_requeues(total)+) are positive.
# When the test is not requeued, its id is stored back in +@reserved_test+.
#
# @param test an object responding to +#id+
# @param offset passed to the script as its last argument
# @return [Boolean] true when the script ran and returned 1, false otherwise
def requeue(test, offset: Redis.requeue_offset)
  test_key = test.id
  raise_on_mismatching_test(test_key)

  global_max_requeues = config.global_max_requeues(total)
  requeued =
    if config.max_requeues > 0 && global_max_requeues > 0
      script_keys = [
        key('processed'),
        key('requeues-count'),
        key('queue'),
        key('running'),
        key('worker', worker_id, 'queue'),
        key('owners'),
      ]
      script_args = [config.max_requeues, global_max_requeues, test_key, offset]
      eval_script(:requeue, keys: script_keys, argv: script_args) == 1
    else
      false
    end

  @reserved_test = test_key unless requeued
  requeued
end
|
#retry_queue ⇒ Object
81
82
83
84
85
86
87
88
|
# File 'lib/ci/queue/redis/worker.rb', line 81
# Builds a Retry queue from the ids in this worker's queue list that are
# among the build's failed tests.
#
# The ids are filtered against +build.failed_tests+, de-duplicated (first
# occurrence kept) and then reversed before being handed to +Retry.new+.
#
# @return [Retry]
def retry_queue
  failed_ids = build.failed_tests.to_set
  worker_log = redis.lrange(key('worker', worker_id, 'queue'), 0, -1)
  retry_ids = worker_log.select { |id| failed_ids.include?(id) }.uniq.reverse
  Retry.new(retry_ids, config, redis: redis)
end
|
#retrying? ⇒ Boolean
68
69
70
71
72
|
# File 'lib/ci/queue/redis/worker.rb', line 68
# Checks whether this worker's queue key exists in redis.
#
# @return the result of +redis.exists?+ for the worker queue key, or false
#   when one of +CONNECTION_ERRORS+ is raised
def retrying?
  worker_queue_key = key('worker', worker_id, 'queue')
  redis.exists?(worker_queue_key)
rescue *CONNECTION_ERRORS
  false
end
|
#shutdown! ⇒ Object
39
40
41
|
# File 'lib/ci/queue/redis/worker.rb', line 39
# Flags this worker as needing to shut down by setting +@shutdown_required+.
#
# @return [true]
def shutdown!
  @shutdown_required = true
end
|
#shutdown_required? ⇒ Boolean
43
44
45
|
# File 'lib/ci/queue/redis/worker.rb', line 43
# Exposes the +@shutdown_required+ flag.
#
# @return the current value of +@shutdown_required+
def shutdown_required?
  @shutdown_required
end
|
#supervisor ⇒ Object
90
91
92
|
# File 'lib/ci/queue/redis/worker.rb', line 90
# Builds a new Supervisor from +redis_url+ and this worker's config.
# A fresh instance is created on every call (no memoization).
#
# @return [Supervisor]
def supervisor
  Supervisor.new(redis_url, config)
end
|