Class: Minitest::Distributed::Coordinators::RedisCoordinator
- Inherits:
-
Object
- Object
- Minitest::Distributed::Coordinators::RedisCoordinator
- Extended by:
- T::Sig
- Includes:
- CoordinatorInterface
- Defined in:
- lib/minitest/distributed/coordinators/redis_coordinator.rb
Overview
The RedisCoordinator is an implementation of the test coordinator interface using a Redis stream + consumergroup for coordination.
We assume a bunch of workers will be started at the same time. Every worker will try to become the leader by trying to create the consumergroup. Only one will succeed, which will then continue to populate the list of tests to run to the stream.
AFter that, all workers will start consuming from the stream. They will first try to claim stale entries from other workers (determined by the ‘test_timeout_seconds` option), and process them up to a maximum of `max_attempts` attempts. Then, they will consume tests from the stream, run them, and ack them. This is done in batches to reduce load on Redis.
Retrying failed tests (up to ‘max_attempts` times) uses the same mechanism. When a test fails, and we haven’t exhausted the maximum number of attempts, we do not ACK the result with Redis. The means that another worker will eventually claim the test, and run it again. However, in this case we don’t want to slow things down unnecessarily. When a test fails and we want to retry it, we add the test to the ‘retry_set` in Redis. When other worker sees that a test is in this set, it can immediately claim the test, rather than waiting for the timeout.
Finally, when we have acked the same number of tests as we populated into the queue, the run is considered finished. The first worker to detect this will remove the consumergroup and the associated stream from Redis.
If a worker starts for the same run_id while it is already considered completed, it will start a “retry run”. It will find all the tests that failed/errored on the previous attempt, and schedule only those tests to be run, rather than the full test suite returned by the test selector. This can be useful to retry flaky tests. Subsequent workers coming online will join this worker to form a consumer group exactly as described above.
Instance Attribute Summary collapse
-
#configuration ⇒ Object
readonly
Returns the value of attribute configuration.
-
#group_name ⇒ Object
readonly
Returns the value of attribute group_name.
-
#local_results ⇒ Object
readonly
Returns the value of attribute local_results.
-
#reclaimed_failed_tests ⇒ Object
readonly
Returns the value of attribute reclaimed_failed_tests.
-
#reclaimed_timeout_tests ⇒ Object
readonly
Returns the value of attribute reclaimed_timeout_tests.
-
#stream_key ⇒ Object
readonly
Returns the value of attribute stream_key.
Instance Method Summary collapse
- #aborted? ⇒ Boolean
- #combined_results ⇒ Object
- #consume(reporter:) ⇒ Object
-
#initialize(configuration:) ⇒ RedisCoordinator
constructor
A new instance of RedisCoordinator.
- #produce(test_selector:) ⇒ Object
- #register_reporters(reporter:, options:) ⇒ Object
Constructor Details
#initialize(configuration:) ⇒ RedisCoordinator
Returns a new instance of RedisCoordinator.
65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/minitest/distributed/coordinators/redis_coordinator.rb', line 65 def initialize(configuration:) @configuration = configuration @redis = T.let(nil, T.nilable(Redis)) @stream_key = T.let(key("queue"), String) @group_name = T.let("minitest-distributed", String) @local_results = T.let(ResultAggregate.new, ResultAggregate) @combined_results = T.let(nil, T.nilable(ResultAggregate)) @reclaimed_timeout_tests = T.let(Set.new, T::Set[EnqueuedRunnable]) @reclaimed_failed_tests = T.let(Set.new, T::Set[EnqueuedRunnable]) @aborted = T.let(false, T::Boolean) end |
Instance Attribute Details
#configuration ⇒ Object (readonly)
Returns the value of attribute configuration.
47 48 49 |
# File 'lib/minitest/distributed/coordinators/redis_coordinator.rb', line 47 def configuration @configuration end |
#group_name ⇒ Object (readonly)
Returns the value of attribute group_name.
53 54 55 |
# File 'lib/minitest/distributed/coordinators/redis_coordinator.rb', line 53 def group_name @group_name end |
#local_results ⇒ Object (readonly)
Returns the value of attribute local_results.
56 57 58 |
# File 'lib/minitest/distributed/coordinators/redis_coordinator.rb', line 56 def local_results @local_results end |
#reclaimed_failed_tests ⇒ Object (readonly)
Returns the value of attribute reclaimed_failed_tests.
62 63 64 |
# File 'lib/minitest/distributed/coordinators/redis_coordinator.rb', line 62 def reclaimed_failed_tests @reclaimed_failed_tests end |
#reclaimed_timeout_tests ⇒ Object (readonly)
Returns the value of attribute reclaimed_timeout_tests.
59 60 61 |
# File 'lib/minitest/distributed/coordinators/redis_coordinator.rb', line 59 def reclaimed_timeout_tests @reclaimed_timeout_tests end |
#stream_key ⇒ Object (readonly)
Returns the value of attribute stream_key.
50 51 52 |
# File 'lib/minitest/distributed/coordinators/redis_coordinator.rb', line 50 def stream_key @stream_key end |
Instance Method Details
#aborted? ⇒ Boolean
121 122 123 |
# File 'lib/minitest/distributed/coordinators/redis_coordinator.rb', line 121 def aborted? @aborted end |
#combined_results ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/minitest/distributed/coordinators/redis_coordinator.rb', line 84 def combined_results @combined_results ||= begin stats_as_string = redis.mget( key("runs"), key("assertions"), key("passes"), key("failures"), key("errors"), key("skips"), key("requeues"), key("discards"), key("acks"), key("size"), ) ResultAggregate.new( max_failures: configuration.max_failures, runs: Integer(stats_as_string.fetch(0) || 0), assertions: Integer(stats_as_string.fetch(1) || 0), passes: Integer(stats_as_string.fetch(2) || 0), failures: Integer(stats_as_string.fetch(3) || 0), errors: Integer(stats_as_string.fetch(4) || 0), skips: Integer(stats_as_string.fetch(5) || 0), requeues: Integer(stats_as_string.fetch(6) || 0), discards: Integer(stats_as_string.fetch(7) || 0), acks: Integer(stats_as_string.fetch(8) || 0), # In the case where we have no build size number published yet, we initialize # thesize of the test suite to be arbitrarity large, to make sure it is # higher than the number of acks, so the run is not consider completed yet. size: Integer(stats_as_string.fetch(9) || 2_147_483_647), ) end end |
#consume(reporter:) ⇒ Object
221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 |
# File 'lib/minitest/distributed/coordinators/redis_coordinator.rb', line 221 def consume(reporter:) exponential_backoff = INITIAL_BACKOFF loop do # First, see if there are any pending tests from other workers to claim. stale_runnables = claim_stale_runnables process_batch(stale_runnables, reporter) # Then, try to process a regular batch of messages fresh_runnables = claim_fresh_runnables(block: exponential_backoff) process_batch(fresh_runnables, reporter) # If we have acked the same amount of tests as we were supposed to, the run # is complete and we can exit our loop. Generally, only one worker will detect # this condition. The pther workers will quit their consumer loop because the # consumergroup will be deleted by the first worker, and their Redis commands # will start to fail - see the rescue block below. break if combined_results.complete? # We also abort a run if we reach the maximum number of failures break if combined_results.abort? # To make sure we don't end up in a busy loop overwhelming Redis with commands # when there is no work to do, we increase the blocking time exponentially, # and reset it to the initial value if we processed any tests. if stale_runnables.empty? && fresh_runnables.empty? exponential_backoff <<= 1 else exponential_backoff = INITIAL_BACKOFF end end cleanup rescue Redis::CommandError => ce if ce..start_with?("NOGROUP") # When a redis conumer group commands fails with a NOGROUP error, we assume the # consumer group was deleted by the first worker that detected the run is complete. # So this worker can exit its loop as well. # We have to invalidate the local combined_results cache so we get fresh # final values from Redis when we try to report results in our summarizer. @combined_results = nil else raise end end |
#produce(test_selector:) ⇒ Object
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/minitest/distributed/coordinators/redis_coordinator.rb', line 126 def produce(test_selector:) # Whoever ends up creating the consumer group will act as leader, # and publish the list of tests to the stream. consumer_group_exists = false initial_attempt = begin # When using `redis.multi`, the second DEL command gets executed even if the initial GROUP # fails. This is bad, because only the leader should be issuing the DEL command. # When using EVAL and a Lua script, the script aborts after the first XGROUP command # fails, and the DEL never gets executed for followers. keys_deleted = redis.evalsha( register_consumergroup_script, keys: [stream_key, key("size"), key("acks")], argv: [group_name], ) keys_deleted == 0 rescue Redis::CommandError => ce if ce..include?("BUSYGROUP") # If Redis returns a BUSYGROUP error, it means that the consumer group already # exists. In our case, it means that another worker managed to successfully # run the XGROUP command, and will act as leader and publish the tests. # This worker can simply move on the consumer mode. consumer_group_exists = true else raise end end return if consumer_group_exists tests = T.let([], T::Array[Minitest::Runnable]) tests = if initial_attempt # If this is the first attempt for this run ID, we will schedule the full # test suite as returned by the test selector to run. tests_from_selector = test_selector.tests adjust_combined_results(ResultAggregate.new(size: tests_from_selector.size)) tests_from_selector elsif configuration.retry_failures # Before starting a retry attempt, we first check if the previous attempt # was aborted before it was completed. If this is the case, we cannot use # retry mode, and should immediately fail the attempt. if combined_results.abort? # We mark this run as aborted, which causes this worker to not be successful. @aborted = true # We still publish an empty size run to Redis, so if there are any followers, # they will wind down normally. Only the leader will exit # with a non-zero exit status and fail the build; any follower will # exit with status 0. adjust_combined_results(ResultAggregate.new(size: 0)) T.let([], T::Array[Minitest::Runnable]) else previous_failures, previous_errors, _deleted = redis.multi do |pipeline| pipeline.lrange(list_key(ResultType::Failed.serialize), 0, -1) pipeline.lrange(list_key(ResultType::Error.serialize), 0, -1) pipeline.del(list_key(ResultType::Failed.serialize), list_key(ResultType::Error.serialize)) end # We set the `size` key to the number of tests we are planning to schedule. # We also adjust the number of failures and errors back to 0. # We set the number of requeues to the number of tests that failed, so the # run statistics will reflect that we retried some failed test. # # However, normally requeues are not acked, as we expect the test to be acked # by another worker later. This makes the test loop think iot is already done. # To prevent this, we initialize the number of acks negatively, so it evens out # in the statistics. total_failures = previous_failures.length + previous_errors.length adjust_combined_results(ResultAggregate.new( size: total_failures, failures: -previous_failures.length, errors: -previous_errors.length, requeues: total_failures, )) # For subsequent attempts, we check the list of previous failures and # errors, and only schedule to re-run those tests. This allows for faster # retries of potentially flaky tests. test_identifiers_to_retry = T.let(previous_failures + previous_errors, T::Array[String]) test_identifiers_to_retry.map { |identifier| DefinedRunnable.from_identifier(identifier) } end else adjust_combined_results(ResultAggregate.new(size: 0)) T.let([], T::Array[Minitest::Runnable]) end redis.pipelined do |pipeline| tests.each do |test| pipeline.xadd(stream_key, { class_name: T.must(test.class.name), method_name: test.name }) end end end |
#register_reporters(reporter:, options:) ⇒ Object
79 80 81 |
# File 'lib/minitest/distributed/coordinators/redis_coordinator.rb', line 79 def register_reporters(reporter:, options:) reporter << Reporters::RedisCoordinatorWarningsReporter.new([:io], ) end |