Class: HistoryCommander
- Inherits:
-
EventMachine::FileTail
- Object
- EventMachine::FileTail
- HistoryCommander
- Defined in:
- lib/history_commander.rb
Instance Attribute Summary collapse
-
#pause ⇒ Object
Returns the value of attribute pause.
-
#uuid ⇒ Object
Returns the value of attribute uuid.
Instance Method Summary collapse
-
#initialize(path, startpos = -1,, mode = "full") ⇒ HistoryCommander
constructor
path <~String> File path to monitor mode <~String> Can be set to “full” for read/write mode, and any other value for write only mode.
-
#receive_data(data) ⇒ Object
Receive data from the FileTail and submit it to the MQ.
- #safe_path ⇒ Object
- #schedule_next_read ⇒ Object
- #skip_ahead ⇒ Object
-
#subscribe ⇒ Object
Subscribe to the global history exchange and sync the history file with any new inbound global history.
Constructor Details
#initialize(path, startpos = -1,, mode = "full") ⇒ HistoryCommander
path <~String> File path to monitor mode <~String> Can be set to “full” for read/write mode, and any other value for write only mode. startpos <~Integer> File position to start tailing the file. Default of -1 starts at the end of the file
21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/history_commander.rb', line 21 def initialize(path, startpos=-1, mode="full") super(path, startpos) FileUtils.cp(path, safe_path) @buffer = BufferedTokenizer.new @global_history_fanout = MQ.new.fanout('global_history') @uuid = UUIDTools::UUID.random_create.to_s @host = `hostname`.chomp @user = `whoami`.chomp @pause = false subscribe if mode == "full" end |
Instance Attribute Details
#pause ⇒ Object
Returns the value of attribute pause.
12 13 14 |
# File 'lib/history_commander.rb', line 12 def pause @pause end |
#uuid ⇒ Object
Returns the value of attribute uuid.
11 12 13 |
# File 'lib/history_commander.rb', line 11 def uuid @uuid end |
Instance Method Details
#receive_data(data) ⇒ Object
Receive data from the FileTail and submit it to the MQ
35 36 37 38 39 40 41 42 43 |
# File 'lib/history_commander.rb', line 35 def receive_data(data) @buffer.extract(data).each do |line| payload = { :uuid => @uuid, :message => line, :host => @host, :user => @user } @global_history_fanout.publish(payload.to_json) end end |
#safe_path ⇒ Object
14 15 16 |
# File 'lib/history_commander.rb', line 14 def safe_path "#{path}_safe" end |
#schedule_next_read ⇒ Object
45 46 47 48 49 50 51 |
# File 'lib/history_commander.rb', line 45 def schedule_next_read unless @pause EventMachine::add_timer(@naptime) do read end end end |
#skip_ahead ⇒ Object
53 54 55 56 57 |
# File 'lib/history_commander.rb', line 53 def skip_ahead def skip_ahead @pos = @file.sysseek(0, IO::SEEK_END) end end |
#subscribe ⇒ Object
Subscribe to the global history exchange and sync the history file with any new inbound global history. Pauses FileTail and skips the output when writing to the history file.
60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/history_commander.rb', line 60 def subscribe @subscription = MQ.new @subscription.queue(@uuid).bind(@subscription.fanout('global_history')).subscribe do |result| x = Mash.new(JSON::parse(result)) puts "received: #{x[:uuid]} #{x[:user]}@#{x[:host]}$ #{x[:message]}" if x[:uuid] != @uuid @pause = true File.open(path, "a") {|f| f.puts(x[:message])} skip_ahead @pause = false schedule_next_read end end end |