Class: RedisRepeater::TransferSchedulerJob

Inherits:
Object
  • Object
show all
Defined in:
lib/redis_repeater/transfer_scheduler_job.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(repeater, config) ⇒ TransferSchedulerJob

Returns a new instance of TransferSchedulerJob.



7
8
9
10
11
12
13
14
# File 'lib/redis_repeater/transfer_scheduler_job.rb', line 7

# Builds a transfer job from a configuration hash.
#
# @param repeater [#logger] object whose +logger+ is kept for debug output
# @param config [#[]] settings looked up by the symbol keys +:source+,
#   +:destinations+, +:queue+, +:timeout+ and +:maintain_count+; a key that
#   yields nil simply leaves the matching attribute nil
def initialize(repeater, config)
  # Keys are listed in the same order as the instance variables they fill.
  @source, @destinations, @queue, @timeout, @maintain_count =
    %i[source destinations queue timeout maintain_count].map { |key| config[key] }
  @logger = repeater.logger
end

Instance Attribute Details

#destinations ⇒ Object (readonly)

Returns the value of attribute destinations.



5
6
7
# File 'lib/redis_repeater/transfer_scheduler_job.rb', line 5

# Reader for the destination list taken from +config[:destinations]+
# by the constructor.
#
# @return [Object, nil] the stored value, or nil if none was configured
def destinations = @destinations

#maintain_count ⇒ Object (readonly)

Returns the value of attribute maintain_count.



5
6
7
# File 'lib/redis_repeater/transfer_scheduler_job.rb', line 5

# Reader for the flag taken from +config[:maintain_count]+ by the
# constructor; +perform+ only updates the running counter when it is truthy.
#
# @return [Object, nil] the stored value, or nil if none was configured
def maintain_count = @maintain_count

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



5
6
7
# File 'lib/redis_repeater/transfer_scheduler_job.rb', line 5

# Reader for the source queue name taken from +config[:queue]+ by the
# constructor.
#
# @return [Object, nil] the stored value, or nil if none was configured
def queue = @queue

#source ⇒ Object (readonly)

Returns the value of attribute source.



5
6
7
# File 'lib/redis_repeater/transfer_scheduler_job.rb', line 5

# Reader for the source connection taken from +config[:source]+ by the
# constructor; +perform+ pops items from it.
#
# @return [Object, nil] the stored value, or nil if none was configured
def source = @source

#timeout ⇒ Object (readonly)

Returns the value of attribute timeout.



5
6
7
# File 'lib/redis_repeater/transfer_scheduler_job.rb', line 5

# Reader for the value taken from +config[:timeout]+ by the constructor.
#
# @return [Object, nil] the stored value, or nil if none was configured
def timeout = @timeout

Instance Method Details

#perform ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/redis_repeater/transfer_scheduler_job.rb', line 16

# Moves every item currently available on the source queue to all
# configured destinations.
#
# Calls +lpop+ on @source for @queue until it returns nil/false. Each popped
# item is handed to +rpush+ on every destination's +:server+, using that
# destination's +:queue+. When @maintain_count is truthy, the number of items
# moved is added to the "redis-repeater:<queue>:count" key on the source,
# even when that number is zero.
#
# NOTE(review): an item is popped before it is pushed, so an error raised by
# +rpush+ appears to drop that item - confirm this is acceptable.
#
# @return [Object, nil] nil when nothing was moved, otherwise whatever
#   +@logger.debug+ returns
def perform
  moved = 0
  while (payload = @source.lpop(@queue))
    @destinations.each { |target| target[:server].rpush(target[:queue], payload) }
    moved += 1
  end

  # Running total of transported items, kept on the source side.
  @source.incrby("redis-repeater:#{@queue}:count", moved) if @maintain_count

  # Stay silent on empty polls.
  return if moved.zero?

  fanout = @destinations.count
  noun = fanout == 1 ? 'destination' : 'destinations'
  @logger.debug "Transported #{moved} items from #{@queue} to #{fanout} #{noun}"
end