Class: Fluent::Plugin::LocalStorage

Inherits:
Storage show all
Defined in:
lib/fluent/plugin/storage_local.rb

Constant Summary collapse

DEFAULT_DIR_MODE =
0755
DEFAULT_FILE_MODE =
0644

Constants inherited from Storage

Storage::DEFAULT_TYPE

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary collapse

Attributes inherited from Storage

#log

Attributes inherited from Base

#under_plugin_development

Instance Method Summary collapse

Methods inherited from Storage

#implementation, #persistent_always?, #synchronized?, validate_key

Methods included from OwnedByMixin

#log, #owner, #owner=

Methods inherited from Base

#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #has_router?, #inspect, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #start, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Configurable

#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type

Constructor Details

#initialize ⇒ LocalStorage

Returns a new instance of LocalStorage.



42
43
44
45
46
# File 'lib/fluent/plugin/storage_local.rb', line 42

# Creates a storage instance with nothing stored yet.
#
# After delegating to super, the multi-worker capability flag is left as nil
# (it is assigned later by #configure) and the backing store starts out as an
# empty hash.
def initialize
  super
  @multi_workers_available = nil
  @store = {}
end

Instance Attribute Details

#store ⇒ Object (readonly)

Exposes the underlying store hash; intended for use in tests.



40
41
42
# File 'lib/fluent/plugin/storage_local.rb', line 40

# Returns the object held in @store — the very hash that #get, #put, #fetch,
# #delete and #update operate on, not a copy.
def store
  @store
end

Instance Method Details

#configure(conf) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/fluent/plugin/storage_local.rb', line 48

# Decides where this storage keeps its data and validates that location.
#
# Location is resolved in this order:
# 1. @path is set:
#    - an existing regular file, or a non-existent path ending in '.json',
#      is used as-is and the storage is marked NOT multi-worker capable;
#    - an existing directory, or any other non-existent path, is treated as
#      a directory and "worker<fluentd_worker_id>/storage.json" is appended.
# 2. otherwise, if owner.plugin_root_dir is available: "storage.json" (or
#    "storage.<conf.arg>.json" when conf.arg is non-empty) under that dir.
# 3. otherwise the store is kept on memory only, unless @persistent is set,
#    in which case configuration fails.
#
# For a file-backed store the parent directory is created when missing and
# the file (if present) must be readable, writable and contain a JSON object.
# An empty file is tolerated with a warning.
#
# NOTE(review): @path, @persistent, @autosave and @dir_mode are not assigned
# here; presumably super populates them from conf — confirm.
#
# @param conf configuration section; only conf.arg is read directly here
# @raise [Fluent::ConfigError] when persistence is required but no location
#   can be derived, when the file or its directory is not accessible, or when
#   the file content is not a JSON object / cannot be read
def configure(conf)
  super

  @on_memory = false
  if @path
    if File.exist?(@path) && File.file?(@path)
      @multi_workers_available = false
    elsif File.exist?(@path) && File.directory?(@path)
      @path = File.join(@path, "worker#{fluentd_worker_id}", "storage.json")
      @multi_workers_available = true
    else # path file/directory doesn't exist
      if @path.end_with?('.json') # file
        @multi_workers_available = false
      else # directory
        @path = File.join(@path, "worker#{fluentd_worker_id}", "storage.json")
        @multi_workers_available = true
      end
    end
  elsif root_dir = owner.plugin_root_dir
    basename = (conf.arg && !conf.arg.empty?) ? "storage.#{conf.arg}.json" : "storage.json"
    @path = File.join(root_dir, basename)
    @multi_workers_available = true
  else
    if @persistent
      raise Fluent::ConfigError, "Plugin @id or path for <storage> required when 'persistent' is true"
    else
      # Losing data silently matters more when autosave was requested, hence
      # the higher log level in that case.
      if @autosave
        log.warn "both of Plugin @id and path for <storage> are not specified. Using on-memory store."
      else
        log.info "both of Plugin @id and path for <storage> are not specified. Using on-memory store."
      end
      @on_memory = true
      @multi_workers_available = true
    end
  end

  if !@on_memory
    dir = File.dirname(@path)
    FileUtils.mkdir_p(dir, mode: @dir_mode) unless Dir.exist?(dir)
    if File.exist?(@path)
      raise Fluent::ConfigError, "Plugin storage path '#{@path}' is not readable/writable" unless File.readable?(@path) && File.writable?(@path)
      begin
        data = File.open(@path, 'r:utf-8') { |io| io.read }
        if data.empty?
          log.warn "detect empty plugin storage file during startup. Ignored: #{@path}"
          return
        end
        data = Yajl::Parser.parse(data)
        raise Fluent::ConfigError, "Invalid contents (not object) in plugin storage file: '#{@path}'" unless data.is_a?(Hash)
      rescue Fluent::ConfigError
        # Already a precise configuration error (e.g. "not object" above):
        # let it through instead of masking it as an "Unexpected error".
        raise
      rescue => e
        log.error "failed to read data from plugin storage file", path: @path, error: e
        raise Fluent::ConfigError, "Unexpected error: failed to read data from plugin storage file: '#{@path}'"
      end
    else
      # The file will be created on first save, so the directory must allow it.
      raise Fluent::ConfigError, "Directory is not writable for plugin storage file '#{@path}'" unless File.stat(dir).writable?
    end
  end
