Class: Fluent::Plugin::SampleInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_sample.rb

Constant Summary collapse

BIN_NUM =
10
DEFAULT_STORAGE_TYPE =
'local'

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary

Attributes included from Fluent::PluginLoggerMixin

#log

Attributes inherited from Base

#under_plugin_development

Instance Method Summary collapse

Methods inherited from Input

#emit_records, #emit_size, #metric_callback, #statistics

Methods included from Fluent::PluginHelper::Mixin

included

Methods included from Fluent::PluginLoggerMixin

included, #terminate

Methods included from Fluent::PluginId

#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop

Methods inherited from Base

#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #inspect, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Configurable

#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type

Constructor Details

#initialize ⇒ SampleInput

Returns a new instance of SampleInput.



61
62
63
64
# File 'lib/fluent/plugin/in_sample.rb', line 61

# Builds the plugin instance.
#
# @storage starts out as nil; #configure later assigns it the value returned
# by storage_create.
def initialize
  super
  @storage = nil
end

Instance Method Details

#configure(conf) ⇒ Object



66
67
68
69
70
71
# File 'lib/fluent/plugin/in_sample.rb', line 66

# Applies +conf+ to this plugin.
#
# After delegating to the parent implementation, rewinds the sample cursor to
# 0 and builds the storage (usage 'suspend') from the first child element of
# +conf+ whose name is 'storage' (nil when there is none), passing
# DEFAULT_STORAGE_TYPE as the default type.
#
# @param conf [Object] configuration element; must respond to #elements
# @return [Object] the value returned by storage_create (also kept in @storage)
def configure(conf)
  super
  @sample_index = 0
  storage_section = conf.elements.find { |element| element.name == 'storage' }
  @storage = storage_create(usage: 'suspend', conf: storage_section, default_type: DEFAULT_STORAGE_TYPE)
end

#emit(num) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/fluent/plugin/in_sample.rb', line 108

# Emits events through the router, +num+ times.
#
# When @size is greater than 1 each iteration emits one array of @size
# freshly generated [time, record] pairs; otherwise each iteration emits a
# single generated record.
#
# @param num [Integer] number of emit calls to make
# @return [Integer, nil] +num+ on success, nil when an error was swallowed
def emit(num)
  batched = @size > 1
  num.times do
    if batched
      router.emit_array(@tag, Array.new(@size) { [Fluent::EventTime.now, generate] })
    else
      router.emit(@tag, Fluent::EventTime.now, generate)
    end
  end
rescue => _
  # ignore all errors not to stop emits by emit errors
  nil
end

#generate ⇒ Object



131
132
133
134
135
136
137
138
# File 'lib/fluent/plugin/in_sample.rb', line 131

# Produces the next record to emit.
#
# Takes the next sample and, when @auto_increment_key is set, stores the
# incremented :auto_increment_value counter from @storage under that key.
# With @reuse_record the sample is duplicated first, so the counter is
# written to a copy rather than to the object returned by next_sample.
#
# @return [Object] the record returned by next_sample, possibly a copy with the counter added
def generate
  record = next_sample
  return record unless @auto_increment_key

  record = record.dup if @reuse_record
  record[@auto_increment_key] = @storage.update(:auto_increment_value) { |value| value + 1 }
  record
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


73
74
75
# File 'lib/fluent/plugin/in_sample.rb', line 73

# Always answers true.
# NOTE(review): presumably signals that this input may run under multiple
# workers — confirm against the base plugin contract.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end

#next_sample ⇒ Object



122
123
124
125
126
127
128
129
# File 'lib/fluent/plugin/in_sample.rb', line 122

# Returns the entry of @sample at the cursor and advances the cursor,
# starting over from the first entry once the cursor has run past the end.
#
# A duplicate is handed out unless @reuse_record is set, in which case the
# stored object itself is returned.
#
# @return [Object] the next sample record
def next_sample
  record = @sample[@sample_index]
  record = record.dup unless @reuse_record
  @sample_index += 1

  if record
    record
  else
    # Nothing at this position: rewind the cursor and take the first entry.
    @sample_index = 0
    next_sample
  end
end

#run ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/fluent/plugin/in_sample.rb', line 91

# Emit loop; #start hands this method to thread_create.
#
# Each pass of the outer loop is keyed to one wall-clock second
# (Time.now.to_i): up to BIN_NUM batches of @rate / BIN_NUM events are
# emitted, each stretched to 0.1s by #wait, followed by the @rate % BIN_NUM
# remainder; the loop then idles until the second changes. Everything stops
# as soon as thread_current_running? turns false.
def run
  batch_num    = (@rate / BIN_NUM).to_i
  residual_num = (@rate % BIN_NUM)
  while thread_current_running?
    current_time = Time.now.to_i
    BIN_NUM.times do
      # give up on the remaining batches when stopping or once the second has already rolled over
      break unless (thread_current_running? && Time.now.to_i <= current_time)
      wait(0.1) { emit(batch_num) }
    end
    emit(residual_num) if thread_current_running?
    # wait for next second
    while thread_current_running? && Time.now.to_i <= current_time
      sleep 0.01
    end
  end
end

#start ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/fluent/plugin/in_sample.rb', line 77

# Starts the plugin: seeds missing counters in @storage and launches the
# emit thread.
#
# :increment_value and :dummy_index are set to 0 only when they have no
# truthy value yet. When @auto_increment_key is set, :auto_increment_value is
# likewise seeded with -1. Finally #run is handed to thread_create under the
# name :sample_input.
#
# @return [Object] whatever thread_create returns
def start
  super

  # :dummy_index — keep 'dummy' to avoid breaking changes for existing environment. Change it in fluentd v2
  %i[increment_value dummy_index].each do |counter|
    @storage.put(counter, 0) unless @storage.get(counter)
  end

  needs_seed = @auto_increment_key && !@storage.get(:auto_increment_value)
  @storage.put(:auto_increment_value, -1) if needs_seed

  thread_create(:sample_input, &method(:run))
end

#wait(time) ⇒ Object



140
141
142
143
144
145
# File 'lib/fluent/plugin/in_sample.rb', line 140

# Runs the given block, then sleeps for whatever is left of +time+ seconds.
#
# Elapsed time is measured with the monotonic clock rather than Time.now, so
# a wall-clock adjustment (NTP step, manual change) while the block runs
# cannot turn into a bogus, possibly very long, sleep.
#
# @param time [Numeric] total time budget in seconds, block execution included
# @yield the work to perform inside the budget
# @return [Integer, nil] the result of sleep, or nil when the block already used up the budget
def wait(time)
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
  remaining = time - elapsed
  sleep remaining if remaining > 0
end