Class: Yahns::Wbuf

Inherits:
Object
  • Object
show all
Includes:
WbufCommon
Defined in:
lib/yahns/wbuf.rb

Overview

This class is triggered whenever we need write buffering for clients reading responses slowly. Small responses which fit into kernel socket buffers do not trigger this. yahns will always attempt to write to kernel socket buffers first to avoid unnecessary copies in userspace.

Thus, most data is into copied to the kernel only once, the kernel will perform zero-copy I/O from the page cache to the socket. The only time data may be copied twice is the initial write()/send() which triggers EAGAIN.

We only buffer to the filesystem (note: likely not a disk, since this is short-lived). We let the sysadmin/kernel decide whether or not the data needs to hit disk or not.

This avoids large allocations from malloc, potentially limiting fragmentation and keeping (common) smaller allocations fast. General purpose malloc implementations in the 64-bit era tend to avoid releasing memory back to the kernel, so large heap allocations are best avoided as the kernel has little chance to reclaim memory used for a temporary buffer.

The biggest downside of this approach is it requires an FD, but yahns configurations are configured for many FDs anyways, so it’s unlikely to be a scalability issue.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from WbufCommon

#wbuf_close_common, #wbuf_flush

Constructor Details

#initialize(body, persist) ⇒ Wbuf

Returns a new instance of Wbuf.



35
36
37
38
39
40
41
# File 'lib/yahns/wbuf.rb', line 35

def initialize(body, persist)
  @tmpio = nil
  @sf_offset = @sf_count = 0
  @wbuf_persist = persist # whether or not we keep the connection alive
  @body = body # something we call #close on when done writing
  @busy = false
end

Instance Attribute Details

#busyObject (readonly)

Returns the value of attribute busy.



33
34
35
# File 'lib/yahns/wbuf.rb', line 33

def busy
  @busy
end

Instance Method Details

#wbuf_abortObject



92
93
94
# File 'lib/yahns/wbuf.rb', line 92

def wbuf_abort
  @tmpio = @tmpio.close if @tmpio
end

#wbuf_close(client) ⇒ Object

called by last wbuf_flush



87
88
89
90
# File 'lib/yahns/wbuf.rb', line 87

def wbuf_close(client)
  wbuf_abort
  wbuf_close_common(client)
end

#wbuf_write(c, buf) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/yahns/wbuf.rb', line 48

def wbuf_write(c, buf)
  # try to bypass the VFS layer and write directly to the socket
  # if we're all caught up
  case rv = String === buf ? c.kgio_trywrite(buf) : c.kgio_trywritev(buf)
  when String, Array
    buf = rv # retry in loop
  when nil
    return # yay! hopefully we don't have to buffer again
  when :wait_writable, :wait_readable
    @busy = rv
  end until @busy

  @tmpio ||= Yahns::TmpIO.new(c.class.output_buffer_tmpdir)
  # n.b.: we rely on O_APPEND in TmpIO, here
  @sf_count += String === buf ? @tmpio.write(buf) : wbuf_writev(buf)

  # we spent some time copying to the FS, try to write to
  # the socket again in case some space opened up...
  case rv = c.trysendfile(@tmpio, @sf_offset, @sf_count)
  when Integer
    @sf_count -= rv
    @sf_offset += rv
  when :wait_writable, :wait_readable
    @busy = rv
    return rv
  else
    raise "BUG: #{rv.nil? ? "EOF" : rv.inspect} on tmpio " \
          "sf_offset=#@sf_offset sf_count=#@sf_count"
  end while @sf_count > 0

  # we're all caught up, try to prevent dirty data from getting flushed
  # to disk if we can help it.
  wbuf_abort
  @sf_offset = 0
  @busy = false
  nil
end

#wbuf_writev(buf) ⇒ Object



43
44
45
46
# File 'lib/yahns/wbuf.rb', line 43

def wbuf_writev(buf)
  @tmpio.kgio_writev(buf)
  buf.inject(0) { |n, s| n += s.size }
end