Module: Fluent::PluginHelper::Storage
- Includes:
- Timer
- Defined in:
- lib/fluent/plugin_helper/storage.rb
Defined Under Namespace
Modules: StorageParams
Classes: PersistentWrapper, StorageState, SynchronizeWrapper
Constant Summary
Constants included
from EventLoop
EventLoop::EVENT_LOOP_RUN_DEFAULT_TIMEOUT, EventLoop::EVENT_LOOP_SHUTDOWN_TIMEOUT
Constants included
from Thread
Thread::THREAD_DEFAULT_WAIT_SECONDS, Thread::THREAD_SHUTDOWN_HARD_TIMEOUT_IN_TESTS
Instance Attribute Summary collapse
Attributes included from Timer
#_timers
Attributes included from EventLoop
#_event_loop
Attributes included from Thread
#_threads
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Timer
#timer_execute, #timer_running?
Methods included from EventLoop
#event_loop_attach, #event_loop_detach, #event_loop_running?, #event_loop_wait_until_start, #event_loop_wait_until_stop
Methods included from Thread
#thread_create, #thread_current_running?, #thread_exist?, #thread_running?, #thread_started?, #thread_wait_until_start, #thread_wait_until_stop
Instance Attribute Details
#_storages ⇒ Object
92
93
94
|
# File 'lib/fluent/plugin_helper/storage.rb', line 92
# Reader for the helper's storage registry.
#
# Elsewhere in this helper the registry is a Hash whose keys are usage
# strings and whose values are StorageState objects.
#
# @return [Hash, nil] the current value of @_storages
def _storages
  @_storages
end
|
Class Method Details
.included(mod) ⇒ Object
88
89
90
|
# File 'lib/fluent/plugin_helper/storage.rb', line 88
# Ruby's Module#included hook: runs when this helper is mixed into +mod+
# and additionally mixes StorageParams into that same module.
#
# @param mod [Module] the class or module including this helper
# @return [Module] the result of mod.include
def self.included(mod)
  mod.include(StorageParams)
end
|
Instance Method Details
#after_shutdown ⇒ Object
166
167
168
169
|
# File 'lib/fluent/plugin_helper/storage.rb', line 166
# Lifecycle hook: sends after_shutdown to every registered storage through
# storage_operate, then continues up the ancestor chain.
#
# @return [Object] whatever super returns
def after_shutdown
  storage_operate :after_shutdown
  super
end
|
#before_shutdown ⇒ Object
154
155
156
157
|
# File 'lib/fluent/plugin_helper/storage.rb', line 154
# Lifecycle hook: sends before_shutdown to every registered storage through
# storage_operate, then continues up the ancestor chain.
#
# @return [Object] whatever super returns
def before_shutdown
  storage_operate :before_shutdown
  super
end
|
#close ⇒ Object
171
172
173
174
|
# File 'lib/fluent/plugin_helper/storage.rb', line 171
# Lifecycle hook: for every registered storage, clears the state's running
# flag and then sends close to the storage (both via storage_operate),
# before continuing up the ancestor chain.
#
# @return [Object] whatever super returns
def close
  storage_operate(:close) do |state|
    state.running = false
  end
  super
end
|
#configure(conf) ⇒ Object
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
# File 'lib/fluent/plugin_helper/storage.rb', line 100
# Configures the plugin, then builds one storage per entry of
# @storage_configs.
#
# For each section: the usage is validated, a storage plugin is created with
# Plugin.new_storage, configured with the section's own config element,
# wrapped by wrap_instance, and registered in @_storages under the section's
# usage with its running flag set to false.
#
# @param conf [Object] plugin configuration; here it is only forwarded to super
# @return [Object] the value returned by @storage_configs.each
# @raise [Fluent::ConfigError] if a non-empty usage contains invalid
#   characters, or if a usage is already registered
def configure(conf)
  super

  @storage_configs.each do |section|
    usage = section.usage

    # \A / \z anchor the whole string. With ^ / $ (line anchors) a multi-line
    # value passed validation as soon as any single line looked valid.
    if !usage.empty? && usage !~ /\A[a-zA-Z][-_.a-zA-Z0-9]*\z/
      raise Fluent::ConfigError, "Argument in <storage ARG> uses invalid characters: '#{usage}'"
    end
    if @_storages[usage]
      raise Fluent::ConfigError, "duplicated storages configured: #{usage}"
    end

    storage = Plugin.new_storage(section[:@type], parent: self)
    storage.configure(section.corresponding_config_element)
    # Registered as not running; the flag is flipped later by #start.
    @_storages[usage] = StorageState.new(wrap_instance(storage), false)
  end
