Class: Fluent::Plugin::RedisListPollerInput

Inherits:
Input
  • Object
show all
Includes:
Fluent::PluginMixin::Redis
Defined in:
lib/fluent/plugin/in_redis_list_source.rb

Instance Method Summary collapse

Methods included from Fluent::PluginMixin::Redis

included, #shutdown_redis, #start_redis

Constructor Details

#initializeNilClass

Initialize new input plugin

Since:

  • 0.1.0



32
33
34
35
# File 'lib/fluent/plugin/in_redis_list_source.rb', line 32

def initialize
  super
  require 'msgpack'
end

Instance Method Details

#action_locking_monitorNilClass

Action to execute when polling for the lock key

Returns:

  • (NilClass)

Since:

  • 0.1.0



186
187
188
189
# File 'lib/fluent/plugin/in_redis_list_source.rb', line 186

def action_locking_monitor
  lock_value = @redis.get(@lock_key)
  @storage.put(@lock_key, lock_value)
end

#action_pollNilClass

Action to execute when the poller event watcher executes

Given that the watcher is pretty lightweight, we simply return if the worker has been set to sleep instead of actually sleeping. Doing otherwise seemed to cause locking.

Otherwise we iterate through messages, parse and emit them.

Returns:

  • (NilClass)

Since:

  • 0.1.0



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/fluent/plugin/in_redis_list_source.rb', line 201

def action_poll
  now = Engine.now
  messages = []

  if sleeping?
    log.trace "redis worker is sleeping"
    return
  end

  if locked?
    log.trace "redis queue is locked"
    return
  end

  poll_messages do |message|
    if message.nil?
      log.debug "redis queue is empty"
      sleep!(@sleep_interval)
      break
    end

    @parser.parse(message) do |time, record|
      if time && record
        router.emit @tag || @key, time || Engine.now, record
      else
        log.warn "failed to parse message: #{message}"
      end
    end
  end
rescue => e
  log.error "error fetching record", :error => e
  log.error_backtrace
  sleep!(@retry_interval)
end

#batched?TrueClass, FalseClass

Whether to fetch a single item or a multiple items in batch

Returns:

  • (TrueClass, FalseClass)

Since:

  • 0.1.0



133
134
135
# File 'lib/fluent/plugin/in_redis_list_source.rb', line 133

def batched?
  @batch_size and @batch_size > 1
end

#configure(config) ⇒ NilClass

Initialize attributes and parameters

Returns:

  • (NilClass)

Since:

  • 0.1.0



40
41
42
43
44
45
46
47
48
# File 'lib/fluent/plugin/in_redis_list_source.rb', line 40

def configure(config)
  super

  configure_params(config)
  configure_parser(config)
  configure_locking(config)

  @retry_at     = nil
end

#configure_locking(config) ⇒ NilClass

Configure locking

Returns:

  • (NilClass)

Since:

  • 0.1.0



77
78
79
80
# File 'lib/fluent/plugin/in_redis_list_source.rb', line 77

def configure_locking(config)
  @storage  = storage_create(type: 'local')
  @lock_key = "fluentd:#{@key}:lock"
end

#configure_params(config) ⇒ NilClass

Configure plugin parameters

Returns:

  • (NilClass)

Since:

  • 0.1.0



53
54
55
56
57
58
59
60
61
62
# File 'lib/fluent/plugin/in_redis_list_source.rb', line 53

def configure_params(config)
  %w(host port key command tag).each do |key|
    next if instance_variable_get("@#{key}")
    raise Fluent::ConfigError, "configuration key missing: #{key}"
  end

  unless %w(lpop rpop).include?(@command)
    raise Fluent::ConfigError, "command must be either lpop or rpop"
  end
end

#configure_parser(config) ⇒ NilClass

Configure record parser

Returns:

  • (NilClass)

Since:

  • 0.1.0



67
68
69
70
71
72
# File 'lib/fluent/plugin/in_redis_list_source.rb', line 67

def configure_parser(config)
  parser_config = @parse.corresponding_config_element
  parser_type = parser_config['@type']
  @parser = Fluent::Plugin.new_parser(parser_type, :parent => self)
  @parser.configure(parser_config)
end

#locked?TrueClass, FalseClass

Whether the poller has been locked

Returns:

  • (TrueClass, FalseClass)

Since:

  • 0.1.0



148
149
150
# File 'lib/fluent/plugin/in_redis_list_source.rb', line 148

def locked?
  @storage.get(@lock_key)
end

#poll_messagesNilClass

Poll messages from the redis server in either single message or batch mode.

Parameters:

  • the (&block)

    block to yield single messages to

Returns:

  • (NilClass)

Since:

  • 0.1.0



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/fluent/plugin/in_redis_list_source.rb', line 165

def poll_messages
  commands = []

  if batched?
    @redis.pipelined do
      @batch_size.times do
        commands << @redis.call(@command, @key)
      end
    end
  else
    commands << @redis.call(@command, @key)
  end

  commands.each do |command|
    yield command.is_a?(Redis::Future) ? command.value : command
  end
end

#shutdownNilClass

Tear down the plugin

Returns:

  • (NilClass)

Since:

  • 0.1.0



125
126
127
128
# File 'lib/fluent/plugin/in_redis_list_source.rb', line 125

def shutdown
  super
  shutdown_redis
end

#sleep!(delay = @sleep_interval) ⇒ Integer

Set a sleep delay, ensuring that we will not attempt to fetch messages

Parameters:

  • delay, (Integer)

    the amount of seconds to wait

Returns:

  • (Integer)

    timestamp when this expires

Since:

  • 0.1.0



156
157
158
# File 'lib/fluent/plugin/in_redis_list_source.rb', line 156

def sleep!(delay = @sleep_interval)
  @retry_at = Engine.now + delay
end

#sleeping?TrueClass, FalseClass

Wether the poller has been temporarily disabled or should fetch messages been temporarily disabled

Returns:

  • (TrueClass, FalseClass)

Since:

  • 0.1.0



141
142
143
# File 'lib/fluent/plugin/in_redis_list_source.rb', line 141

def sleeping?
  @retry_at and @retry_at >= Engine.now
end

#startNilClass

Prepare the plugin event loop

This method will initialize the Redis connection object, create any required Redis structures as well as define and begin the event pollers.

Returns:

  • (NilClass)

Since:

  • 0.1.0



89
90
91
92
93
94
# File 'lib/fluent/plugin/in_redis_list_source.rb', line 89

def start
  super
  start_redis
  start_poller
  start_monitor
end

#start_monitorNilClass

Prepare the Redis queue monitor

This timed event will routinely poll for a lock key and disable the queue poller if required

Returns:

  • (NilClass)

Since:

  • 0.1.1



116
117
118
119
120
# File 'lib/fluent/plugin/in_redis_list_source.rb', line 116

def start_monitor
  timer_execute(:monitor, 1) do
    action_locking_monitor
  end
end

#start_pollerNilClass

Prepare the Redis queue poller

This timed event will routinely poll items from the Redis list and emit those through the pipeline.

Returns:

  • (NilClass)

Since:

  • 0.1.0



103
104
105
106
107
# File 'lib/fluent/plugin/in_redis_list_source.rb', line 103

def start_poller
  timer_execute(:poller, @poll_interval) do
    action_poll
  end
end