Class: Fluent::Plugin::SampleInput
- Inherits:
-
Input
- Object
- Base
- Input
- Fluent::Plugin::SampleInput
show all
- Defined in:
- lib/fluent/plugin/in_sample.rb
Constant Summary
collapse
- BIN_NUM =
10
- DEFAULT_STORAGE_TYPE =
'local'
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary
#log
Attributes inherited from Base
#under_plugin_development
Instance Method Summary
collapse
Methods inherited from Input
#emit_records, #emit_size, #metric_callback, #statistics
included
included, #terminate
#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop
Methods inherited from Base
#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #inspect, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?
#system_config, #system_config_override
#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type
Constructor Details
Returns a new instance of SampleInput.
61
62
63
64
|
# File 'lib/fluent/plugin/in_sample.rb', line 61
# Builds the plugin instance.
#
# The storage handle starts out unset (nil); it is assigned a real storage
# object by #configure.
def initialize
  super
  @storage = nil
end
|
Instance Method Details
66
67
68
69
70
71
|
# File 'lib/fluent/plugin/in_sample.rb', line 66
# Applies the plugin configuration.
#
# After the parent class has processed +conf+, the sample cursor is reset to
# the first entry and a storage object is created. The first child element of
# +conf+ named 'storage' (nil when there is none) is handed to storage_create
# together with the 'suspend' usage label and DEFAULT_STORAGE_TYPE as fallback.
#
# @param conf [#elements] configuration element whose children respond to #name
# @return [Object] whatever storage_create returns (also kept in @storage)
def configure(conf)
  super

  @sample_index = 0
  storage_section = conf.elements.find do |element|
    element.name == 'storage'
  end
  @storage = storage_create(
    usage: 'suspend',
    conf: storage_section,
    default_type: DEFAULT_STORAGE_TYPE
  )
end
|
#emit(num) ⇒ Object
108
109
110
111
112
113
114
115
116
117
118
119
120
|
# File 'lib/fluent/plugin/in_sample.rb', line 108
# Emits +num+ events (or +num+ batches when @size > 1) through the router.
#
# With @size > 1 each iteration sends one array of @size [time, record] pairs
# via emit_array; otherwise each iteration sends a single record via emit.
# Every record comes from #generate and is stamped with Fluent::EventTime.now.
#
# Any StandardError raised while emitting is deliberately swallowed, in which
# case the method returns nil instead of +num+.
#
# @param num [Integer] number of emit calls to perform
# @return [Integer, nil] +num+ on success, nil when an error was swallowed
def emit(num)
  if @size > 1
    num.times do
      router.emit_array(@tag, Array.new(@size) { [Fluent::EventTime.now, generate] })
    end
  else
    num.times do
      router.emit(@tag, Fluent::EventTime.now, generate)
    end
  end
rescue StandardError
  # Intentionally ignored: errors from emitting are dropped silently.
end
|
#generate ⇒ Object
131
132
133
134
135
136
137
138
|
# File 'lib/fluent/plugin/in_sample.rb', line 131
# Produces the next record to emit.
#
# Takes the record from #next_sample. When @auto_increment_key is set, the
# counter stored under :auto_increment_value is advanced by one and its new
# value is written into the record under that key. If records are being
# reused (@reuse_record), the record is duplicated first so the counter is
# not written into the record handed out by #next_sample.
#
# @return [Object] the record, with the auto-increment field when configured
def generate
  record = next_sample
  return record unless @auto_increment_key

  record = record.dup if @reuse_record
  record[@auto_increment_key] = @storage.update(:auto_increment_value) { |current| current + 1 }
  record
