Class: ZMachine::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/zmachine/channel.rb

Direct Known Subclasses

TCPChannel, ZMQChannel

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Channel

Returns a new instance of Channel.



10
11
12
13
# File 'lib/zmachine/channel.rb', line 10

# Prepares a freshly created channel: the raw flag starts out false and
# the outbound queue starts out as a new, empty ConcurrentLinkedQueue.
def initialize
  @raw = false
  @outbound_queue = ConcurrentLinkedQueue.new
end

Instance Attribute Details

#raw ⇒ Object

Returns the value of attribute raw.



8
9
10
# File 'lib/zmachine/channel.rb', line 8

# Reader for the raw flag.
#
# @return [Object] the current value of the @raw instance variable
def raw
  @raw
end

#socket ⇒ Object

Returns the value of attribute socket.



7
8
9
# File 'lib/zmachine/channel.rb', line 7

# Reader for the underlying socket object.
#
# @return [Object] the current value of the @socket instance variable
def socket
  @socket
end

Instance Method Details

#can_send? ⇒ Boolean

Methods that need to be implemented in subclasses:

selectable_fd, bind(address, port = nil), bound?, accept, connect(address, port = nil), connection_pending?, finish_connecting, connected?, read_inbound_data, send_data(data), closed?, peer, write_outbound_data

Returns:

  • (Boolean)


31
32
33
# File 'lib/zmachine/channel.rb', line 31

# Tells whether there is anything to write right now.
#
# @return [Boolean] true when the channel reports itself connected and the
#   outbound queue holds at least one entry; otherwise the falsy result of
#   connected? or false for an empty queue
def can_send?
  link_up = connected?
  # Hand back the falsy value untouched so callers see exactly what
  # connected? reported.
  return link_up unless link_up

  !@outbound_queue.empty?
end

#close ⇒ Object



59
60
61
62
63
64
# File 'lib/zmachine/channel.rb', line 59

# Closes the channel, dropping any data still waiting to be written.
#
# When the channel already reports itself closed nothing else happens.
# Otherwise the outbound queue is emptied and the actual shutdown is
# delegated to close!.
#
# @return [Object] true if the channel was already closed, otherwise
#   whatever close! returns
def close
  if closed?
    true
  else
    if ZMachine.debug
      # Record who asked for the close to ease debugging.
      invoked_from = caller[0].inspect
      ZMachine.logger.debug("zmachine:channel:#{__method__}", channel: self, caller: invoked_from)
    end
    @outbound_queue.clear
    close!
  end
end

#send_data(data) ⇒ Object

Raises:

  • (RuntimeError)


35
36
37
38
39
40
41
42
43
# File 'lib/zmachine/channel.rb', line 35

# Queues data for a later write to the socket.
#
# The data is wrapped in a ByteBuffer and appended to the outbound queue,
# but only if the buffer actually has bytes remaining.
#
# @param data [Object] payload to send; a falsy value is ignored
# @return [Object, nil] the result of adding to the queue, or nil when
#   nothing was queued
# @raise [RuntimeError] if @closed_callback is set
def send_data(data)
  if ZMachine.debug
    ZMachine.logger.debug("zmachine:channel:#{__method__}", channel: self)
  end
  raise RuntimeError, "send_data called after close" if @closed_callback
  return if !data

  payload = ByteBuffer.wrap(data)
  @outbound_queue.add(payload) if payload.has_remaining
end

#write_outbound_data ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/zmachine/channel.rb', line 45

# Writes queued buffers to the socket for as long as can_send? holds.
#
# Buffers are taken from the head of the outbound queue in order. A buffer
# is only removed from the queue once it has no bytes remaining.
#
# @return [nil]
def write_outbound_data
  if ZMachine.debug
    ZMachine.logger.debug("zmachine:channel:#{__method__}", channel: self, can_send: can_send?)
  end

  while can_send?
    head = @outbound_queue.peek
    break if !head

    @socket.write(head) if head.has_remaining

    if head.has_remaining
      # The socket did not take the whole buffer, which means the outbound
      # network buffers are full: stop and leave the rest queued.
      break
    else
      # Fully written: drop it and move on to the next buffer.
      @outbound_queue.poll
    end
  end
end