Class: Fluent::Plugin::EverySenseOutput
- Inherits:
- Output
  - Object
    - Output
      - Fluent::Plugin::EverySenseOutput
- Includes:
- EverySenseProxy
- Defined in:
- lib/fluent/plugin/out_everysense.rb
Overview
EverySenseOutput outputs data to an EverySense server. This module assumes that the input format follows the EverySense output specification.
Instance Method Summary
- #configure(conf) ⇒ Object
  This method is called before starting.
- #force_type(value, out_sensor) ⇒ Object
- #get_out_sensor_by_index(index) ⇒ Object
- #get_out_sensor_by_name(input_name) ⇒ Object
- #prefer_buffered_processing ⇒ Object
- #process(tag, es) ⇒ Object
- #put_event_stream(tag, es) ⇒ Object
  Handles records emitted inside the fluentd network; the expected record layout is given in the method details.
- #shutdown ⇒ Object
  This method is called when shutting down.
- #start ⇒ Object
  This method is called when starting.
- #transform_in_device(in_device) ⇒ Object
  Transforms an input device message; the assumed input format is given in the method details.
- #transform_in_sensor(in_sensor, out_sensor) ⇒ Object
  Builds one sensor entry of the EverySense output message; the output format is given in the method details.
- #write(chunk) ⇒ Object
Methods included from EverySenseProxy
#create_session, #create_session_request, #delete_session, #delete_session_request, #error_handler, #get_messages, #get_messages_params, #get_messages_request, #put_message, #put_message_request, #shutdown_proxy, #start_proxy, #target_path, #valid_session?
Instance Method Details
#configure(conf) ⇒ Object
This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
# File 'lib/fluent/plugin/out_everysense.rb', line 60
def configure(conf)
  super
  compat_parameters_convert(conf, :formatter, :inject, :buffer, default_chunk_key: "time")
  formatter_config = conf.elements(name: 'format').first
  @formatter = formatter_create(conf: formatter_config)
  @has_buffer_section = conf.elements(name: 'buffer').size > 0
  @out_sensors = {}
  @sensors.each do |sensor|
    if sensor.input_name.nil?
      @out_sensors[sensor.output_name] = sensor
    else
      @out_sensors[sensor.input_name] = sensor
    end
  end
  log.debug @out_sensors.inspect
end
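The lookup table built in configure can be tried outside the plugin. The following is a minimal sketch, not the plugin's own code: `SensorConf`, `build_out_sensors`, and the sensor names are hypothetical stand-ins for the plugin's sensor configuration objects. It shows that each sensor is keyed by its input_name when one is set, and by its output_name otherwise.

```ruby
# Hypothetical stand-in for the plugin's sensor configuration objects.
SensorConf = Struct.new(:input_name, :output_name, :type_of_value)

# Build the table the same way configure does:
# key by input_name when present, otherwise by output_name.
def build_out_sensors(sensors)
  sensors.each_with_object({}) do |sensor, table|
    key = sensor.input_name.nil? ? sensor.output_name : sensor.input_name
    table[key] = sensor
  end
end

EXAMPLE_SENSORS = [
  SensorConf.new("collection_data_1", "FESTIVAL_Test1_Sensor", "Integer"),
  SensorConf.new(nil, "FESTIVAL_Test1_Sensor2", "Float")
].freeze

EXAMPLE_TABLE = build_out_sensors(EXAMPLE_SENSORS)
puts EXAMPLE_TABLE.keys.inspect
```

The second sensor has no input_name, so it is reachable only under its output_name.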
#force_type(value, out_sensor) ⇒ Object
# File 'lib/fluent/plugin/out_everysense.rb', line 88
def force_type(value, out_sensor)
  case out_sensor.type_of_value
  when "Integer"
    return value.to_i
  when "Float"
    return value.to_f
  when "String"
    return value.to_s
  else
    return value.to_i
  end
end
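The coercion rules of force_type are easy to check in isolation. The sketch below copies the method body and uses a hypothetical `TypedSensor` struct in place of the real sensor configuration. Note that any unrecognized type_of_value, including nil, falls back to integer conversion.

```ruby
# Hypothetical stand-in exposing only the attribute force_type reads.
TypedSensor = Struct.new(:type_of_value)

# Same branching as the plugin's force_type.
def force_type(value, out_sensor)
  case out_sensor.type_of_value
  when "Integer" then value.to_i
  when "Float"   then value.to_f
  when "String"  then value.to_s
  else value.to_i # fallback for unknown or missing types
  end
end

p force_type("23.7", TypedSensor.new("Integer")) # => 23
p force_type("23.7", TypedSensor.new("Float"))   # => 23.7
p force_type(30, TypedSensor.new("String"))      # => "30"
p force_type("23.7", TypedSensor.new(nil))       # => 23
```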
#get_out_sensor_by_index(index) ⇒ Object
# File 'lib/fluent/plugin/out_everysense.rb', line 136
def get_out_sensor_by_index(index)
  @out_sensors[@out_sensors.keys[index]]
end
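get_out_sensor_by_index depends on the key order of @out_sensors. Ruby hashes preserve insertion order, so index i refers to the i-th key that was inserted. A minimal sketch, with a hypothetical table and the hypothetical helper `sensor_by_index`:

```ruby
# Hypothetical lookup table; keys are inserted one after another,
# the way configure fills @out_sensors while walking @sensors.
ORDERED_SENSORS = {}
ORDERED_SENSORS["collection_data_2"] = :humidity_sensor
ORDERED_SENSORS["collection_data_1"] = :temperature_sensor

# Same expression as get_out_sensor_by_index, applied to a given table.
def sensor_by_index(table, index)
  table[table.keys[index]]
end

p sensor_by_index(ORDERED_SENSORS, 0) # => :humidity_sensor
p sensor_by_index(ORDERED_SENSORS, 1) # => :temperature_sensor
p sensor_by_index(ORDERED_SENSORS, 5) # => nil (index out of range)
```

An out-of-range index yields nil, which is the case transform_in_device filters out.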
#get_out_sensor_by_name(input_name) ⇒ Object
# File 'lib/fluent/plugin/out_everysense.rb', line 132
def get_out_sensor_by_name(input_name)
  @out_sensors[input_name]
end
#prefer_buffered_processing ⇒ Object
# File 'lib/fluent/plugin/out_everysense.rb', line 84
def prefer_buffered_processing
  @has_buffer_section
end
#process(tag, es) ⇒ Object
# File 'lib/fluent/plugin/out_everysense.rb', line 225
def process(tag, es)
  put_event_stream(tag, es)
end
#put_event_stream(tag, es) ⇒ Object
Emitted record inside fluentd network:
[
  {
    "farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    "device": [
      {
        "farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
        "sensor_name": "collection_data_1",
        "data_class_name": "AirTemperature",
        "data": {
          "at": "2016-05-12 21:38:52 UTC",
          "memo": null,
          "value": 23,
          "unit": "degree Celsius"
        }
      },
      {
        "farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
        "sensor_name": "collection_data_2",
        "data_class_name": "AirHygrometer",
        "data": {
          "at": "2016-05-12 21:38:52 UTC",
          "memo": null,
          "value": 30,
          "unit": "%RH"
        }
      }
    ]
  }
]
# File 'lib/fluent/plugin/out_everysense.rb', line 217
def put_event_stream(tag, es)
  es = inject_values_to_event_stream(tag, es)
  es.each do |time, record|
    log.debug "#{tag}, #{record}"
    (@formatter.format(tag, time, transform_in_device(record["device"])))
  end
end
#shutdown ⇒ Object
This method is called when shutting down. Shut down the thread and close sockets or files here.
# File 'lib/fluent/plugin/out_everysense.rb', line 238
def shutdown
  shutdown_proxy
  super
end
#start ⇒ Object
This method is called when starting. Open sockets or files here.
# File 'lib/fluent/plugin/out_everysense.rb', line 79
def start
  super
  start_proxy
end
#transform_in_device(in_device) ⇒ Object
The assumed input message format is as follows:
[
  [
    {
      "farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
      "sensor_name": "collection_data_1",
      "data_class_name": "AirTemperature",
      "data": {
        "at": "2016-05-12 21:38:52 UTC",
        "memo": null,
        "value": 23,
        "unit": "degree Celsius"
      }
    },
    {
      "farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
      "sensor_name": "collection_data_2",
      "data_class_name": "AirHygrometer",
      "data": {
        "at": "2016-05-12 21:38:52 UTC",
        "memo": null,
        "value": 30,
        "unit": "%RH"
      }
    }
  ]
]
# File 'lib/fluent/plugin/out_everysense.rb', line 168
def transform_in_device(in_device)
  if in_device[0]["sensor_name"].nil?
    # if first input data does not include sensor_name,
    # output_names are used in the specified order.
    return in_device.map.with_index do |in_sensor, i|
      if !get_out_sensor_by_index(i).nil?
        transform_in_sensor(in_sensor, get_out_sensor_by_index(i))
      end
    end.compact
  else
    return in_device.map do |in_sensor|
      #log.debug in_sensor["sensor_name"]
      if @out_sensors.keys.include?(in_sensor["sensor_name"])
        transform_in_sensor(in_sensor, get_out_sensor_by_name(in_sensor["sensor_name"]))
      end
    end.compact
  end
end
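transform_in_device has two matching modes: by position when the first input entry has no sensor_name, and by name otherwise. The sketch below reproduces that branching under simplifying assumptions: `SensorDef`, `SENSOR_TABLE`, `rename_sensor`, `transform_device`, and the output names are hypothetical, and `rename_sensor` only swaps the sensor name instead of performing the full transform_in_sensor conversion.

```ruby
# Hypothetical stand-in for a configured sensor.
SensorDef = Struct.new(:input_name, :output_name)

# Hypothetical table, keyed the way configure keys @out_sensors.
SENSOR_TABLE = {
  "collection_data_1" => SensorDef.new("collection_data_1", "out_temperature"),
  "collection_data_2" => SensorDef.new("collection_data_2", "out_humidity")
}.freeze

# Simplified stand-in for transform_in_sensor: only renames the sensor.
def rename_sensor(in_sensor, out_sensor)
  { data: in_sensor["data"], sensor_name: out_sensor.output_name }
end

def transform_device(in_device, table)
  if in_device[0]["sensor_name"].nil?
    # Positional mode: the i-th input maps to the i-th configured sensor.
    in_device.each_with_index.map do |in_sensor, i|
      out_sensor = table[table.keys[i]]
      rename_sensor(in_sensor, out_sensor) unless out_sensor.nil?
    end.compact
  else
    # Named mode: inputs whose sensor_name is not configured are dropped.
    in_device.map do |in_sensor|
      out_sensor = table[in_sensor["sensor_name"]]
      rename_sensor(in_sensor, out_sensor) unless out_sensor.nil?
    end.compact
  end
end

NAMED_INPUT = [
  { "sensor_name" => "collection_data_2", "data" => { "value" => 30 } },
  { "sensor_name" => "unknown_sensor", "data" => { "value" => 1 } }
].freeze

POSITIONAL_INPUT = [
  { "data" => { "value" => 23 } },
  { "data" => { "value" => 30 } },
  { "data" => { "value" => 99 } } # no third sensor configured, so dropped
].freeze

p transform_device(NAMED_INPUT, SENSOR_TABLE).map { |s| s[:sensor_name] }
p transform_device(POSITIONAL_INPUT, SENSOR_TABLE).map { |s| s[:sensor_name] }
```

In both modes, inputs without a matching configured sensor are silently discarded by the final compact.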
#transform_in_sensor(in_sensor, out_sensor) ⇒ Object
The output message format of EverySense is as follows:
[
  {
    "data": {
      "at": "2016-04-14 17:15:00 +0900",
      "unit": "degree Celsius",
      "value": 23
    },
    "sensor_name": "FESTIVAL_Test1_Sensor"
  },
  {
    "data": {
      "at": "2016-04-14 17:15:00 +0900",
      "unit": "%RH",
      "value": 30
    },
    "sensor_name": "FESTIVAL_Test1_Sensor2"
  }
]
# File 'lib/fluent/plugin/out_everysense.rb', line 121
def transform_in_sensor(in_sensor, out_sensor)
  # modify sensor_name
  {
    data: {
      at: Time.parse(in_sensor["data"]["at"]),
      unit: in_sensor["data"]["unit"],
      value: force_type(in_sensor["data"]["value"], out_sensor)
    },
    sensor_name: out_sensor.output_name
  }
end
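The per-sensor conversion can be sketched as a standalone function. `TargetSensor`, `COERCERS`, and `to_everysense_sensor` are hypothetical names, and the value coercion is a compact equivalent of force_type rather than the plugin's own method. The sketch shows that only at, unit, and value survive, that at becomes a Time object, and that sensor_name is replaced by the configured output_name; fields such as farm_uuid, data_class_name, and memo are not carried over.

```ruby
require "time" # provides Time.parse

# Hypothetical stand-in for a configured output sensor.
TargetSensor = Struct.new(:output_name, :type_of_value)

# Compact equivalent of force_type: unknown types fall back to to_i.
COERCERS = { "Integer" => :to_i, "Float" => :to_f, "String" => :to_s }.freeze

def to_everysense_sensor(in_sensor, out_sensor)
  data = in_sensor["data"]
  {
    data: {
      at: Time.parse(data["at"]),
      unit: data["unit"],
      value: data["value"].public_send(COERCERS.fetch(out_sensor.type_of_value, :to_i))
    },
    sensor_name: out_sensor.output_name
  }
end

SAMPLE_IN_SENSOR = {
  "farm_uuid" => "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
  "sensor_name" => "collection_data_1",
  "data_class_name" => "AirTemperature",
  "data" => {
    "at" => "2016-05-12 21:38:52 UTC",
    "memo" => nil,
    "value" => 23,
    "unit" => "degree Celsius"
  }
}.freeze

SAMPLE_OUT = to_everysense_sensor(
  SAMPLE_IN_SENSOR, TargetSensor.new("FESTIVAL_Test1_Sensor", "Float")
)
p SAMPLE_OUT
```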
#write(chunk) ⇒ Object
# File 'lib/fluent/plugin/out_everysense.rb', line 229
def write(chunk)
  return if chunk.empty?
  tag = chunk..tag

  put_event_stream(tag, es)
end