Class: Log2Json::MultilineLogBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/log2json/mlogbuffer.rb

Constant Summary collapse

LOG =
Logger.new(STDERR)
STDOUT_LOCK =

This class workaround the flaw that for some applications a log record may consist of mulitple lines without an end-of-record marker.

Example 1:

ERROR:12980 2013-11-05 09:46:29

IO_ENCODER

Traceback (most recent call last):

File "/opt/score_feeder/feeds/rotowire/worker.py", line 59, in process_queue
  (errorlog, rs), processed = method(xmldoc), True
File "/opt/score_feeder/feeds/__init__.py", line 206, in handle
  results = fn(*args, **kwargs)
File "/opt/score_feeder/feeds/__init__.py", line 229, in handle
  results = fn(*args, **kwargs)
File "/opt/score_feeder/feeds/__init__.py", line 141, in handle
  results = fn(*args, **kwargs)
File "/opt/score_feeder/feeds/__init__.py", line 261, in handle
  results = fn(*args, **kwargs)
File "/opt/score_feeder/feeds/__init__.py", line 174, in handle
  results = fn(*args, **kwargs)
File "/opt/score_feeder/feeds/rotowire/leagues/nfl/player_news.py", line 40, in NFL_PLAYER_NEWS
  snippet = xml_parse(xml_serialize(snippet))
File "/opt/score_feeder/lib/utils.py", line 239, in xml_serialize
  xmlstr = lxml.etree.tostring(node).strip()
File "lxml.etree.pyx", line 3119, in lxml.etree.tostring (src/lxml/lxml.etree.c:64447)
File "serializer.pxi", line 132, in lxml.etree._tostring (src/lxml/lxml.etree.c:101602)
File "serializer.pxi", line 192, in lxml.etree._raiseSerialisationError (src/lxml/lxml.etree.c:102217)

SerialisationError: IO_ENCODER

Example 2:

INFO:7091 2013-11-05 09:32:46
job 6, pri 0, req 0

NBA_MOVEMENTS finished. [c 1, u 0, d 1]

No players that match global id [2] and name [Abdul-Wahad, Tariq]. Skipping.
league: nba, method: NBA_MOVEMENTS, filename: TPnBh5movements_basketball_ab.xml

No players that match global id [3] and name [Abdur-Rahim, Shareef]. Skipping.
league: nba, method: NBA_MOVEMENTS, filename: TPnBh5movements_basketball_ab.xml

No players that match global id [3358] and name [Abromaitis, Tim]. Skipping.
league: nba, method: NBA_MOVEMENTS, filename: TPnBh5movements_basketball_ab.xml#

This problem is addressed by accumulating log lines in the buffer until another start-of-log- record line appears. In the case that a multi-line log record is the last record, it will be flushed after a timeout. This assumes that even for a multi-line log record, it’s often that all the lines are written out by the application together as one log record.

Mutex.new

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(timeout = 5, &block) ⇒ MultilineLogBuffer

Returns a new instance of MultilineLogBuffer.



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/log2json/mlogbuffer.rb', line 66

def initialize(timeout=5, &block)
  @lock = Mutex.new
  @flush_timeout = timeout

  # buffer to aggregate a multi-line log record
  # The first item will be a log record and the rest will
  # be lines that are part of the @message field of the log record.
  @log_buffer = []

  @last_popped_record = nil

  if not block.nil?
    Thread.new do
      begin
        loop do 
          sleep @flush_timeout
          rec = pop_push
          block.call(rec) if not rec.nil?
        end
      rescue => e
        LOG.error(e.backtrace.join("\n"))
        retry
      end
    end
  end
end

Class Method Details

.write_log_record(rec) ⇒ Object



57
58
59
60
61
62
63
64
# File 'lib/log2json/mlogbuffer.rb', line 57

def self.write_log_record(rec)
  if not rec.nil?
    STDOUT_LOCK.synchronize do
      STDOUT.write(rec.to_json << "\n")
      STDOUT.flush()
    end
  end
end

Instance Method Details

#pop_push(in_record = nil) ⇒ Object

pops the log record off the buffer and then optionally push a log record into the buffer. returns the popped record.



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/log2json/mlogbuffer.rb', line 104

def pop_push(in_record=nil)
  rec = nil
  @lock.synchronize do
    rec = @log_buffer.shift() 
    unless rec.nil?
      if not rec.is_a?(Hash)
        # the only way this can happen is that a timeout happened
        # and the buffered multi-line log record got popped off before 
        # all its lines were pushed to the buffer.
        @log_buffer.unshift(rec)
        rec = @last_popped_record
      end
      rest = @log_buffer.join("\n")
      if not rest.empty?
        rec['@message'] += "\n" + rest
        rec['@tags'] << 'multiline'
      end
      @log_buffer.clear()
      @last_popped_record = rec
    end
    @log_buffer << in_record unless in_record.nil?
  end
#    LOG.debug("Popped #{rec}")
#    LOG.debug("Pushed #{in_record}")
  rec
end

#push_part(part) ⇒ Object

push a log record or part of its message to the buffer.



94
95
96
97
98
99
100
# File 'lib/log2json/mlogbuffer.rb', line 94

def push_part(part)
  @lock.synchronize do
#      LOG.debug("pushing part: #{part}") unless @log_buffer.empty?
    @log_buffer << part unless @log_buffer.empty?
    # Note: we skip pushing any partial lines before there's a log record.
  end
end