Class: RedisClient::Cluster::ConcurrentWorker::Group
- Inherits: Object
- Inheritance chain:
  - Object
  - RedisClient::Cluster::ConcurrentWorker::Group
- Defined in:
- lib/redis_client/cluster/concurrent_worker.rb
Defined Under Namespace
Classes: Task
Instance Method Summary collapse
- #close ⇒ Object
- #each ⇒ Object
-
#initialize(worker:, queue:, size:) ⇒ Group
constructor
A new instance of Group.
- #inspect ⇒ Object
- #push(id, *args, **kwargs, &block) ⇒ Object
Constructor Details
#initialize(worker:, queue:, size:) ⇒ Group
Returns a new instance of Group.
33 34 35 36 37 38 |
# File 'lib/redis_client/cluster/concurrent_worker.rb', line 33
#
# Builds a group that has not accepted any task yet.
#
# @param worker [#push] object every task created by #push is handed to
# @param queue [#pop, #clear] queue passed to each Task, popped by #each and
#   cleared by #close
# @param size [Integer] exact number of tasks that must be pushed before #each
#   is allowed to run
def initialize(worker:, queue:, size:)
  @worker = worker
  @queue = queue
  @size = size
  @count = 0 # number of tasks pushed so far
end
Instance Method Details
#close ⇒ Object
60 61 62 63 64 65 |
# File 'lib/redis_client/cluster/concurrent_worker.rb', line 60
#
# Empties the queue, closes it when the queue object supports closing, and
# resets the pushed-task counter to zero.
#
# @return [nil]
def close
  @queue.clear
  # Not every queue implementation is required to expose #close.
  @queue.close if @queue.respond_to?(:close)
  @count = 0
  nil
end
#each ⇒ Object
49 50 51 52 53 54 55 56 57 58 |
# File 'lib/redis_client/cluster/concurrent_worker.rb', line 49
#
# Pops exactly as many tasks from the queue as the group was sized for and
# yields each task's id and result, in the order the queue returns them.
#
# @yieldparam id [Object] the popped task's id
# @yieldparam result [Object] the popped task's result
# @return [nil]
# @raise [InvalidNumberOfTasks] when the number of pushed tasks differs from
#   the configured size
def each
  raise InvalidNumberOfTasks, "expected: #{@size}, actual: #{@count}" if @count != @size

  @size.times do
    # NOTE(review): presumably blocks until the worker has enqueued a finished
    # task — confirm against the queue implementation in use.
    task = @queue.pop
    yield(task.id, task.result)
  end

  nil
end
#inspect ⇒ Object
67 68 69 |
# File 'lib/redis_client/cluster/concurrent_worker.rb', line 67
#
# Short description showing the number of tasks pushed so far ("size"), the
# configured capacity ("max") and the class name of the worker.
#
# @return [String]
def inspect
  "#<#{self.class.name} size: #{@count}, max: #{@size}, worker: #{@worker.class.name}>"
end
#push(id, *args, **kwargs, &block) ⇒ Object
40 41 42 43 44 45 46 47 |
# File 'lib/redis_client/cluster/concurrent_worker.rb', line 40
#
# Wraps the given arguments and block in a Task bound to the group's queue,
# hands that task to the worker and counts it.
#
# @param id [Object] identifier stored on the task and later yielded by #each
# @param args [Array] positional arguments stored on the task
# @param kwargs [Hash] keyword arguments stored on the task
# @param block [Proc, nil] block stored on the task
# @return [nil]
# @raise [InvalidNumberOfTasks] when the group already holds as many tasks as
#   its configured size
def push(id, *args, **kwargs, &block)
  raise InvalidNumberOfTasks, "max size reached: #{@count}" if @count == @size

  task = Task.new(id: id, queue: @queue, args: args, kwargs: kwargs, block: block)
  @worker.push(task)
  # Counted only after the worker accepted the task without raising.
  @count += 1
  nil
end