Module: Fluent::PluginHelper::Storage

Includes:
Timer
Defined in:
lib/fluent/plugin_helper/storage.rb

Defined Under Namespace

Modules: StorageParams Classes: PersistentWrapper, StorageState, SynchronizeWrapper

Constant Summary

Constants included from EventLoop

EventLoop::EVENT_LOOP_RUN_DEFAULT_TIMEOUT, EventLoop::EVENT_LOOP_SHUTDOWN_TIMEOUT

Constants included from Thread

Thread::THREAD_DEFAULT_WAIT_SECONDS, Thread::THREAD_SHUTDOWN_HARD_TIMEOUT_IN_TESTS

Instance Attribute Summary collapse

Attributes included from Timer

#_timers

Attributes included from EventLoop

#_event_loop

Attributes included from Thread

#_threads

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Timer

#timer_execute, #timer_running?

Methods included from EventLoop

#event_loop_attach, #event_loop_detach, #event_loop_running?, #event_loop_wait_until_start, #event_loop_wait_until_stop

Methods included from Thread

#thread_create, #thread_current_running?, #thread_exist?, #thread_running?, #thread_started?, #thread_wait_until_start, #thread_wait_until_stop

Instance Attribute Details

#_storagesObject (readonly)

for tests



92
93
94
# File 'lib/fluent/plugin_helper/storage.rb', line 92

def _storages
  @_storages
end

Class Method Details

.included(mod) ⇒ Object



88
89
90
# File 'lib/fluent/plugin_helper/storage.rb', line 88

def self.included(mod)
  mod.include StorageParams
end

Instance Method Details

#after_shutdownObject



166
167
168
169
# File 'lib/fluent/plugin_helper/storage.rb', line 166

def after_shutdown
  storage_operate(:after_shutdown)
  super
end

#before_shutdownObject



154
155
156
157
# File 'lib/fluent/plugin_helper/storage.rb', line 154

def before_shutdown
  storage_operate(:before_shutdown)
  super
end

#closeObject



171
172
173
174
# File 'lib/fluent/plugin_helper/storage.rb', line 171

def close
  storage_operate(:close){|s| s.running = false }
  super
end

#configure(conf) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/fluent/plugin_helper/storage.rb', line 100

def configure(conf)
  super

  @storage_configs.each do |section|
    if !section.usage.empty? && section.usage !~ /^[a-zA-Z][-_.a-zA-Z0-9]*$/
      raise Fluent::ConfigError, "Argument in <storage ARG> uses invalid characters: '#{section.usage}'"
    end
    if @_storages[section.usage]
      raise Fluent::ConfigError, "duplicated storages configured: #{section.usage}"
    end
    storage = Plugin.new_storage(section[:@type], parent: self)
    storage.configure(section.corresponding_config_element)
    @_storages[section.usage] = StorageState.new(wrap_instance(storage), false)
  end
end

#initializeObject



94
95
96
97
98
# File 'lib/fluent/plugin_helper/storage.rb', line 94

def initialize
  super
  @_storages_started = false
  @_storages = {} # usage => storage_state
end

#shutdownObject



159
160
161
162
163
164
# File 'lib/fluent/plugin_helper/storage.rb', line 159

def shutdown
  storage_operate(:shutdown) do |s|
    s.storage.save if s.storage.save_at_shutdown
  end
  super
end

#startObject



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/fluent/plugin_helper/storage.rb', line 116

def start
  super

  @_storages_started = true
  @_storages.each_pair do |usage, s|
    s.storage.start
    s.storage.load

    if s.storage.autosave && !s.storage.persistent
      timer_execute(:storage_autosave, s.storage.autosave_interval, repeat: true) do
        begin
          s.storage.save
        rescue => e
          log.error "plugin storage failed to save its data", usage: usage, type: type, error: e
        end
      end
    end
    s.running = true
  end
end

#stopObject



148
149
150
151
152
# File 'lib/fluent/plugin_helper/storage.rb', line 148

def stop
  super
  # timer stops automatically in super
  storage_operate(:stop)
end

#storage_create(usage: '', type: nil, conf: nil, default_type: nil) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/fluent/plugin_helper/storage.rb', line 32

def storage_create(usage: '', type: nil, conf: nil, default_type: nil)
  if conf && conf.respond_to?(:arg) && !conf.arg.empty?
    usage = conf.arg
  end
  if !usage.empty? && usage !~ /^[a-zA-Z][-_.a-zA-Z0-9]*$/
    raise Fluent::ConfigError, "Argument in <storage ARG> uses invalid characters: '#{usage}'"
  end

  s = @_storages[usage]
  if s && s.running
    return s.storage
  elsif s
    # storage is already created, but not loaded / started
  else # !s
    type = if type
             type
           elsif conf && conf.respond_to?(:[])
             raise Fluent::ConfigError, "@type is required in <storage>" unless conf['@type']
             conf['@type']
           elsif default_type
             default_type
           else
             raise ArgumentError, "BUG: both type and conf are not specified"
           end
    storage = Plugin.new_storage(type, parent: self)
    config = case conf
             when Fluent::Config::Element
               conf
             when Hash
               # in code, programmer may use symbols as keys, but Element needs strings
               conf = Hash[conf.map{|k,v| [k.to_s, v]}]
               Fluent::Config::Element.new('storage', usage, conf, [])
             when nil
               Fluent::Config::Element.new('storage', usage, {'@type' => type}, [])
             else
               raise ArgumentError, "BUG: conf must be a Element, Hash (or unspecified), but '#{conf.class}'"
             end
    storage.configure(config)
    if @_storages_started
      storage.start
    end
    s = @_storages[usage] = StorageState.new(wrap_instance(storage), false)
  end

  s.storage
end

#storage_operate(method_name, &block) ⇒ Object



137
138
139
140
141
142
143
144
145
146
# File 'lib/fluent/plugin_helper/storage.rb', line 137

def storage_operate(method_name, &block)
  @_storages.each_pair do |usage, s|
    begin
      block.call(s) if block_given?
      s.storage.__send__(method_name)
    rescue => e
      log.error "unexpected error while #{method_name}", usage: usage, storage: s.storage, error: e
    end
  end
end

#terminateObject



176
177
178
179
180
# File 'lib/fluent/plugin_helper/storage.rb', line 176

def terminate
  storage_operate(:terminate)
  @_storages = {}
  super
end

#wrap_instance(storage) ⇒ Object



182
183
184
185
186
187
188
189
190
191
192
# File 'lib/fluent/plugin_helper/storage.rb', line 182

def wrap_instance(storage)
  if storage.persistent && storage.persistent_always?
    storage
  elsif storage.persistent
    PersistentWrapper.new(storage)
  elsif !storage.synchronized?
    SynchronizeWrapper.new(storage)
  else
    storage
  end
end