Class: Fluent::Plugin::SakuraIOInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_sakuraio.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



17
18
19
20
21
# File 'lib/fluent/plugin/in_sakuraio.rb', line 17

def configure(conf)
  super

  @time_parser = Fluent::TimeParser.new(nil)
end

#ensure_reactor_runningObject



30
31
32
33
34
35
36
# File 'lib/fluent/plugin/in_sakuraio.rb', line 30

def ensure_reactor_running
  return if EM.reactor_running?

  thread_create(:in_sakuraio_reactor) do
    EM.run
  end
end

#handle_message(event) ⇒ Object



68
69
70
71
72
73
74
75
76
# File 'lib/fluent/plugin/in_sakuraio.rb', line 68

def handle_message(event)
  log.debug "sakuraio: received message #{event.data}"
  records = parse(event.data)
  return if records.empty?

  records.each do |r|
    router.emit(r['tag'], r['time'], r['record'])
  end
end

#parse(text) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/fluent/plugin/in_sakuraio.rb', line 78

def parse(text)
  parser = Yajl::Parser.new
  j = parser.parse(text)
  records = []
  case j['type']
  when 'connection'
    parse_connection(records, j)
  when 'location'
    parse_location(records, j)
  when 'channels'
    parse_channels(records, j)
  else
    log.debug "unknown type: #{j['type']}: #{text}"
  end
  records
end

#parse_channel(mod, msg_time, chan) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/fluent/plugin/in_sakuraio.rb', line 135

def parse_channel(mod, msg_time, chan)
  {
    'tag' => mod + '.channels.' + chan['channel'].to_s,
    'record' => {
      'module' => mod,
      'channel' => chan['channel'],
      'type' => chan['type'],
      'value' => chan['value']
    },
    'time' => @time_parser.parse(chan['datetime']) || msg_time
  }
end

#parse_channels(records, data) ⇒ Object



126
127
128
129
130
131
132
133
# File 'lib/fluent/plugin/in_sakuraio.rb', line 126

def parse_channels(records, data)
  msg_time = @time_parser.parse(data['datetime'])
  mod = data['module']
  data['payload']['channels'].each do |c|
    records.push(parse_channel(mod, msg_time, c))
  end
  records
end

#parse_connection(records, data) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/fluent/plugin/in_sakuraio.rb', line 95

def parse_connection(records, data)
  record = {
    'tag' => data['module'] + '.connection',
    'record' => {
      'module' => data['module'],
      'is_online' => data['payload']['is_online']
    },
    'time' => @time_parser.parse(data['datetime'])
  }
  records.push(record)
  records
end

#parse_location(records, data) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/fluent/plugin/in_sakuraio.rb', line 108

def parse_location(records, data)
  c = data['payload']['coordinate']
  if c != 'null'
    record = {
      'tag' => data['module'] + '.location',
      'record' => {
        'module' => data['module'],
        'latitude' => c['latitude'],
        'longitude' => c['longitude'],
        'range_m' => c['range_m']
      },
      'time' => @time_parser.parse(data['datetime'])
    }
    records.push(record)
  end
  records
end

#runObject



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/fluent/plugin/in_sakuraio.rb', line 44

def run
  options = {}
  options[:ping] = @ping if @ping.positive?
  client = Faye::WebSocket::Client.new(@url, nil, options)
  EM.next_tick do
    client.on :open do
      log.info "sakuraio: starting websocket connection for #{@url}."
    end

    client.on :message do |event|
      handle_message(event)
    end

    client.on :error do |event|
      log.warn "sakuraio: #{event.message}"
    end

    client.on :close do |event|
      log.warn "sakuraio: #{event.code} #{event.reason}"
      run
    end
  end
end

#shutdownObject



38
39
40
41
42
# File 'lib/fluent/plugin/in_sakuraio.rb', line 38

def shutdown
  EM.stop if EM.reactor_running?

  super
end

#startObject



23
24
25
26
27
28
# File 'lib/fluent/plugin/in_sakuraio.rb', line 23

def start
  super

  ensure_reactor_running
  thread_create(:in_sakuraio, &method(:run))
end