Class: Fluent::Plugin::EverySenseOutput

Inherits:
Output
  • Object
show all
Includes:
EverySenseProxy
Defined in:
lib/fluent/plugin/out_everysense.rb

Overview

EverySenseOutput outputs data to the EverySense server. This module assumes that the input format follows the EverySense output specification.

Instance Method Summary collapse

Methods included from EverySenseProxy

#create_session, #create_session_request, #delete_session, #delete_session_request, #error_handler, #get_messages, #get_messages_params, #get_messages_request, #put_message, #put_message_request, #shutdown_proxy, #start_proxy, #target_path, #valid_session?

Instance Method Details

#configure(conf) ⇒ Object

This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/fluent/plugin/out_everysense.rb', line 60

# Prepares the plugin from its configuration section.
#
# Creates the formatter from the first <format> element, records whether a
# <buffer> element is present, and builds @out_sensors, a lookup table of the
# configured sensors.
#
# @param conf the configuration element passed to the plugin
def configure(conf)
  super
  compat_parameters_convert(conf, :formatter, :inject, :buffer, default_chunk_key: "time")
  @formatter = formatter_create(conf: conf.elements(name: 'format').first)
  @has_buffer_section = !conf.elements(name: 'buffer').empty?

  # Each sensor is keyed by its input_name, falling back to output_name
  # when no input_name was configured.
  @out_sensors = {}
  @sensors.each do |sensor|
    lookup_key = sensor.input_name.nil? ? sensor.output_name : sensor.input_name
    @out_sensors[lookup_key] = sensor
  end
  log.debug @out_sensors.inspect
end

#force_type(value, out_sensor) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/fluent/plugin/out_everysense.rb', line 88

# Coerces a sensor value to the type declared by the output sensor.
#
# @param value the raw value taken from the incoming record
# @param out_sensor an object responding to #type_of_value
# @return the value converted with to_f for "Float", to_s for "String",
#   and to_i for "Integer" or any other type name
def force_type(value, out_sensor)
  case out_sensor.type_of_value
  when "Float"  then value.to_f
  when "String" then value.to_s
  else
    # "Integer" and every unrecognised type name are treated as integers.
    value.to_i
  end
end

#get_out_sensor_by_index(index) ⇒ Object



136
137
138
# File 'lib/fluent/plugin/out_everysense.rb', line 136

# Returns the sensor stored at the given position of @out_sensors, in the
# hash's key order.
#
# @param index [Integer] position within the keys of @out_sensors
# @return the sensor registered under the key found at that position
def get_out_sensor_by_index(index)
  key_at_position = @out_sensors.keys[index]
  @out_sensors[key_at_position]
end

#get_out_sensor_by_name(input_name) ⇒ Object



132
133
134
# File 'lib/fluent/plugin/out_everysense.rb', line 132

# Looks up a sensor in @out_sensors by the key it was registered under.
#
# @param input_name the key to look up
# @return the matching sensor, or whatever @out_sensors yields for a
#   missing key (nil for the plain Hash built in #configure)
def get_out_sensor_by_name(input_name)
  @out_sensors[input_name]
end

#prefer_buffered_processing ⇒ Object



84
85
86
# File 'lib/fluent/plugin/out_everysense.rb', line 84

# Returns the @has_buffer_section flag, which #configure sets to true when
# the configuration contains at least one <buffer> element.
# NOTE(review): presumably consulted by Fluentd to choose between #process
# and #write — confirm against the Fluentd output plugin API.
def prefer_buffered_processing
  @has_buffer_section
end

#process(tag, es) ⇒ Object



225
226
227
# File 'lib/fluent/plugin/out_everysense.rb', line 225

# Forwards the given tag and event stream straight to #put_event_stream.
# NOTE(review): presumably the non-buffered entry point called by Fluentd —
# confirm against the Fluentd output plugin API.
#
# @param tag the tag of the event stream
# @param es the event stream to send
def process(tag, es)
  put_event_stream(tag, es)
end

#put_event_stream(tag, es) ⇒ Object

Emitted record inside fluentd network [

{"farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
 "device":
  [
    {
      "farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
      "sensor_name": "collection_data_1",
      "data_class_name": "AirTemperature",
      "data": {
        "at": "2016-05-12 21:38:52 UTC",
        "memo": null,
        "value": 23,
        "unit": "degree Celsius"
      }
    },
    {
      "farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
      "sensor_name": "collection_data_2",
      "data_class_name": "AirHygrometer",
      "data": {
        "at": "2016-05-12 21:38:52 UTC",
        "memo": null,
        "value": 30,
        "unit": "%RH"
      }
    }
  ]
}

]



217
218
219
220
221
222
223
# File 'lib/fluent/plugin/out_everysense.rb', line 217