end
|
#initialize ⇒ Object
94
95
96
97
98
|
# File 'lib/fluent/plugin_helper/storage.rb', line 94
# Sets up the helper's bookkeeping after the ancestor chain has initialized.
#
# @_storages_started records whether #start has run yet;
# @_storages is the registry that maps a usage to its storage state.
def initialize
  super

  @_storages_started = false
  @_storages = {}
end
|
#shutdown ⇒ Object
159
160
161
162
163
164
|
# File 'lib/fluent/plugin_helper/storage.rb', line 159
# Lifecycle hook: for every registered storage, saves it first when its
# save_at_shutdown is truthy and then sends shutdown to it (both via
# storage_operate), before continuing up the ancestor chain.
#
# @return [Object] whatever super returns
def shutdown
  storage_operate(:shutdown) do |state|
    backend = state.storage
    backend.save if backend.save_at_shutdown
  end
  super
end
|
#start ⇒ Object
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
# File 'lib/fluent/plugin_helper/storage.rb', line 116
# Lifecycle hook: starts the ancestor chain, marks the helper as started,
# then starts and loads every registered storage.
#
# A storage whose autosave is truthy and whose persistent is falsy also gets
# a repeating :storage_autosave timer that saves it every autosave_interval;
# a failing save is logged instead of being raised out of the timer block.
# Each state's running flag is set to true once its storage has been handled.
#
# @return [Object] the value returned by iterating @_storages
def start
  super

  @_storages_started = true
  @_storages.each do |usage, state|
    state.storage.start
    state.storage.load

    autosave_needed = state.storage.autosave && !state.storage.persistent
    if autosave_needed
      timer_execute(:storage_autosave, state.storage.autosave_interval, repeat: true) do
        begin
          state.storage.save
        rescue => e
          # NOTE(review): `type` is resolved on self when the save fails; it is
          # not defined in the code visible here — confirm the including class provides it.
          log.error "plugin storage failed to save its data", usage: usage, type: type, error: e
        end
      end
    end

    state.running = true
  end
end
|
#stop ⇒ Object
148
149
150
151
152
|
# File 'lib/fluent/plugin_helper/storage.rb', line 148
# Lifecycle hook: unlike the other shutdown-phase hooks in this helper, the
# ancestor chain is stopped first and the storages afterwards.
#
# @return [Object] whatever storage_operate returns
def stop
  super
  storage_operate :stop
