Class: Fluent::Plugin::ProxysqlQueryLogInput::Watcher
- Inherits:
-
Coolio::StatWatcher
- Object
- Coolio::StatWatcher
- Fluent::Plugin::ProxysqlQueryLogInput::Watcher
- Defined in:
- lib/fluent/plugin/in_proxysql_query_log/watcher.rb
Instance Method Summary collapse
- #attach(loop) ⇒ Object
- #attached? ⇒ Boolean
- #convert_time(t) ⇒ Object
- #detach ⇒ Object
- #hostname ⇒ Object
-
#initialize(path, interval, pos_storage, router, tag, log) ⇒ Watcher
constructor
A new instance of Watcher.
- #on_change(previous, current) ⇒ Object
- #read ⇒ Object
- #record(query) ⇒ Object
- #seek(path) ⇒ Object
Constructor Details
#initialize(path, interval, pos_storage, router, tag, log) ⇒ Watcher
Returns a new instance of Watcher.
5 6 7 8 9 10 11 12 13 14 15 |
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 5 def initialize(path, interval, pos_storage, router, tag, log) super(path, interval) @parser = ProxysqlQueryLog::Parser.new @pos_storage = pos_storage @router = router @tag = tag @log = log @attached = false read end |
Instance Method Details
#attach(loop) ⇒ Object
73 74 75 76 |
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 73 def attach(loop) @attached = true super end |
#attached? ⇒ Boolean
82 83 84 |
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 82 def attached? @attached end |
#convert_time(t) ⇒ Object
69 70 71 |
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 69 def convert_time(t) Time.at(t/1000/1000).utc.strftime('%Y-%m-%d %H:%M:%S') end |
#detach ⇒ Object
78 79 80 81 |
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 78 def detach @attached = false super end |
#hostname ⇒ Object
86 87 88 |
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 86 def hostname @hostname ||= Socket.gethostname end |
#on_change(previous, current) ⇒ Object
22 23 24 25 26 27 28 29 30 |
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 22 def on_change(previous, current) if current.nlink == 0 @log.debug("stop watch: #{@path} (deleted)") @pos_storage.delete(@path) detach else read end end |
#read ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 32 def read @io = File.open(@path) seek(@path) while true @pos = @io.pos raw_total_bytes = @io.read(8) return unless raw_total_bytes query = @parser.parse(@io) @router.emit(@tag, query.start_time/1000/1000, record(query)) @pos_storage.put(@path, @io.pos) end ensure @io.close end |
#record(query) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 50 def record(query) { 'thread_id' => query.thread_id, 'username' => query.username, 'schema_name' => query.schema_name, 'client' => query.client, 'HID' => query.hid, 'server' => query.server, 'start_time' => convert_time(query.start_time), 'end_time' => convert_time(query.end_time), 'duration' => query.end_time - query.start_time, 'digest' => query.digest, 'query' => query.query, 'hostname' => hostname, 'filename' => @path, 'pos' => @pos } end |
#seek(path) ⇒ Object
17 18 19 20 |
# File 'lib/fluent/plugin/in_proxysql_query_log/watcher.rb', line 17 def seek(path) cursor = @pos_storage.get(path) @io.seek(cursor, IO::SEEK_SET) if cursor end |