# Sends every record of an event stream through put_message.
#
# The stream is first passed through inject_values_to_event_stream; then,
# for each event, the record's "device" entry is transformed, formatted
# with @formatter, and handed to put_message.
#
# @param tag the tag of the event stream
# @param es the event stream, yielding (time, record) pairs
def put_event_stream(tag, es)
  inject_values_to_event_stream(tag, es).each do |time, record|
    log.debug "#{tag}, #{record}"
    out_device = transform_in_device(record["device"])
    put_message(@formatter.format(tag, time, out_device))
  end
end

#shutdown ⇒ Object

This method is called when shutting down. Shutdown the thread and close sockets or files here.



238
239
240
241
# File 'lib/fluent/plugin/out_everysense.rb', line 238

# Shuts the plugin down: calls shutdown_proxy (provided by the included
# EverySenseProxy module) first, then the parent class's shutdown.
def shutdown
  shutdown_proxy
  super
end

#start ⇒ Object

This method is called when starting. Open sockets or files here.



79
80
81
82
# File 'lib/fluent/plugin/out_everysense.rb', line 79

# Starts the plugin: runs the parent class's start first, then start_proxy
# (provided by the included EverySenseProxy module).
def start
  super
  start_proxy
end

#transform_in_device(in_device) ⇒ Object

Assumed input message format is as follows

[

[
  {
    "farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    "sensor_name": "collection_data_1",
    "data_class_name": "AirTemperature",
    "data": {
      "at": "2016-05-12 21:38:52 UTC",
      "memo": null,
      "value": 23,
      "unit": "degree Celsius"
    }
  },
  {
    "farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    "sensor_name": "collection_data_2",
    "data_class_name": "AirHygrometer",
    "data": {
      "at": "2016-05-12 21:38:52 UTC",
      "memo": null,
      "value": 30,
      "unit": "%RH"
    }
  }
]

]



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/fluent/plugin/out_everysense.rb', line 168

# Converts a list of incoming sensor entries into the output format.
#
# If the first entry has no "sensor_name", entries are matched to the
# configured sensors by position; otherwise each entry is matched by its
# "sensor_name". Entries without a matching configured sensor are dropped.
#
# @param in_device [Array<Hash>, nil] the incoming sensor entries
# @return [Array<Hash>] the transformed entries; empty when in_device is
#   nil or empty
def transform_in_device(in_device)
  # An empty or missing device list has no first entry to inspect;
  # without this guard the sensor_name check below raises NoMethodError.
  return [] if in_device.nil? || in_device.empty?

  if in_device.first["sensor_name"].nil?
    # if first input data does not include sensor_name,
    # output_names are used in the specified order.
    in_device.each_with_index.map do |in_sensor, i|
      out_sensor = get_out_sensor_by_index(i)
      transform_in_sensor(in_sensor, out_sensor) unless out_sensor.nil?
    end.compact
  else
    in_device.map do |in_sensor|
      sensor_name = in_sensor["sensor_name"]
      next unless @out_sensors.key?(sensor_name)

      transform_in_sensor(in_sensor, get_out_sensor_by_name(sensor_name))
    end.compact
  end
end

#transform_in_sensor(in_sensor, out_sensor) ⇒ Object

output message format of EverySense is as follows

[

{
  "data": {
    "at":"2016-04-14 17:15:00 +0900",
    "unit":"degree Celsius",
    "value":23
  },
  "sensor_name":"FESTIVAL_Test1_Sensor"
},
{
  "data": {
    "at":"2016-04-14 17:15:00 +0900",
    "unit":"%RH",
    "value":30
  },
  "sensor_name":"FESTIVAL_Test1_Sensor2"
}

]



121
122
123
124
125
126
127
128
129
130
# File 'lib/fluent/plugin/out_everysense.rb', line 121

# Builds one output entry from an incoming sensor entry, replacing the
# sensor name with the output sensor's output_name.
#
# @param in_sensor [Hash] incoming entry with a "data" hash holding
#   "at", "unit" and "value"
# @param out_sensor an object responding to #output_name (and whatever
#   force_type reads from it)
# @return [Hash] { data: { at:, unit:, value: }, sensor_name: }
def transform_in_sensor(in_sensor, out_sensor)
  reading = in_sensor["data"]
  measured_at = Time.parse(reading["at"])
  payload = {
    at: measured_at,
    unit: reading["unit"],
    value: force_type(reading["value"], out_sensor)
  }
  { data: payload, sensor_name: out_sensor.output_name }
end

#write(chunk) ⇒ Object



229
230
231
232
233
234
# File 'lib/fluent/plugin/out_everysense.rb', line 229

# Sends the contents of a buffer chunk through #put_event_stream.
#
# Does nothing for an empty chunk. Otherwise the tag is taken from the
# chunk's metadata and the chunk's content is wrapped in an event stream.
# NOTE(review): assumes the chunk holds Fluentd's standard msgpack-encoded
# events (no custom #format is listed for this class) — confirm.
#
# @param chunk the buffer chunk to flush
def write(chunk)
  return if chunk.empty?

  tag = chunk.metadata.tag
  # Build the event stream explicitly; it was previously never defined here.
  es = Fluent::MessagePackEventStream.new(chunk.read)
  put_event_stream(tag, es)
end