Class: CI::Queue::Redis::Worker

Inherits:
Base
  • Object
show all
Defined in:
lib/ci/queue/redis/worker.rb

Direct Known Subclasses

Grind, TestTimeRecord

Constant Summary

Constants inherited from Base

Base::CONNECTION_ERRORS, Base::TEN_MINUTES

Constants included from Common

Common::CONNECTION_ERRORS

Instance Attribute Summary collapse

Attributes included from Common

#config

Instance Method Summary collapse

Methods inherited from Base

#boot_heartbeat_process!, #created_at=, #custom_config, #custom_middlewares, #ensure_heartbeat_thread_alive!, #exhausted?, #expired?, #increment_test_failed, #max_test_failed?, #progress, #queue_initialized?, #queue_initializing?, #reconnect_attempts, #remaining, #running, #size, #stop_heartbeat!, #test_failed, #to_a, #wait_for_master, #with_heartbeat, #workers_count

Methods included from Common

#flaky?, #report_failure!, #report_success!, #rescue_connection_errors

Constructor Details

#initialize(redis, config) ⇒ Worker

Returns a new instance of Worker.



18
19
20
21
22
# File 'lib/ci/queue/redis/worker.rb', line 18

# Creates a worker for the given Redis connection and configuration.
#
# Worker-local state starts out cleared (no reserved test, no shutdown
# requested) before the arguments are handed to the superclass constructor.
#
# @param redis [Object] forwarded unchanged to the superclass
# @param config [Object] forwarded unchanged to the superclass
def initialize(redis, config)
  @shutdown_required = false
  @reserved_test = nil
  super
end

Instance Attribute Details

#total ⇒ Object (readonly)

Returns the value of attribute total.



16
17
18
# File 'lib/ci/queue/redis/worker.rb', line 16

# Reader for the +@total+ instance variable.
#
# NOTE(review): +@total+ is not assigned anywhere in this section, so its
# concrete type cannot be confirmed here; #requeue passes it to
# +config.global_max_requeues+.
#
# @return [Object] the current value of +@total+
def total
  @total
end

Instance Method Details

#acknowledge(test) ⇒ Object



102
103
104
105
106
107
108
109
110
# File 'lib/ci/queue/redis/worker.rb', line 102

# Acknowledges a test by running the +:acknowledge+ script.
#
# The test id is first passed to +raise_on_mismatching_test+. The script is
# then evaluated against the +running+, +processed+ and +owners+ keys with
# the id as its only argument.
#
# @param test [#id] the test being acknowledged
# @return [Boolean] +true+ when the script returns +1+, +false+ otherwise
def acknowledge(test)
  reserved_id = test.id
  raise_on_mismatching_test(reserved_id)

  script_keys = %w[running processed owners].map { |name| key(name) }
  outcome = eval_script(:acknowledge, keys: script_keys, argv: [reserved_id])
  outcome == 1
end

#build ⇒ Object



94
95
96
# File 'lib/ci/queue/redis/worker.rb', line 94

# Returns the build record associated with this worker.
#
# The record is created on first use from this worker, its Redis connection
# and its config, then memoized in +@build+ so later calls return the same
# object.
#
# @return [CI::Queue::Redis::BuildRecord]
def build
  @build ||= CI::Queue::Redis::BuildRecord.new(self, redis, config)
end

#distributed? ⇒ Boolean

Returns:

  • (Boolean)


24
25
26
# File 'lib/ci/queue/redis/worker.rb', line 24

# Indicates whether this queue implementation is distributed.
#
# @return [Boolean] always +true+ for this class
def distributed?
  true
end

#master? ⇒ Boolean

Returns:

  • (Boolean)


47
48
49
# File 'lib/ci/queue/redis/worker.rb', line 47

# Reports the value of the +@master+ flag.
#
# NOTE(review): +@master+ is not assigned in #initialize or anywhere else in
# this section, so this returns +nil+ (falsy) until other code sets it.
# Confirm where the flag is written.
#
# @return [Boolean, nil] the current value of +@master+
def master?
  @master
end

#poll ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/ci/queue/redis/worker.rb', line 51

# Yields tests to the caller until polling has to stop.
#
# After +wait_for_master+ returns, the loop keeps reserving tests until a
# shutdown was requested, any configured circuit breaker is open, the queue
# is exhausted, or +max_test_failed?+ is true. Each reserved id is looked up
# in +index+ and yielded; when nothing could be reserved the worker sleeps
# for 50ms before trying again.
#
# Once the loop ends, the worker's own queue key and the +processed+ key get
# an expiry of +config.redis_ttl+ in a single pipeline.
#
# Any error listed in +CONNECTION_ERRORS+ raised along the way is swallowed,
# in which case the method returns +nil+.
#
# @yieldparam test [Object] the entry of +index+ for the reserved id
def poll
  wait_for_master

  until shutdown_required? ||
        config.circuit_breakers.any?(&:open?) ||
        exhausted? ||
        max_test_failed?
    reserved_id = reserve
    unless reserved_id
      # Nothing to reserve right now; back off briefly before retrying.
      sleep 0.05
      next
    end

    yield index.fetch(reserved_id)
  end

  redis.pipelined do |pipe|
    worker_queue = key('worker', worker_id, 'queue')
    pipe.expire(worker_queue, config.redis_ttl)
    pipe.expire(key('processed'), config.redis_ttl)
  end
rescue *CONNECTION_ERRORS
end

#populate(tests, random: Random.new) ⇒ Object



28
29
30
31
32
33
# File 'lib/ci/queue/redis/worker.rb', line 28

