Class: Fluent::Plugin::SampleInput
- Defined in:
- lib/fluent/plugin/in_sample.rb
Constant Summary collapse
- BIN_NUM =
10
- DEFAULT_STORAGE_TYPE =
'local'
Constants included from Configurable
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary
Attributes included from Fluent::PluginLoggerMixin
Attributes inherited from Base
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #emit(num) ⇒ Object
- #generate ⇒ Object
- #initialize ⇒ SampleInput (constructor)
  A new instance of SampleInput.
- #multi_workers_ready? ⇒ Boolean
- #run ⇒ Object
- #start ⇒ Object
- #wait(time) ⇒ Object
Methods inherited from Input
#emit_records, #emit_size, #metric_callback, #statistics
Methods included from Fluent::PluginHelper::Mixin
Methods included from Fluent::PluginLoggerMixin
Methods included from Fluent::PluginId
#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop
Methods inherited from Base
#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #inspect, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?
Methods included from SystemConfig::Mixin
#system_config, #system_config_override
Methods included from Configurable
#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type
Constructor Details
#initialize ⇒ SampleInput
Returns a new instance of SampleInput.
59 60 61 62 |
# File 'lib/fluent/plugin/in_sample.rb', line 59
#
# Builds a new SampleInput.
#
# Runs the parent initializer, then leaves the storage handle unset (nil);
# it is assigned later in #configure.
def initialize
  super
  @storage = nil
end
Instance Method Details
#configure(conf) ⇒ Object
64 65 66 67 68 69 |
# File 'lib/fluent/plugin/in_sample.rb', line 64
#
# Applies the plugin configuration.
#
# After the parent configuration step, the sample cursor is reset to the
# first entry and the storage is created from the first child element named
# 'storage' (nil when there is none), falling back to DEFAULT_STORAGE_TYPE.
#
# @param conf the configuration element; only `conf.elements` is read here
def configure(conf)
  super

  @sample_index = 0
  storage_section = conf.elements.find { |element| element.name == 'storage' }
  @storage = storage_create(
    usage: 'suspend',
    conf: storage_section,
    default_type: DEFAULT_STORAGE_TYPE
  )
end
#emit(num) ⇒ Object
106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/fluent/plugin/in_sample.rb', line 106
#
# Emits generated records to the router +num+ times.
#
# When @size is greater than 1, every iteration emits one array of @size
# [time, record] pairs via `router.emit_array`; otherwise every iteration
# emits a single record via `router.emit`.
#
# @param num [Integer] number of emit calls to perform
def emit(num)
  if @size > 1
    num.times do
      router.emit_array(@tag, Array.new(@size) { [Fluent::EventTime.now, generate] })
    end
  else
    num.times { router.emit(@tag, Fluent::EventTime.now, generate) }
  end
rescue => _
  # ignore all errors not to stop emits by emit errors
end
#generate ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/fluent/plugin/in_sample.rb', line 120
#
# Returns the next sample record, cycling through @sample.
#
# The cursor (@sample_index) wraps back to the first entry once it runs past
# the end of @sample. When @auto_increment_key is set, the record is
# duplicated and the copy receives the next counter value from the storage
# under that key, so the configured sample itself is never mutated.
#
# @return the sample record itself, or a copy carrying the counter field
def generate
  record = @sample[@sample_index]
  if !record
    # Ran off the end of the sample list: start over from the beginning.
    @sample_index = 0
    record = @sample[@sample_index]
  end
  @sample_index += 1

  return record unless @auto_increment_key

  numbered = record.dup
  numbered[@auto_increment_key] = @storage.update(:auto_increment_value) { |current| current + 1 }
  numbered
end
#multi_workers_ready? ⇒ Boolean
71 72 73 |
# File 'lib/fluent/plugin/in_sample.rb', line 71
#
# Reports whether this plugin may run under multiple workers.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
#run ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/fluent/plugin/in_sample.rb', line 89
#
# Emit loop executed while the current thread is running.
#
# Each wall-clock second is split into BIN_NUM slots of 0.1s. Every slot
# emits @rate / BIN_NUM batches, and the remainder (@rate % BIN_NUM) is
# emitted once after the slots. The loop then idles until the second rolls
# over before starting the next round.
def run
  per_slot = (@rate / BIN_NUM).to_i
  leftover = (@rate % BIN_NUM)

  while thread_current_running?
    this_second = Time.now.to_i

    BIN_NUM.times do
      # Stop filling slots once the thread is stopping or the second is over.
      break unless thread_current_running? && Time.now.to_i <= this_second
      wait(0.1) { emit(per_slot) }
    end

    emit(leftover) if thread_current_running?

    # wait for next second
    sleep 0.01 while thread_current_running? && Time.now.to_i <= this_second
  end
end
#start ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/fluent/plugin/in_sample.rb', line 75
#
# Starts the plugin.
#
# After the parent start step, counters missing from the storage are seeded
# and the emit loop (#run) is launched via thread_create under the name
# :sample_input.
def start
  super

  # :increment_value and :dummy_index both start at 0 when absent.
  # keep 'dummy' to avoid breaking changes for existing environment. Change it in fluentd v2
  { increment_value: 0, dummy_index: 0 }.each do |key, initial|
    @storage.put(key, initial) unless @storage.get(key)
  end

  if @auto_increment_key
    # Seeded with -1; #generate adds 1 before using the counter.
    @storage.put(:auto_increment_value, -1) unless @storage.get(:auto_increment_value)
  end

  thread_create(:sample_input, &method(:run))
end
#wait(time) ⇒ Object
134 135 136 137 138 139 |
# File 'lib/fluent/plugin/in_sample.rb', line 134
#
# Runs the given block, then sleeps for whatever part of +time+ the block
# did not use, so that one call takes roughly +time+ seconds in total.
# No sleep happens when the block already took +time+ seconds or longer.
#
# Elapsed time is measured with the monotonic clock rather than Time.now:
# a wall-clock adjustment while the block runs would otherwise make the
# computed sleep far too long (clock stepped back) or skip it entirely
# (clock stepped forward).
#
# @param time [Numeric] target duration of the whole call, in seconds
# @yield the work to perform before sleeping
def wait(time)
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  remaining = time - (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at)
  sleep remaining if remaining > 0
end