end
|
#multi_workers_ready? ⇒ Boolean
73
74
75
|
# File 'lib/fluent/plugin/in_sample.rb', line 73
# Always answers true.
# NOTE(review): presumably this tells the framework the input may run under
# multiple workers — confirm against the base class contract.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end
|
#next_sample ⇒ Object
122
123
124
125
126
127
128
129
|
# File 'lib/fluent/plugin/in_sample.rb', line 122
# Returns the record at the current cursor and advances the cursor, wrapping
# back to the first record once the end of @sample is reached.
#
# A falsy entry (nil/false, including reading past the end) is treated as
# "end of list": the cursor is reset to 0 and the lookup is retried once.
# If the first entry is falsy as well (for example @sample is empty) there is
# nothing to hand out; an IndexError is raised instead of recursing without
# bound until the stack overflows.
#
# When @reuse_record is falsy a shallow copy (#dup) of the stored record is
# returned so callers can modify it without touching @sample.
#
# @return [Object] the stored record (or its copy)
# @raise [IndexError] when @sample has no usable record
def next_sample
  # Two attempts are enough: one at the current cursor, one after wrapping.
  2.times do
    record = @sample[@sample_index]
    if record
      @sample_index += 1
      return @reuse_record ? record : record.dup
    end
    @sample_index = 0
  end

  raise IndexError, 'sample contains no record to emit'
end
|
#run ⇒ Object
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
# File 'lib/fluent/plugin/in_sample.rb', line 91
# Emit loop; keeps going for as long as thread_current_running? is truthy.
#
# Each pass of the outer loop handles one wall-clock second (Time.now.to_i):
# up to BIN_NUM batches of (@rate / BIN_NUM) emits, each padded to 0.1s by
# #wait, followed by the (@rate % BIN_NUM) remainder, followed by idling
# until the second changes.
def run
batch_num = (@rate / BIN_NUM).to_i
residual_num = (@rate % BIN_NUM)
while thread_current_running?
# The second this pass belongs to; used below to detect the rollover.
current_time = Time.now.to_i
BIN_NUM.times do
# Stop batching early on shutdown or once the clock has moved to a later second.
break unless (thread_current_running? && Time.now.to_i <= current_time)
# #wait runs the block, then sleeps for whatever is left of the 0.1s slot.
wait(0.1) { emit(batch_num) }
end
# Emit the part of @rate that does not divide evenly into BIN_NUM batches.
emit(residual_num) if thread_current_running?
# Idle in 10ms steps until the wall-clock second changes (or shutdown).
while thread_current_running? && Time.now.to_i <= current_time
sleep 0.01
end
end
end
|
#start ⇒ Object
77
78
79
80
81
82
83
84
85
86
87
88
89
|
# File 'lib/fluent/plugin/in_sample.rb', line 77
# Starts the plugin: seeds missing storage entries and launches the emit thread.
#
# :increment_value and :dummy_index are set to 0 when they have no truthy
# value yet (they are not read anywhere else in the visible code). When
# @auto_increment_key is configured, :auto_increment_value is seeded with -1
# unless already present, so the first increment in #generate yields 0.
# Finally a thread labelled :sample_input is created to execute #run.
#
# @return [Object] whatever thread_create returns
def start
  super

  %i[increment_value dummy_index].each do |key|
    @storage.put(key, 0) unless @storage.get(key)
  end
  @storage.put(:auto_increment_value, -1) if @auto_increment_key && !@storage.get(:auto_increment_value)

  thread_create(:sample_input, &method(:run))
end
|
#wait(time) ⇒ Object
140
141
142
143
144
145
|
# File 'lib/fluent/plugin/in_sample.rb', line 140
# Runs the given block, then sleeps for whatever remains of +time+ seconds.
#
# If the block already took +time+ seconds or longer, no sleep happens.
# Elapsed time is measured with the monotonic clock so that wall-clock
# adjustments (NTP steps, manual changes) during the block cannot inflate or
# cancel the sleep.
#
# @param time [Numeric] total duration of the slot in seconds
# @yield the work to perform at the start of the slot
# @return [Integer, nil] the result of Kernel#sleep, or nil when no sleep was needed
def wait(time)
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  remaining = time - (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at)
  sleep remaining if remaining > 0
end
|