Class: Droonga::BufferedTCPSocket

Inherits:
Coolio::TCPSocket
  • Object
show all
Includes:
Loggable
Defined in:
lib/droonga/buffered_tcp_socket.rb

Defined Under Namespace

Classes: AlreadyInWritingByOthers, Chunk, ChunkLoader

Instance Method Summary collapse

Constructor Details

#initialize(socket, data_directory) ⇒ BufferedTCPSocket

Returns a new instance of BufferedTCPSocket.



30
31
32
33
34
# File 'lib/droonga/buffered_tcp_socket.rb', line 30

def initialize(socket, data_directory)
  super(socket)
  @data_directory = data_directory
  @_write_buffer = []
end

Instance Method Details

#on_connectObject



36
37
38
# File 'lib/droonga/buffered_tcp_socket.rb', line 36

def on_connect
  logger.trace("connected to #{@remote_host}:#{@remote_port}")
end

#on_writableObject



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/droonga/buffered_tcp_socket.rb', line 49

def on_writable
  until @_write_buffer.empty?
    chunk = @_write_buffer.shift
    begin
      chunk.writing
      logger.trace("Sending...", :data => chunk.data)
      written_size = @_io.write_nonblock(chunk.data)
      if written_size == chunk.data.bytesize
        chunk.written
        logger.trace("Completely sent.")
      else
        chunk.written_partial(written_size)
        logger.trace("Partially sent. Retry later.",
                     :written => written_size,
                     :rest    => chunk.data.bytesize)
        @_write_buffer.unshift(chunk)
        break
      end
    rescue AlreadyInWritingByOthers
      logger.trace("Chunk is already in sending by another process.")
    rescue Errno::EINTR
      @_write_buffer.unshift(chunk)
      chunk.failed
      logger.trace("Failed to send chunk. Retry later.",
                   :chunk => chunk,
                   :errpr => "Errno::EINTR")
      return
    rescue SystemCallError, IOError, SocketError => exception
      @_write_buffer.unshift(chunk)
      chunk.failed
      logger.trace("Failed to send chunk. Retry later.",
                   :chunk => chunk,
                   :exception => exception)
      return close
    end
  end

  if @_write_buffer.empty?
    disable_write_watcher
    on_write_complete
  end
end

#resumeObject



92
93
94
95
96
# File 'lib/droonga/buffered_tcp_socket.rb', line 92

def resume
  @_write_buffer = (load_chunks + @_write_buffer).sort_by do |chunk|
    chunk.time_stamp
  end
end

#write(data) ⇒ Object



40
41
42
43
44
45
46
47
# File 'lib/droonga/buffered_tcp_socket.rb', line 40

def write(data)
  chunk = Chunk.new(:directory => @data_directory,
                    :data      => data)
  chunk.buffering
  @_write_buffer << chunk
  schedule_write
  data.bytesize
end