Class: Fluent::Plugin::ProxysqlQueryLogInput::Watcher

Inherits:
Coolio::StatWatcher
  • Object
show all
Defined in:
lib/fluent/plugin/in_proxysql_query_log/watcher.rb

Instance Method Summary collapse

Constructor Details

#initialize(path, interval, pos_storage, router, tag, log) ⇒ Watcher

Returns a new instance of Watcher.



5
6
7
8
9
10
11
12
13
14
15
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 5

def initialize(path, interval, pos_storage, router, tag, log)
  super(path, interval)

  @parser = ProxysqlQueryLog::Parser.new
  @pos_storage = pos_storage
  @router = router
  @tag = tag
  @log = log
  @attached = false
  read
end

Instance Method Details

#attach(loop) ⇒ Object



73
74
75
76
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 73

def attach(loop)
  @attached = true
  super
end

#attached?Boolean

Returns:

  • (Boolean)


82
83
84
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 82

def attached?
  @attached
end

#convert_time(t) ⇒ Object



69
70
71
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 69

def convert_time(t)
  Time.at(t/1000/1000).utc.strftime('%Y-%m-%d %H:%M:%S')
end

#detachObject



78
79
80
81
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 78

def detach
  @attached = false
  super
end

#hostnameObject



86
87
88
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 86

def hostname
  @hostname ||= Socket.gethostname
end

#on_change(previous, current) ⇒ Object



22
23
24
25
26
27
28
29
30
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 22

def on_change(previous, current)
  if current.nlink == 0
    @log.debug("stop watch: #{@path} (deleted)")
    @pos_storage.delete(@path)
    detach
  else
    read
  end
end

#readObject



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 32

def read
  @io = File.open(@path)
  seek(@path)

  while true
    @pos = @io.pos
    raw_total_bytes = @io.read(8)
    return unless raw_total_bytes

    query = @parser.parse(@io)
    @router.emit(@tag, query.start_time/1000/1000, record(query))
    @pos_storage.put(@path, @io.pos)
  end

ensure
  @io.close
end

#record(query) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 50

def record(query)
  {
      'thread_id' => query.thread_id,
      'username' => query.username,
      'schema_name' => query.schema_name,
      'client' => query.client,
      'HID' => query.hid,
      'server' => query.server,
      'start_time' => convert_time(query.start_time),
      'end_time' => convert_time(query.end_time),
      'duration' => query.end_time - query.start_time,
      'digest' => query.digest,
      'query' => query.query,
      'hostname' => hostname,
      'filename' => @path,
      'pos' => @pos
  }
end

#seek(path) ⇒ Object



17
18
19
20
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 17

def seek(path)
  cursor = @pos_storage.get(path)
  @io.seek(cursor, IO::SEEK_SET) if cursor
end