# Loads the given tests into this worker.
#
# Builds the id-to-test lookup stored in +@index+, then shuffles the tests
# with +Queue.shuffle+ and hands the shuffled ids to +push+.
#
# @param tests [Array<#id>] the tests to register
# @param random [Random] source of randomness passed to +Queue.shuffle+
# @return [self]
def populate(tests, random: Random.new)
  @index = tests.each_with_object({}) { |test, by_id| by_id[test.id] = test }
  shuffled_ids = Queue.shuffle(tests, random).map(&:id)
  push(shuffled_ids)
  self
end

#populated? ⇒ Boolean

Returns:

  • (Boolean)


35
36
37
# File 'lib/ci/queue/redis/worker.rb', line 35

# Tells whether #populate has already run on this worker.
#
# The check is whether +@index+ has been assigned at all, so an index that
# was assigned +nil+ or an empty hash still counts as populated.
#
# @return [Boolean]
def populated?
  instance_variable_defined?(:@index)
end

#release! ⇒ Object



134
135
136
137
138
139
140
141
# File 'lib/ci/queue/redis/worker.rb', line 134

# Runs the +:release+ script for this worker.
#
# The script receives the +running+ key, this worker's queue key and the
# +owners+ key, with no additional arguments. Its result is discarded.
#
# @return [nil]
def release!
  script_keys = [key('running'), key('worker', worker_id, 'queue'), key('owners')]
  eval_script(:release, keys: script_keys, argv: [])
  nil
end

#report_worker_error(error) ⇒ Object



98
99
100
# File 'lib/ci/queue/redis/worker.rb', line 98

# Forwards a worker-level error to the build record.
#
# @param error [Object] passed unchanged to +build.report_worker_error+
# @return [Object] whatever +build.report_worker_error+ returns
def report_worker_error(error)
  build.report_worker_error(error)
end

#requeue(test, offset: Redis.requeue_offset) ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/ci/queue/redis/worker.rb', line 112

# Attempts to put a test back into the queue.
#
# The test id is first passed to +raise_on_mismatching_test+. The +:requeue+
# script is only evaluated when both +config.max_requeues+ and the global
# limit computed from +total+ are greater than zero; it receives both limits,
# the test id and +offset+ as arguments.
#
# When the test was not requeued, its id is stored back in +@reserved_test+.
#
# @param test [#id] the test to requeue
# @param offset [Object] forwarded to the script as its last argument
# @return [Boolean] +true+ when the script ran and returned +1+
def requeue(test, offset: Redis.requeue_offset)
  test_id = test.id
  raise_on_mismatching_test(test_id)
  global_limit = config.global_max_requeues(total)

  requeued = false
  if config.max_requeues > 0 && global_limit > 0
    script_keys = [
      key('processed'),
      key('requeues-count'),
      key('queue'),
      key('running'),
      key('worker', worker_id, 'queue'),
      key('owners'),
    ]
    script_args = [config.max_requeues, global_limit, test_id, offset]
    requeued = eval_script(:requeue, keys: script_keys, argv: script_args) == 1
  end

  @reserved_test = test_id unless requeued
  requeued
end

#retry_queue ⇒ Object



81
82
83
84
85
86
87
88
# File 'lib/ci/queue/redis/worker.rb', line 81

# Builds a +Retry+ queue from this worker's own log.
#
# The worker's queue list is read in full from Redis, narrowed down to the
# ids that +build.failed_tests+ reports, de-duplicated (first occurrence
# kept) and finally reversed.
#
# @return [Retry] a retry queue created with the filtered ids, +config+ and
#   this worker's Redis connection
def retry_queue
  failed_ids = build.failed_tests.to_set
  worker_log = redis.lrange(key('worker', worker_id, 'queue'), 0, -1)
  retry_order = worker_log.select { |id| failed_ids.include?(id) }.uniq.reverse
  Retry.new(retry_order, config, redis: redis)
end

#retrying? ⇒ Boolean

Returns:

  • (Boolean)


68
69
70
71
72
# File 'lib/ci/queue/redis/worker.rb', line 68

# Tells whether this worker's queue key already exists in Redis.
#
# Errors listed in +CONNECTION_ERRORS+ are treated as "not retrying".
#
# @return [Boolean] the result of the existence check, or +false+ when a
#   connection error occurred
def retrying?
  worker_queue = key('worker', worker_id, 'queue')
  redis.exists?(worker_queue)
rescue *CONNECTION_ERRORS
  false
end

#shutdown! ⇒ Object



39
40
41
# File 'lib/ci/queue/redis/worker.rb', line 39

# Requests that this worker stop.
#
# Only sets the +@shutdown_required+ flag; #poll checks it through
# #shutdown_required? before each reservation attempt.
#
# @return [true]
def shutdown!
  @shutdown_required = true
end

#shutdown_required? ⇒ Boolean

Returns:

  • (Boolean)


43
44
45
# File 'lib/ci/queue/redis/worker.rb', line 43

# Tells whether a shutdown has been requested.
#
# The flag starts as +false+ in #initialize and is set to +true+ by
# #shutdown!.
#
# @return [Boolean]
def shutdown_required?
  @shutdown_required
end

#supervisor ⇒ Object



90
91
92
# File 'lib/ci/queue/redis/worker.rb', line 90

# Creates a supervisor for the same queue.
#
# A new +Supervisor+ is built from +redis_url+ and +config+ on every call;
# nothing is memoized here.
#
# @return [Supervisor]
def supervisor
  Supervisor.new(redis_url, config)
end