Class: Fluent::Plugin::RedisListPollerInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::RedisListPollerInput
- Includes:
- Fluent::PluginMixin::Redis
- Defined in:
- lib/fluent/plugin/in_redis_list_poller.rb
Instance Method Summary collapse
-
#action_locking_monitor ⇒ NilClass
Action to execute when polling for the lock key.
-
#action_poll ⇒ NilClass
Action to execute when the poller event watcher executes.
-
#batched? ⇒ TrueClass, FalseClass
Whether to fetch a single item or a multiple items in batch.
-
#configure(config) ⇒ NilClass
Initialize attributes and parameters.
-
#configure_locking(config) ⇒ NilClass
Configure locking.
-
#configure_params(config) ⇒ NilClass
Configure plugin parameters.
-
#configure_parser(config) ⇒ NilClass
Configure record parser.
-
#initialize ⇒ NilClass
constructor
Initialize new input plugin.
-
#locked? ⇒ TrueClass, FalseClass
Whether the poller has been locked.
-
#poll_messages ⇒ NilClass
Poll messages from the redis server in either single message or batch mode.
-
#shutdown ⇒ NilClass
Tear down the plugin.
-
#sleep!(delay = @sleep_interval) ⇒ Integer
Set a sleep delay, ensuring that we will not attempt to fetch messages.
-
#sleeping? ⇒ TrueClass, FalseClass
Wether the poller has been temporarily disabled or should fetch messages been temporarily disabled.
-
#start ⇒ NilClass
Prepare the plugin event loop.
-
#start_monitor ⇒ NilClass
Prepare the Redis queue monitor.
-
#start_poller ⇒ NilClass
Prepare the Redis queue poller.
Methods included from Fluent::PluginMixin::Redis
included, #shutdown_redis, #start_redis
Constructor Details
#initialize ⇒ NilClass
Initialize new input plugin
32 33 34 35 |
# File 'lib/fluent/plugin/in_redis_list_poller.rb', line 32 def initialize super require 'msgpack' end |
Instance Method Details
#action_locking_monitor ⇒ NilClass
Action to execute when polling for the lock key
186 187 188 189 |
# File 'lib/fluent/plugin/in_redis_list_poller.rb', line 186 def action_locking_monitor lock_value = @redis.get(@lock_key) @storage.put(@lock_key, lock_value) end |
#action_poll ⇒ NilClass
Action to execute when the poller event watcher executes
Given that the watcher is pretty lightweight, we simply return if the worker has been set to sleep instead of actually sleeping. Doing otherwise seemed to cause locking.
Otherwise we iterate through messages, parse and emit them.
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/fluent/plugin/in_redis_list_poller.rb', line 201 def action_poll now = Engine.now = [] if sleeping? log.trace "redis worker is sleeping" return end if locked? log.trace "redis queue is locked" return end do || if .nil? log.debug "redis queue is empty" sleep!(@sleep_interval) break end @parser.parse() do |time, record| if time && record router.emit @tag || @key, time || Engine.now, record else log.warn "failed to parse message: #{}" end end end rescue => e log.error "error fetching record", :error => e log.error_backtrace sleep!(@retry_interval) end |
#batched? ⇒ TrueClass, FalseClass
Whether to fetch a single item or a multiple items in batch
133 134 135 |
# File 'lib/fluent/plugin/in_redis_list_poller.rb', line 133 def batched? @batch_size and @batch_size > 1 end |
#configure(config) ⇒ NilClass
Initialize attributes and parameters
40 41 42 43 44 45 46 47 48 |
# File 'lib/fluent/plugin/in_redis_list_poller.rb', line 40 def configure(config) super configure_params(config) configure_parser(config) configure_locking(config) @retry_at = nil end |
#configure_locking(config) ⇒ NilClass
Configure locking
77 78 79 80 |
# File 'lib/fluent/plugin/in_redis_list_poller.rb', line 77 def configure_locking(config) @storage = storage_create(type: 'local') @lock_key = "fluentd:#{@key}:lock" end |
#configure_params(config) ⇒ NilClass
Configure plugin parameters
53 54 55 56 57 58 59 60 61 62 |
# File 'lib/fluent/plugin/in_redis_list_poller.rb', line 53 def configure_params(config) %w(host port key command tag).each do |key| next if instance_variable_get("@#{key}") raise Fluent::ConfigError, "configuration key missing: #{key}" end unless %w(lpop rpop).include?(@command) raise Fluent::ConfigError, "command must be either lpop or rpop" end end |
#configure_parser(config) ⇒ NilClass
Configure record parser
67 68 69 70 71 72 |
# File 'lib/fluent/plugin/in_redis_list_poller.rb', line 67 def configure_parser(config) parser_config = @parse.corresponding_config_element parser_type = parser_config['@type'] @parser = Fluent::Plugin.new_parser(parser_type, :parent => self) @parser.configure(parser_config) end |
#locked? ⇒ TrueClass, FalseClass
Whether the poller has been locked
148 149 150 |
# File 'lib/fluent/plugin/in_redis_list_poller.rb', line 148 def locked? @storage.get(@lock_key) end |
#poll_messages ⇒ NilClass
Poll messages from the redis server in either single message or batch mode.
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/fluent/plugin/in_redis_list_poller.rb', line 165 def commands = [] if batched? @redis.pipelined do @batch_size.times do commands << @redis.call(@command, @key) end end else commands << @redis.call(@command, @key) end commands.each do |command| yield command.is_a?(Redis::Future) ? command.value : command end end |
#shutdown ⇒ NilClass
Tear down the plugin
125 126 127 128 |
# File 'lib/fluent/plugin/in_redis_list_poller.rb', line 125 def shutdown super shutdown_redis end |
#sleep!(delay = @sleep_interval) ⇒ Integer
Set a sleep delay, ensuring that we will not attempt to fetch messages
156 157 158 |
# File 'lib/fluent/plugin/in_redis_list_poller.rb', line 156 def sleep!(delay = @sleep_interval) @retry_at = Engine.now + delay end |
#sleeping? ⇒ TrueClass, FalseClass
Wether the poller has been temporarily disabled or should fetch messages been temporarily disabled
141 142 143 |
# File 'lib/fluent/plugin/in_redis_list_poller.rb', line 141 def sleeping? @retry_at and @retry_at >= Engine.now end |
#start ⇒ NilClass
Prepare the plugin event loop
This method will initialize the Redis connection object, create any required Redis structures as well as define and begin the event pollers.
89 90 91 92 93 94 |
# File 'lib/fluent/plugin/in_redis_list_poller.rb', line 89 def start super start_redis start_poller start_monitor end |
#start_monitor ⇒ NilClass
Prepare the Redis queue monitor
This timed event will routinely poll for a lock key and disable the queue poller if required
116 117 118 119 120 |
# File 'lib/fluent/plugin/in_redis_list_poller.rb', line 116 def start_monitor timer_execute(:monitor, 1) do action_locking_monitor end end |
#start_poller ⇒ NilClass
Prepare the Redis queue poller
This timed event will routinely poll items from the Redis list and emit those through the pipeline.
103 104 105 106 107 |
# File 'lib/fluent/plugin/in_redis_list_poller.rb', line 103 def start_poller timer_execute(:poller, @poll_interval) do action_poll end end |