Class: Fluent::Plugin::LocalStorage

Inherits:
Storage show all
Defined in:
lib/fluent/plugin/storage_local.rb

Constant Summary

Constants inherited from Storage

Storage::DEFAULT_TYPE

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary collapse

Attributes inherited from Storage

#log

Attributes inherited from Base

#under_plugin_development

Instance Method Summary collapse

Methods inherited from Storage

#implementation, #persistent_always?, #synchronized?, validate_key

Methods included from OwnedByMixin

#log, #owner, #owner=

Methods inherited from Base

#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #inspect, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #start, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Configurable

#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type

Constructor Details

#initializeLocalStorage

Returns a new instance of LocalStorage.



40
41
42
43
44
# File 'lib/fluent/plugin/storage_local.rb', line 40

def initialize
  super
  @store = {}
  @multi_workers_available = nil
end

Instance Attribute Details

#storeObject (readonly)

for test



38
39
40
# File 'lib/fluent/plugin/storage_local.rb', line 38

def store
  @store
end

Instance Method Details

#configure(conf) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/fluent/plugin/storage_local.rb', line 46

def configure(conf)
  super

  @on_memory = false
  if @path
    if File.exist?(@path) && File.file?(@path)
      @multi_workers_available = false
    elsif File.exist?(@path) && File.directory?(@path)
      @path = File.join(@path, "worker#{fluentd_worker_id}", "storage.json")
      @multi_workers_available = true
    else # path file/directory doesn't exist
      if @path.end_with?('.json') # file
        @multi_workers_available = false
      else # directory
        @path = File.join(@path, "worker#{fluentd_worker_id}", "storage.json")
        @multi_workers_available = true
      end
    end
  elsif root_dir = owner.plugin_root_dir
    basename = (conf.arg && !conf.arg.empty?) ? "storage.#{conf.arg}.json" : "storage.json"
    @path = File.join(root_dir, basename)
    @multi_workers_available = true
  else
    if @persistent
      raise Fluent::ConfigError, "Plugin @id or path for <storage> required when 'persistent' is true"
    else
      if @autosave
        log.warn "both of Plugin @id and path for <storage> are not specified. Using on-memory store."
      else
        log.info "both of Plugin @id and path for <storage> are not specified. Using on-memory store."
      end
      @on_memory = true
      @multi_workers_available = true
    end
  end

  if !@on_memory
    dir = File.dirname(@path)
    FileUtils.mkdir_p(dir, mode: @dir_mode) unless Dir.exist?(dir)
    if File.exist?(@path)
      raise Fluent::ConfigError, "Plugin storage path '#{@path}' is not readable/writable" unless File.readable?(@path) && File.writable?(@path)
      begin
        data = File.open(@path, 'r:utf-8') { |io| io.read }
        if data.empty?
          log.warn "detect empty plugin storage file during startup. Ignored: #{@path}"
          return
        end
        data = Yajl::Parser.parse(data)
        raise Fluent::ConfigError, "Invalid contents (not object) in plugin storage file: '#{@path}'" unless data.is_a?(Hash)
      rescue => e
        log.error "failed to read data from plugin storage file", path: @path, error: e
        raise Fluent::ConfigError, "Unexpected error: failed to read data from plugin storage file: '#{@path}'"
      end
    else
      raise Fluent::ConfigError, "Directory is not writable for plugin storage file '#{@path}'" unless File.stat(dir).writable?
    end
  end
end

#delete(key) ⇒ Object



153
154
155
# File 'lib/fluent/plugin/storage_local.rb', line 153

def delete(key)
  @store.delete(key.to_s)
end

#fetch(key, defval) ⇒ Object



145
146
147
# File 'lib/fluent/plugin/storage_local.rb', line 145

def fetch(key, defval)
  @store.fetch(key.to_s, defval)
end

#get(key) ⇒ Object



141
142
143
# File 'lib/fluent/plugin/storage_local.rb', line 141

def get(key)
  @store[key.to_s]
end

#loadObject



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/fluent/plugin/storage_local.rb', line 112

def load
  return if @on_memory
  return unless File.exist?(@path)
  begin
    json_string = File.open(@path, 'r:utf-8'){ |io| io.read }
    json = Yajl::Parser.parse(json_string)
    unless json.is_a?(Hash)
      log.error "broken content for plugin storage (Hash required: ignored)", type: json.class
      log.debug "broken content", content: json_string
      return
    end
    @store = json
  rescue => e
    log.error "failed to load data for plugin storage from file", path: @path, error: e
  end
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


105
106
107
108
109
110
# File 'lib/fluent/plugin/storage_local.rb', line 105

def multi_workers_ready?
  unless @multi_workers_available
    log.error "local plugin storage with multi workers should be configured to use directory 'path', or system root_dir and plugin id"
  end
  @multi_workers_available
end

#put(key, value) ⇒ Object



149
150
151
# File 'lib/fluent/plugin/storage_local.rb', line 149

def put(key, value)
  @store[key.to_s] = value
end

#saveObject



129
130
131
132
133
134
135
136
137
138
139
# File 'lib/fluent/plugin/storage_local.rb', line 129

def save
  return if @on_memory
  tmp_path = @path + '.tmp.' + Fluent::UniqueId.hex(Fluent::UniqueId.generate)
  begin
    json_string = Yajl::Encoder.encode(@store, pretty: @pretty_print)
    File.open(tmp_path, 'w:utf-8', @mode) { |io| io.write json_string; io.fsync }
    File.rename(tmp_path, @path)
  rescue => e
    log.error "failed to save data for plugin storage to file", path: @path, tmp: tmp_path, error: e
  end
end

#update(key, &block) ⇒ Object



157
158
159
# File 'lib/fluent/plugin/storage_local.rb', line 157

def update(key, &block)
  @store[key.to_s] = block.call(@store[key.to_s])
end