end
|
#storage_create(usage: '', type: nil, conf: nil, default_type: nil) ⇒ Object
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
# File 'lib/fluent/plugin_helper/storage.rb', line 32
# Returns the storage registered under a usage, creating and registering it
# first when none exists yet.
#
# When a storage is already registered for the usage (whether or not it is
# running) it is returned as-is and +type+, +conf+ and +default_type+ are
# ignored. Otherwise a storage plugin is created via Plugin.new_storage,
# configured, started right away if this helper has already been started,
# wrapped by wrap_instance and registered with its running flag set to false.
#
# @param usage [String] registry key; replaced by conf.arg when +conf+
#   responds to #arg and that argument is non-empty
# @param type [Object, nil] storage plugin type; when given it takes
#   precedence over conf['@type'] and +default_type+
# @param conf [Fluent::Config::Element, Hash, nil] storage configuration;
#   Hash keys are converted to strings before the element is built
# @param default_type [Object, nil] used only when +type+ is nil and +conf+
#   does not respond to #[]
# @return [Object] the (possibly wrapped) storage instance
# @raise [Fluent::ConfigError] if a non-empty usage contains invalid
#   characters, or +conf+ responds to #[] but has no '@type' and no +type+ was given
# @raise [ArgumentError] if no type can be determined, or +conf+ is neither
#   an Element, a Hash nor nil
def storage_create(usage: '', type: nil, conf: nil, default_type: nil)
  if conf && conf.respond_to?(:arg) && !conf.arg.empty?
    usage = conf.arg
  end
  # \A / \z anchor the whole string. With ^ / $ (line anchors) a multi-line
  # value passed validation as soon as any single line looked valid.
  if !usage.empty? && usage !~ /\A[a-zA-Z][-_.a-zA-Z0-9]*\z/
    raise Fluent::ConfigError, "Argument in <storage ARG> uses invalid characters: '#{usage}'"
  end

  # Already registered, running or not: hand back the existing instance.
  existing = @_storages[usage]
  return existing.storage if existing

  type = if type
           type
         elsif conf && conf.respond_to?(:[])
           raise Fluent::ConfigError, "@type is required in <storage>" unless conf['@type']
           conf['@type']
         elsif default_type
           default_type
         else
           raise ArgumentError, "BUG: both type and conf are not specified"
         end

  storage = Plugin.new_storage(type, parent: self)
  config = case conf
           when Fluent::Config::Element
             conf
           when Hash
             stringified = conf.each_with_object({}) { |(key, value), acc| acc[key.to_s] = value }
             Fluent::Config::Element.new('storage', usage, stringified, [])
           when nil
             Fluent::Config::Element.new('storage', usage, {'@type' => type}, [])
           else
             raise ArgumentError, "BUG: conf must be a Element, Hash (or unspecified), but '#{conf.class}'"
           end
  storage.configure(config)
  # Created after #start already ran: start it here. Note that it is not
  # loaded here and its state is still registered with running = false.
  storage.start if @_storages_started

  state = @_storages[usage] = StorageState.new(wrap_instance(storage), false)
  state.storage
end
|
#storage_operate(method_name, &block) ⇒ Object
137
138
139
140
141
142
143
144
145
146
|
# File 'lib/fluent/plugin_helper/storage.rb', line 137
# Runs one operation on every registered storage.
#
# For each entry of @_storages the optional block is called with the state
# object first, then +method_name+ is sent to the state's storage. Any
# StandardError raised by either step is logged and the iteration moves on
# to the next storage.
#
# @param method_name [Symbol, String] method sent to each storage via __send__
# @yieldparam state [Object] the registered state object of the storage
# @return [Object] the value returned by iterating @_storages
def storage_operate(method_name, &block)
  @_storages.each do |usage, state|
    begin
      block.call(state) if block
      state.storage.__send__(method_name)
    rescue => e
      log.error "unexpected error while #{method_name}", usage: usage, storage: state.storage, error: e
    end
  end
end
|
#terminate ⇒ Object
176
177
178
179
180
|
# File 'lib/fluent/plugin_helper/storage.rb', line 176
# Lifecycle hook: sends terminate to every registered storage through
# storage_operate, replaces the registry with a fresh empty Hash, then
# continues up the ancestor chain.
#
# @return [Object] whatever super returns
def terminate
  storage_operate :terminate
  @_storages = {}
  super
end
|
#wrap_instance(storage) ⇒ Object
182
183
184
185
186
187
188
189
190
191
192
|
# File 'lib/fluent/plugin_helper/storage.rb', line 182
# Chooses the wrapper, if any, for a storage instance.
#
# - persistent and persistent_always?  -> returned unwrapped
# - persistent only                    -> PersistentWrapper
# - not persistent, not synchronized?  -> SynchronizeWrapper
# - not persistent, synchronized?      -> returned unwrapped
#
# @param storage [Object] storage responding to #persistent,
#   #persistent_always? and #synchronized?
# @return [Object] the storage itself or a wrapper built around it
def wrap_instance(storage)
  if storage.persistent
    return storage if storage.persistent_always?
    return PersistentWrapper.new(storage)
  end

  return storage if storage.synchronized?

  SynchronizeWrapper.new(storage)
end
|