Class: Fwd

Inherits:
Object
  • Object
show all
Defined in:
lib/fwd.rb

Defined Under Namespace

Classes: Backend, Buffer, CLI, Input, Output, Pool

Constant Summary collapse

FLUSH =
"\000>>"

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Fwd

Constructor

Parameters:

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • path (String)

    path where buffer files are stored

  • prefix (String)

    buffer file prefix

  • bind (URI)

    the endpoint to listen to

  • forward (Array<URI>)

    the endpoints to forward to

  • flush_limit (Integer)

    flush after L messages

  • flush_rate (Integer)

    flush after M messages

  • flush_interval (Integer)

    flush after N seconds



49
50
51
52
53
54
55
# File 'lib/fwd.rb', line 49

def initialize(opts = {})
  @bind   = URI.parse(opts[:bind] || "tcp://0.0.0.0:7289")
  @root   = Pathname.new(opts[:path] || "tmp")
  @prefix = opts[:prefix] || "buffer"
  @opts   = opts
  @output = Fwd::Output.new(self)
end

Class Attribute Details

.loggerObject

Logger

logger instance



19
20
21
# File 'lib/fwd.rb', line 19

def logger
  @logger ||= ::Logger.new(STDOUT)
end

Instance Attribute Details

#bindObject (readonly)



26
27
28
# File 'lib/fwd.rb', line 26

def bind
  @bind
end

#optsObject (readonly)



38
39
40
# File 'lib/fwd.rb', line 38

def opts
  @opts
end

#outputObject (readonly)



35
36
37
# File 'lib/fwd.rb', line 35

def output
  @output
end

#prefixObject (readonly)



32
33
34
# File 'lib/fwd.rb', line 32

def prefix
  @prefix
end

#rootObject (readonly)



29
30
31
# File 'lib/fwd.rb', line 29

def root
  @root
end

Instance Method Details

#flush!Object

Initiates flush



92
93
94
95
96
# File 'lib/fwd.rb', line 92

def flush!
  @piper.child do
    @piper.puts(FLUSH)
  end
end

#listen!Object

Starts the server



86
87
88
89
# File 'lib/fwd.rb', line 86

def listen!
  logger.info "Starting server on #{@bind}"
  EM.start_server @bind.host, @bind.port, Fwd::Input, self
end

#loggerObject

Logger

logger instance



99
100
101
# File 'lib/fwd.rb', line 99

def logger
  self.class.logger
end

#run!Object

Starts the loop



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/fwd.rb', line 58

def run!
  $0 = "fwd-rb (output)"

  @piper = ::Servolux::Piper.new('rw')
  at_exit do
    @piper.signal("TERM")
  end

  @piper.child do
    $0 = "fwd-rb (input)"
    EM.run { listen! }
  end

  @piper.parent do
    loop do
      sleep(0.1)
      case val = @piper.gets()
      when FLUSH
        output.forward!
      else
        logger.error "Received unknown message #{val.class.name} "
        exit
      end
    end
  end
end