Class: InfluxDB::AsyncQueue::Adapters::Redis

Inherits:
Object
  • Object
show all
Defined in:
lib/influxdb/async_queue/adapters/redis.rb

Constant Summary collapse

DEFAULT_QUEUE_NAME =
'influxdb.async_queue'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redis, queue_name = nil) ⇒ Redis

Returns a new instance of Redis.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/influxdb/async_queue/adapters/redis.rb', line 9

# Builds the adapter around a Redis connection.
#
# @param redis [::Redis, String, Hash] an existing client (used as-is), a
#   connection URL (handed to +::Redis.new+ as +url:+), or an options Hash
#   (handed to +::Redis.new+ unchanged)
# @param queue_name [String, nil] name of the Redis key used as the queue;
#   DEFAULT_QUEUE_NAME is used when this is nil
# @raise [ArgumentError] if +redis+ is none of the supported types
def initialize(redis, queue_name = nil)
  @redis =
    case redis
    when ::Redis then redis
    when String  then ::Redis.new(url: redis)
    when Hash    then ::Redis.new(redis)
    else
      # Name the accepted types and what was actually received so a
      # misconfigured caller can see the problem from the error alone.
      raise ArgumentError,
            "redis must be a ::Redis, a URL String or an options Hash, got #{redis.class}"
    end

  @queue_name = queue_name || DEFAULT_QUEUE_NAME
end

Instance Attribute Details

#queue_name ⇒ Object (readonly)

Returns the value of attribute queue_name.



23
24
25
# File 'lib/influxdb/async_queue/adapters/redis.rb', line 23

# Reader for the queue name stored in +@queue_name+.
#
# @return [Object] the value of +@queue_name+
def queue_name
  @queue_name
end

#redis ⇒ Object (readonly)

Returns the value of attribute redis.



23
24
25
# File 'lib/influxdb/async_queue/adapters/redis.rb', line 23

# Reader for the Redis client stored in +@redis+.
#
# @return [Object] the value of +@redis+
def redis
  @redis
end

Instance Method Details

#pop(batch_size) ⇒ Object



25
26
27
28
29
30
# File 'lib/influxdb/async_queue/adapters/redis.rb', line 25

# Removes up to +batch_size+ points from the head of the queue and returns
# them. The read (LRANGE) and the removal (LTRIM) are issued inside a single
# +redis.multi+ block.
#
# @param batch_size [Integer] maximum number of points to take
# @return [Object] the reply to the LRANGE command (the points read), or an
#   empty Array when +batch_size+ is zero
def pop(batch_size)
  # With batch_size == 0 the range below becomes 0..-1, which Redis reads as
  # "the whole list", while LTRIM 0 -1 removes nothing: every point would be
  # returned yet stay queued. Popping zero points is a no-op instead.
  return [] if batch_size.zero?

  replies = redis.multi do |tx|
    tx.lrange(queue_name, 0, batch_size - 1) # the first batch_size points
    tx.ltrim(queue_name, batch_size, -1)     # keep only what follows them
  end
  replies.first
end

#push(*points) ⇒ Object



32
33
34
# File 'lib/influxdb/async_queue/adapters/redis.rb', line 32

# Appends the given points to the tail of the queue with RPUSH.
#
# @param points [Array<Object>] points to enqueue, in the order given
# @return [Object] the reply of +redis.rpush+; when no points are given,
#   the reply of +redis.llen+ for the queue instead
def push(*points)
  # RPUSH requires at least one value, so an empty call would be rejected by
  # the server. Treat it as a no-op and report the current queue length.
  return redis.llen(queue_name) if points.empty?

  redis.rpush(queue_name, points)
end