Class: Fluent::Plugin::RedisListMonitorInput

Inherits:
Input
  • Object
show all
Includes:
Fluent::PluginMixin::Redis
Defined in:
lib/fluent/plugin/in_redis_list_monitor.rb

Overview

Input plugin which will monitor the size of a redis list and periodically output metrics to the login pipeline.

Since:

  • 0.1.0

Instance Method Summary collapse

Methods included from Fluent::PluginMixin::Redis

included, #shutdown_redis, #start_redis

Constructor Details

#initializeNilClass

Initialize new input plugin

Since:

  • 0.1.0



21
22
23
# File 'lib/fluent/plugin/in_redis_list_monitor.rb', line 21

def initialize
  super
end

Instance Method Details

#action_pollNilClass

Action to execute when the monitor event watcher executes

The monitor is simply responsible for outputting the queue length to the logs as well as detecting zero length lists.

Returns:

  • (NilClass)

Since:

  • 0.1.0



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/fluent/plugin/in_redis_list_monitor.rb', line 107

def action_poll
  now = Engine.now

  if sleeping?
    log.trace "redis worker is sleeping"
    return
  end

  list_size = @redis.llen(@key)

  event = {
    "timestamp" => now,
    "message" => "redis queue monitor",
    "hostname" => @host,
    "key" => @key,
    "size" => list_size
  }

  router.emit @tag, now, event
rescue => e
  log.error "error monitoring queue", :error => e
  log.error_backtrace
  sleep!(@retry_interval)
end

#configure(config) ⇒ NilClass

Initialize attributes and parameters

Returns:

  • (NilClass)

Since:

  • 0.1.0



28
29
30
31
32
33
34
35
36
# File 'lib/fluent/plugin/in_redis_list_monitor.rb', line 28

def configure(config)
  super

  configure_params(config)
  configure_locking(config)

  @queue_length = 0
  @retry_at     = nil
end

#configure_locking(config) ⇒ NilClass

Configure locking

Returns:

  • (NilClass)

Since:

  • 0.1.0



51
52
53
54
# File 'lib/fluent/plugin/in_redis_list_monitor.rb', line 51

def configure_locking(config)
  @storage  = storage_create(type: 'local')
  @lock_key = "redis:#{@key}:lock"
end

#configure_params(config) ⇒ NilClass

Configure plugin parameters

Returns:

  • (NilClass)

Since:

  • 0.1.0



41
42
43
44
45
46
# File 'lib/fluent/plugin/in_redis_list_monitor.rb', line 41

def configure_params(config)
  %w(host port key tag).each do |key|
    next if instance_variable_get("@#{key}")
    raise Fluent::ConfigError, "configuration key missing: #{key}"
  end
end

#shutdownNilClass

Tear down the plugin

Returns:

  • (NilClass)

Since:

  • 0.1.0



79
80
81
82
# File 'lib/fluent/plugin/in_redis_list_monitor.rb', line 79

def shutdown
  super
  shutdown_redis
end

#sleep!(delay = @sleep_interval) ⇒ Integer

Set a sleep delay, ensuring that we will not attempt to fetch messages

Parameters:

  • delay, (Integer)

    the amount of seconds to wait

Returns:

  • (Integer)

    timestamp when this expires

Since:

  • 0.1.0



96
97
98
# File 'lib/fluent/plugin/in_redis_list_monitor.rb', line 96

def sleep!(delay = @sleep_interval)
  @retry_at = Engine.now + delay
end

#sleeping?TrueClass, FalseClass

Wether the poller has been temporarily disabled or should fetch messages been temporarily disabled

Returns:

  • (TrueClass, FalseClass)

Since:

  • 0.1.0



88
89
90
# File 'lib/fluent/plugin/in_redis_list_monitor.rb', line 88

def sleeping?
  @retry_at and @retry_at >= Engine.now
end

#startNilClass

Prepare the plugin event loop

This method will initialize the Redis connection object, create any required Redis structures as well as define and begin the event pollers.

Returns:

  • (NilClass)

Since:

  • 0.1.0



63
64
65
66
67
68
# File 'lib/fluent/plugin/in_redis_list_monitor.rb', line 63

def start
  super

  start_redis
  start_poller
end

#start_pollerObject

Since:

  • 0.1.0



70
71
72
73
74
# File 'lib/fluent/plugin/in_redis_list_monitor.rb', line 70

def start_poller
  timer_execute(:poll, @poll_interval) do
    action_poll
  end
end