end

#delete(key) ⇒ Object



155
156
157
# File 'lib/fluent/plugin/storage_local.rb', line 155

# Removes the entry stored under +key+ (converted with #to_s).
#
# @param key [#to_s] entry name
# @return the removed value, or nil when no such entry exists
def delete(key)
  name = key.to_s
  @store.delete(name)
end

#fetch(key, defval) ⇒ Object



147
148
149
# File 'lib/fluent/plugin/storage_local.rb', line 147

# Looks up the entry stored under +key+ (converted with #to_s), falling back
# to +defval+ when the store has no such entry.
#
# @param key [#to_s] entry name
# @param defval value returned when the entry is absent
# @return the stored value or +defval+
def fetch(key, defval)
  name = key.to_s
  @store.fetch(name, defval)
end

#get(key) ⇒ Object



143
144
145
# File 'lib/fluent/plugin/storage_local.rb', line 143

# Reads the entry stored under +key+ (converted with #to_s).
#
# @param key [#to_s] entry name
# @return the stored value, or whatever the store hash yields for a missing
#   key (nil for a plain hash)
def get(key)
  name = key.to_s
  @store[name]
end

#load ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/fluent/plugin/storage_local.rb', line 114

# Replaces the in-memory store with the JSON object read from @path.
#
# Nothing happens for an on-memory store or when the file does not exist.
# Content that does not parse to a Hash, as well as any error raised while
# reading or parsing, is only logged; the current store is left untouched.
def load
  return if @on_memory || !File.exist?(@path)

  begin
    raw = File.open(@path, 'r:utf-8', &:read)
    parsed = Yajl::Parser.parse(raw)
    if parsed.is_a?(Hash)
      @store = parsed
    else
      log.error "broken content for plugin storage (Hash required: ignored)", type: parsed.class
      log.debug "broken content", content: raw
      nil
    end
  rescue => e
    log.error "failed to load data for plugin storage from file", path: @path, error: e
  end
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


107
108
109
110
111
112
# File 'lib/fluent/plugin/storage_local.rb', line 107

# Reports whether this storage may be used when several workers are running.
#
# The answer is the flag computed by #configure; when it is not truthy an
# error explaining the supported configurations is logged first.
#
# @return [Boolean, nil] the current value of the multi-worker flag
def multi_workers_ready?
  return @multi_workers_available if @multi_workers_available

  log.error "local plugin storage with multi workers should be configured to use directory 'path', or system root_dir and plugin id"
  @multi_workers_available
end

#put(key, value) ⇒ Object



151
152
153
# File 'lib/fluent/plugin/storage_local.rb', line 151

# Stores +value+ under +key+ (converted with #to_s), replacing any previous
# entry of that name.
#
# @param key [#to_s] entry name
# @param value value to keep
# @return the stored +value+
def put(key, value)
  name = key.to_s
  @store[name] = value
end

#save ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
# File 'lib/fluent/plugin/storage_local.rb', line 131

# Writes the current store to @path as JSON.
#
# Does nothing for an on-memory store. Otherwise the JSON is written and
# fsync'ed to a uniquely named temporary file next to @path, which is then
# renamed onto @path, so @path is replaced in one step rather than rewritten
# in place. Failures are logged instead of raised, and the temporary file is
# removed so that repeated failed saves do not pile up "*.tmp.*" files.
def save
  return if @on_memory
  tmp_path = @path + '.tmp.' + Fluent::UniqueId.hex(Fluent::UniqueId.generate)
  begin
    json_string = Yajl::Encoder.encode(@store, pretty: @pretty_print)
    File.open(tmp_path, 'w:utf-8', @mode) { |io| io.write json_string; io.fsync }
    File.rename(tmp_path, @path)
  rescue => e
    # Best-effort cleanup: rm_f tolerates a file that was never created and
    # swallows its own errors, so it cannot hide the original failure.
    FileUtils.rm_f(tmp_path)
    log.error "failed to save data for plugin storage to file", path: @path, tmp: tmp_path, error: e
  end
end

#update(key, &block) ⇒ Object



159
160
161
# File 'lib/fluent/plugin/storage_local.rb', line 159

# Replaces the entry stored under +key+ (converted with #to_s) with the
# result of calling +block+ on its current value.
#
# @param key [#to_s] entry name
# @yieldparam current the value currently stored (nil for a missing entry in
#   a plain hash)
# @return the new value produced by the block
def update(key, &block)
  table = @store
  name = key.to_s
  table[name] = block.call(table[name])
end