Class: Fwd

Inherits:
Object
  • Object
show all
Defined in:
lib/fwd.rb

Defined Under Namespace

Classes: Backend, Buffer, CLI, Input, Output, Pool

Constant Summary collapse

FLUSH =
"\000>>"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Fwd

Constructor

Parameters:

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • path (String)

    path where buffer files are stored

  • prefix (String)

    buffer file prefix

  • bind (URI)

    the endpoint to listen to

  • forward (Array<URI>)

    the endpoints to forward to

  • buffer_limit (Integer)

    limit buffer files to N bytes

  • flush_rate (Integer)

    flush after N messages

  • flush_interval (Integer)

    flush after N seconds



41
42
43
44
45
46
47
48
49
# File 'lib/fwd.rb', line 41

# Constructor
#
# Stores the raw options and derives the listening endpoint, the buffer
# root directory, the buffer file prefix, the logger and the output handler.
#
# @param [Hash] opts
# @option opts [URI, String] :bind the endpoint to listen on
#   (defaults to "tcp://0.0.0.0:7289")
# @option opts [String] :path path where buffer files are stored
#   (defaults to "tmp")
# @option opts [String] :prefix buffer file prefix (defaults to "buffer")
# @option opts [String, IO] :log destination handed to Logger.new
#   (defaults to STDOUT)
# @option opts [Integer] :log_level logger severity
#   (defaults to Logger::INFO)
#
# Any other keys (e.g. :forward, :buffer_limit, :flush_rate,
# :flush_interval) are not read here; they stay reachable through #opts --
# presumably for Fwd::Output / Fwd::Buffer, verify against those classes.
def initialize(opts = {})
  @opts   = opts
  # Kernel#URI returns URI instances untouched and parses strings, whereas
  # URI.parse raises InvalidURIError when handed an already-built URI.
  @bind   = URI(opts[:bind] || "tcp://0.0.0.0:7289")
  @root   = Pathname.new(opts[:path] || "tmp")
  @prefix = opts[:prefix] || "buffer"
  @logger = ::Logger.new(opts[:log] || STDOUT)
  @logger.level = opts[:log_level] || ::Logger::INFO
  # Built last: it receives self after the attributes above have been set.
  @output = Fwd::Output.new(self)
end

Instance Attribute Details

#bind ⇒ Object (readonly)



15
16
17
# File 'lib/fwd.rb', line 15

# Endpoint this instance listens on, as set up by #initialize.
#
# @return [URI] the value held in @bind
def bind; @bind; end

#logger ⇒ Object (readonly)



27
28
29
# File 'lib/fwd.rb', line 27

# Logger created by #initialize.
#
# @return [Logger] the value held in @logger
def logger; @logger; end

#opts ⇒ Object (readonly)



30
31
32
# File 'lib/fwd.rb', line 30

# Raw options exactly as they were passed to #initialize.
#
# @return [Hash] the value held in @opts
def opts; @opts; end

#output ⇒ Object (readonly)



24
25
26
# File 'lib/fwd.rb', line 24

# Output handler built by #initialize.
#
# @return [Fwd::Output] the value held in @output
def output; @output; end

#prefix ⇒ Object (readonly)



21
22
23
# File 'lib/fwd.rb', line 21

# Buffer file prefix chosen in #initialize ("buffer" unless overridden).
#
# @return [String] the value held in @prefix
def prefix; @prefix; end

#root ⇒ Object (readonly)



18
19
20
# File 'lib/fwd.rb', line 18

# Directory for buffer files, set up by #initialize ("tmp" unless overridden).
#
# @return [Pathname] the value held in @root
def root; @root; end

Instance Method Details

#flush! ⇒ Object

Initiates flush



92
93
94
95
96
# File 'lib/fwd.rb', line 92

# Initiates a flush by writing the FLUSH marker to the pipe.
#
# The write happens inside the block given to @piper.child, so it only runs
# when the piper decides to yield that block. @piper is assigned by #run!.
def flush!
  @piper.child { @piper.puts(FLUSH) }
end

#listen! ⇒ Object

Starts the server



71
72
73
74
75
# File 'lib/fwd.rb', line 71

# Starts the server.
#
# Logs the bind address, then registers an EventMachine server on the host
# and port of @bind. Fwd::Input is the connection handler; this instance and
# a freshly built Fwd::Buffer are passed along as its extra arguments.
#
# @return whatever EM.start_server returns
def listen!
  logger.info("Starting server on #{@bind}")
  host = @bind.host
  port = @bind.port
  EM.start_server(host, port, Fwd::Input, self, Fwd::Buffer.new(self))
end

#output_loop! ⇒ Object

Starts the output loop



78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/fwd.rb', line 78

# Starts the output loop.
#
# Repeats forever: pause for 0.1 seconds, read one message from the pipe,
# and forward buffered output when that message is the FLUSH marker. Any
# other message (including nil) is logged as an error and ends the process
# via exit.
def output_loop!
  loop do
    sleep(0.1)
    message = @piper.gets

    if FLUSH == message
      @output.forward!
      next
    end

    logger.error("Received unknown message #{message.class.name}: #{message.inspect}")
    exit
  end
end

#run! ⇒ Object

Starts the loop



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/fwd.rb', line 52

# Starts the loop.
#
# Creates a read/write Servolux::Piper and stores it in @piper, then
# registers an at_exit hook that sends TERM through the piper. The block
# given to @piper.child renames the process to "fwd-rb (input)" and runs the
# EventMachine reactor with #listen!; the block given to @piper.parent
# renames the process to "fwd-rb (output)" and enters #output_loop!.
def run!
  @piper = ::Servolux::Piper.new('rw')
  at_exit { @piper.signal("TERM") }

  # Input side: accept incoming connections inside the reactor.
  @piper.child do
    $PROGRAM_NAME = "fwd-rb (input)"
    EM.run { listen! }
  end

  # Output side: wait for flush requests arriving over the pipe.
  @piper.parent do
    $PROGRAM_NAME = "fwd-rb (output)"
    output_loop!
  end
end