Class: Droonga::BufferedTCPSocket
- Inherits:
-
Coolio::TCPSocket
- Object
- Coolio::TCPSocket
- Droonga::BufferedTCPSocket
show all
- Includes:
- Loggable
- Defined in:
- lib/droonga/buffered_tcp_socket.rb
Defined Under Namespace
Classes: AlreadyInWritingByOthers, Chunk, ChunkLoader
Instance Method Summary
collapse
Constructor Details
#initialize(socket, data_directory) ⇒ BufferedTCPSocket
Returns a new instance of BufferedTCPSocket.
30
31
32
33
34
|
# File 'lib/droonga/buffered_tcp_socket.rb', line 30
def initialize(socket, data_directory)
super(socket)
@data_directory = data_directory
@_write_buffer = []
end
|
Instance Method Details
#on_connect ⇒ Object
36
37
38
|
# File 'lib/droonga/buffered_tcp_socket.rb', line 36
def on_connect
logger.trace("connected to #{@remote_host}:#{@remote_port}")
end
|
#on_writable ⇒ Object
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/droonga/buffered_tcp_socket.rb', line 49
def on_writable
until @_write_buffer.empty?
chunk = @_write_buffer.shift
begin
chunk.writing
logger.trace("Sending...", :data => chunk.data)
written_size = @_io.write_nonblock(chunk.data)
if written_size == chunk.data.bytesize
chunk.written
logger.trace("Completely sent.")
else
chunk.written_partial(written_size)
logger.trace("Partially sent. Retry later.",
:written => written_size,
:rest => chunk.data.bytesize)
@_write_buffer.unshift(chunk)
break
end
rescue AlreadyInWritingByOthers
logger.trace("Chunk is already in sending by another process.")
rescue Errno::EINTR
@_write_buffer.unshift(chunk)
chunk.failed
logger.trace("Failed to send chunk. Retry later.",
:chunk => chunk,
:errpr => "Errno::EINTR")
return
rescue SystemCallError, IOError, SocketError => exception
@_write_buffer.unshift(chunk)
chunk.failed
logger.trace("Failed to send chunk. Retry later.",
:chunk => chunk,
:exception => exception)
return close
end
end
if @_write_buffer.empty?
disable_write_watcher
on_write_complete
end
end
|
#resume ⇒ Object
92
93
94
95
96
|
# File 'lib/droonga/buffered_tcp_socket.rb', line 92
def resume
@_write_buffer = (load_chunks + @_write_buffer).sort_by do |chunk|
chunk.time_stamp
end
end
|
#write(data) ⇒ Object
40
41
42
43
44
45
46
47
|
# File 'lib/droonga/buffered_tcp_socket.rb', line 40
def write(data)
chunk = Chunk.new(:directory => @data_directory,
:data => data)
chunk.buffering
@_write_buffer << chunk
schedule_write
data.bytesize
end
|