Class: LogStash::Inputs::RedisCluster

Inherits:
Threadable
  • Object
show all
Defined in:
lib/logstash/inputs/redis_cluster.rb

Instance Method Summary collapse

Instance Method Details

#add_external_redis_builder(builder) ⇒ Object

public API use to store a proc that can provide a redis instance or mock



66
67
68
69
# File 'lib/logstash/inputs/redis_cluster.rb', line 66

def add_external_redis_builder(builder) #callable
  @redis_builder = builder
  self
end

#new_redis_instanceObject



77
78
79
# File 'lib/logstash/inputs/redis_cluster.rb', line 77

def new_redis_instance
  @redis_builder.call
end

#registerObject



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/logstash/inputs/redis_cluster.rb', line 81

def register
  @redis_url = "redis://#{@password}@#{@host}:#{@port}/#{@db}"

	@batch_offset = Random.rand(1024)
  # TODO remove after setting key and data_type to true
	
	
	if @driver == "jedis" then
require 'org/apache/commons/commons-pool2/2.3/commons-pool2-2.3.jar'
require 'redis/clients/jedis/2.7.2/jedis-2.7.2.jar'
@error_handler = method(:error_handler_jedis)
	else
require 'org/apache/commons/commons-pool2/2.3/redis-rb-cluster'
@error_handler = method(:error_handler_redis)
	end
 
  if !@keys || !@data_type
    raise RuntimeError.new(
      "Must define queue, or key and data_type parameters"
    )
  end
  # end TODO

  @redis_builder ||= method(:internal_redis_builder)

  # just switch on data_type once
  if @data_type == 'list' || @data_type == 'dummy'
    @run_method = method(:list_runner)
    @stop_method = method(:list_stop)
  elsif @data_type == 'channel'
    @run_method = method(:channel_runner)
    @stop_method = method(:subscribe_stop)
  elsif @data_type == 'pattern_channel'
    @run_method = method(:pattern_channel_runner)
    @stop_method = method(:subscribe_stop)
  end

  # TODO(sissel, boertje): set @identity directly when @name config option is removed.
  @identity = @name != 'default' ? @name : "#{@redis_url} #{@data_type}"
  @logger.info("Registering Redis", :identity => @identity)
end

#run(output_queue) ⇒ Object

def register



123
124
125
126
127
# File 'lib/logstash/inputs/redis_cluster.rb', line 123

def run(output_queue)
  @run_method.call(output_queue)
rescue LogStash::ShutdownSignal
  # ignore and quit
end

#stopObject

def run



129
130
131
# File 'lib/logstash/inputs/redis_cluster.rb', line 129

def stop
  @stop_method.call
end

#use_redis(instance) ⇒ Object

use to apply an instance directly and bypass the builder



72
73
74
75
# File 'lib/logstash/inputs/redis_cluster.rb', line 72

def use_redis(instance)
  @redis = instance
  self
end