Class: Fwd::Output

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/fwd/output.rb

Constant Summary collapse

# Frozen list of connection-, network- and IO-level exception classes.
# NOTE(review): presumably the errors that are rescued while streaming to a
# backend; the rescue site is outside this view, so confirm against it.
RESCUABLE =
[
  Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EHOSTUNREACH, Errno::EPIPE,
  Errno::ENETUNREACH, Errno::ENETDOWN, Errno::EINVAL, Errno::ETIMEDOUT,
  IOError, EOFError
].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(core) ⇒ Output

Constructor

Parameters:



15
16
17
18
19
20
21
# File 'lib/fwd/output.rb', line 15

# Builds the output from +core+: every non-nil entry of
# +core.opts[:forward]+ (a single value or a list) is wrapped in a
# Fwd::Backend, and the resulting backends are handed to a Fwd::Pool.
#
# @param core [#opts] object whose +opts[:forward]+ names the forward targets
def initialize(core)
  # Array() lets the option be nil, a single target, or a list of targets.
  targets  = Array(core.opts[:forward]).compact
  backends = targets.map { |target| Fwd::Backend.new(target) }

  @core = core
  @pool = Fwd::Pool.new(backends)
end

Instance Attribute Details

#core ⇒ Object (readonly)

Returns the value of attribute core.



11
12
13
# File 'lib/fwd/output.rb', line 11

# Reader for the +@core+ instance variable.
#
# @return [Object] the object passed to #initialize and stored in +@core+
def core
  @core
end

#pool ⇒ Object (readonly)

Returns the value of attribute pool.



11
12
13
# File 'lib/fwd/output.rb', line 11

# Reader for the +@pool+ instance variable.
#
# @return [Fwd::Pool] the pool of backends built in #initialize
def pool
  @pool
end

Instance Method Details

#forward! ⇒ Object

Callback



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/fwd/output.rb', line 24

# Callback: drains the queue returned by +closed_files+, streaming each file
# through +stream_file+ inside a +reserve+ block and logging how long each
# flush took. Stops at the first file whose +reserve+ call returns a falsy
# result.
#
# A call made while another call is still draining returns immediately and
# leaves the in-progress flag untouched; the flag is cleared only by the call
# that set it, including when streaming raises.
#
# NOTE(review): +closed_files+, +reserve+ and +logger+ are defined outside
# this view; +reserve+ is assumed to yield the reserved file and return the
# block's result - confirm.
#
# @return [nil]
def forward!
  return if @forwarding

  @forwarding = true
  begin
    while (q = closed_files) && (file = q.shift)
      ok = reserve(file) do |reserved|
        # Monotonic clock: the elapsed time cannot be skewed by wall-clock jumps.
        started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        success = stream_file(reserved)
        real    = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
        logger.info { "Flushed #{reserved.basename}, #{reserved.size.fdiv(1024).round}k in #{real.round(1)}s (Q: #{q.size})" }
        success
      end
      break unless ok
    end
  ensure
    # Scoped to the region that set the flag, so the early return above
    # never resets it on behalf of a call that is still running.
    @forwarding = false
  end
end

#stream_file(file) ⇒ Object

Parameters:

  • file (Pathname)

    file to stream



43
44
45
46
47
# File 'lib/fwd/output.rb', line 43

# Offers +file+ to the backends in +pool+ by calling +stream_to+ for each
# one that +pool.any?+ yields.
#
# @param file [Pathname] file to stream
# @return [Boolean] the result of +pool.any?+; assuming Enumerable semantics,
#   that is true as soon as one +stream_to+ call returns a truthy value
def stream_file(file)
  pool.any? { |target| stream_to(target, file) }
end