Class: Logplex::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/logplex/session.rb

Overview

Public: A logplex event session.

This class allows you to read events from logplex.

You generally acquire an instance of this class through Logplex::Channel#create_session

Constant Summary collapse

DEFAULT_CHUNK_SIZE =
1024

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url, session_settings) ⇒ Session

Public: Initialize a logplex session.

url - the url to the session, usually of the form:

https://user:pass@logplex/sessions/session-id

session_settings - a hash of arguments; see

{Logplex::Channel#create_session} for values.


30
31
32
33
34
35
36
37
38
39
40
# File 'lib/logplex/session.rb', line 30

def initialize(url, session_settings)
  @url = url

  if session_settings[:tail]
    @tail = true
  else
    @tail = false
    @limit = session_settings[:num]
  end
  @chunk_size = session_settings[:chunk_size] || DEFAULT_CHUNK_SIZE
end

Instance Attribute Details

#urlObject (readonly)

Returns the value of attribute url.



21
22
23
# File 'lib/logplex/session.rb', line 21

def url
  @url
end

Instance Method Details

#each_event(&block) ⇒ Object

Public: iterate over events received from this logplex session

Yields Logplex::Event objects, one per event. Returns nothing Raises TODO(sissel): ???



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/logplex/session.rb', line 47

def each_event(&block)
  connection = Excon.new(@url, :chunk_size => @chunk_size)

  @buffer = ""
  @event_count = 0

  process_response = lambda do |chunk, remaining_bytes, total_bytes|
    @buffer += chunk

    events = @buffer.split("\n")
    if @buffer[-1] != "\n"
      @buffer = events.last
      events = events[0 .. -2]
    else
      @buffer = ""
    end

    events.each do |line|
      @event_count += 1
      block.call(line)
      return if !@tail && @limit < @event_count
    end
  end

  response = connection.get(:response_block => process_response)
  raise "Error: #{response.body}" if response.status != 200
end