Class: GrpcKit::Session::IO

Inherits:
Object
  • Object
show all
Defined in:
lib/grpc_kit/session/io.rb

Instance Method Summary collapse

Constructor Details

#initialize(io) ⇒ IO

Returns a new instance of IO.



8
9
10
11
# File 'lib/grpc_kit/session/io.rb', line 8

# Wraps +io+ and creates an internal pipe whose write end is used by #wake!
# and whose read end is watched by #select.
#
# @param io [Object] the underlying io object that reads/writes are delegated to
def initialize(io)
  wake_reader, wake_writer = ::IO.pipe

  @io = io
  @wake_o = wake_reader
  @wake_i = wake_writer
end

Instance Method Details

#close ⇒ Object



13
14
15
16
17
# File 'lib/grpc_kit/session/io.rb', line 13

# Closes both ends of the internal wake pipe (write end first), then the
# wrapped io object.
#
# @return [Object] whatever the wrapped io's #close returns
def close
  [@wake_i, @wake_o].each(&:close)
  @io.close
end

#flush ⇒ void

This method returns an undefined value.



74
75
76
# File 'lib/grpc_kit/session/io.rb', line 74

# Flushes the wrapped io object by delegating to its #flush.
#
# @return [void]
def flush
  @io.flush
end

#recv_event(length) ⇒ DS9::ERR_WOULDBLOCK, ...

Parameters:

  • length (Integer)

Returns:

  • (DS9::ERR_WOULDBLOCK, DS9::ERR_EOF, String)


21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/grpc_kit/session/io.rb', line 21

# Performs a non-blocking read of up to +length+ bytes from the wrapped io.
#
# @param length [Integer] maximum number of bytes to read
# @return [DS9::ERR_WOULDBLOCK, DS9::ERR_EOF, String]
#   ERR_WOULDBLOCK when no data is available yet, ERR_EOF at end of stream,
#   otherwise the value returned by read_nonblock
def recv_event(length)
  chunk = @io.read_nonblock(length, nil, exception: false)

  # read_nonblock(exception: false) signals EOF with nil
  return DS9::ERR_EOF if chunk.nil?
  return DS9::ERR_WOULDBLOCK if chunk == :wait_readable

  chunk
end

#select(timeout: 1, write: true) ⇒ void

This method returns an undefined value.

Blocks until the io object is readable or writable.



58
59
60
61
62
# File 'lib/grpc_kit/session/io.rb', line 58

# Blocks until the wrapped io is readable (or writable, when +write+ is true),
# until the internal wake pipe receives data (see #wake!), or until +timeout+
# elapses.
#
# @param timeout [Numeric, nil] timeout handed to IO.select
# @param write [Boolean] whether to also wait for the io to become writable
# @return [Array(Array, Array)] the readable and writable io objects reported
#   by IO.select; the internal wake pipe is never included, and both arrays
#   are empty on timeout
def select(timeout: 1, write: true)
  rs, ws = ::IO.select([@io, @wake_o], write ? [@io] : [], [], timeout)

  if rs&.delete(@wake_o) && !@wake_o.closed?
    begin
      # Drain every pending wake byte. File::Stat#size must not be used to
      # decide how much to read: st_size is unspecified for pipes and is 0 on
      # Linux, which would leave the pipe permanently readable and turn every
      # later call into an immediate return.
      loop do
        chunk = @wake_o.read_nonblock(4096, exception: false)
        break unless chunk.is_a?(String) # :wait_readable (empty) or nil (EOF)
      end
    rescue IOError
      # Tolerate the pipe being closed while draining.
      raise unless @wake_o.closed?
    end
  end

  [rs || [], ws || []]
end

#send_event(data) ⇒ DS9::ERR_WOULDBLOCK, Integer

Parameters:

  • data (String)

Returns:

  • (DS9::ERR_WOULDBLOCK, Integer)


36
37
38
39
40
41
42
43
44
45
# File 'lib/grpc_kit/session/io.rb', line 36

# Performs a non-blocking write of +data+ to the wrapped io.
#
# @param data [String] bytes to send
# @return [DS9::ERR_WOULDBLOCK, Integer]
#   0 for empty data (nothing is written), ERR_WOULDBLOCK when the io cannot
#   accept data right now, otherwise the number of bytes written
def send_event(data)
  return 0 if data.empty?

  written = @io.write_nonblock(data, exception: false)
  written == :wait_writable ? DS9::ERR_WOULDBLOCK : written
end

#wait_readable ⇒ void

This method returns an undefined value.

Blocks until the io object is readable.



49
50
51
52
53
54
# File 'lib/grpc_kit/session/io.rb', line 49

# Blocks, without a timeout, until the wrapped io object is readable.
#
# @return [Boolean] true once IO.select returns, false if it raised IOError
#   (presumably because the io was closed)
def wait_readable
  begin
    ::IO.select([@io], [], [])
  rescue IOError
    return false
  end

  true
end

#wake!(memo = nil) ⇒ Object

Wake thread blocked at #select method

Parameters:

  • memo (Symbol)

    Indicates which event required waking the blocked thread. This argument is for debugging purposes only.



66
67
68
69
70
71
# File 'lib/grpc_kit/session/io.rb', line 66

# Wakes a thread blocked in #select by writing one byte to the wake pipe.
#
# @param memo [Symbol, nil] unused by the implementation; lets callers label
#   the reason for the wake-up when debugging
# @return [Integer, Symbol, nil] result of the non-blocking write, or nil when
#   the error was swallowed
# @raise [IOError] if the write fails while the wake pipe is still open
def wake!(memo = nil)
  begin
    @wake_i.write_nonblock("\0", exception: false)
  rescue Errno::EPIPE
    # Read end is already gone; there is nobody left to wake.
    nil
  rescue IOError
    # Only a closed write end is expected here; anything else is re-raised.
    raise unless @wake_i.closed?
  end
end