Class: BERTRPC::Action

Inherits:
Object
  • Object
show all
Includes:
Encodes
Defined in:
lib/bertrpc/action.rb

Instance Method Summary collapse

Methods included from Encodes

#decode_bert_response, #encode_ruby_request, #error

Constructor Details

#initialize(svc, req, mod, fun, args) ⇒ Action

Returns a new instance of Action.



5
6
7
8
9
10
11
# File 'lib/bertrpc/action.rb', line 5

# Builds an action from the pieces needed to issue one call.
#
# The arguments are stored verbatim in instance variables of the same
# name; nothing is validated or contacted here.
#
# @param svc  the service object (read later for host, port and timeout)
# @param req  the request object (read later for kind and options)
# @param mod  the remote module identifier
# @param fun  the remote function identifier
# @param args the arguments to send with the call
def initialize(svc, req, mod, fun, args)
  @svc, @req, @mod, @fun, @args = svc, req, mod, fun, args
end

Instance Method Details

#connect_to(host, port, timeout = nil) ⇒ Object

Creates a socket object which does speedy, non-blocking reads and can perform reliable read timeouts.

Raises Timeout::Error on timeout.

+host+ String address of the target TCP server
+port+ Integer port of the target TCP server
+timeout+ Optional Integer (in seconds) of the read timeout


96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/bertrpc/action.rb', line 96

# Opens a connected IPv4 TCP socket to +host+:+port+.
#
# TCP_NODELAY is enabled on the socket. When +timeout+ is given, it is
# installed as both the receive (SO_RCVTIMEO) and send (SO_SNDTIMEO)
# socket timeout.
#
# If configuring or connecting the socket fails, the socket is closed
# before the original exception is re-raised, so no descriptor is leaked.
#
# @param host [String] address of the target TCP server
# @param port [Integer] port of the target TCP server
# @param timeout [Numeric, nil] optional timeout in seconds (fractions allowed)
# @return [Socket] the connected socket
# @raise [SystemCallError] whatever the underlying connect raises
#   (e.g. Errno::ECONNREFUSED)
def connect_to(host, port, timeout = nil)
  addr = Socket.getaddrinfo(host, nil, Socket::AF_INET)
  sock = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)

  begin
    sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1

    if timeout
      # Split the timeout into whole seconds and microseconds.
      secs = Integer(timeout)
      usecs = Integer((timeout - secs) * 1_000_000)
      # Two native longs; assumed to match the platform's struct timeval
      # layout -- TODO confirm on non-Linux targets.
      optval = [secs, usecs].pack("l_2")
      sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval
      sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval
    end

    sock.connect(Socket.pack_sockaddr_in(port, addr[0][3]))
  rescue StandardError
    # Do not leak the descriptor when setup or connect fails.
    sock.close
    raise
  end

  sock
end

#execute ⇒ Object



13
14
15
16
17
# File 'lib/bertrpc/action.rb', line 13

# Runs the action end to end: builds the request tuple from the request
# kind, module, function and arguments, encodes it, sends it through
# #transaction, and returns the decoded response.
#
# @return the value produced by decode_bert_response
def execute
  request_tuple = t[@req.kind, @mod, @fun, @args]
  raw_response = transaction(encode_ruby_request(request_tuple))
  decode_bert_response(raw_response)
end

#read(sock, len, timeout) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/bertrpc/action.rb', line 26

def read(sock, len, timeout)
  data, size = [], 0
  while size < len
    r, w, e = IO.select([sock], [], [], timeout)
    raise Errno::EAGAIN if r.nil?
    msg, sender = sock.recvfrom(len - size)
    raise Errno::ECONNRESET if msg.size == 0
    size += msg.size
    data << msg
  end
  data.join ''
end

#transaction(bert_request) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/bertrpc/action.rb', line 39

# Performs one request/response exchange with the service on a fresh socket.
#
# Steps, in order:
# 1. Connect to @svc.host / @svc.port using @svc.timeout (coerced to Float).
# 2. Send optional info packets: cache validation and/or stream notice,
#    depending on @req.options.
# 3. Send the encoded request.
# 4. If a :stream option is present, send its content in 1024-byte chunks
#    followed by an empty chunk. An IO is read directly; a String is
#    treated as a file path and opened.
# 5. Read a 4-byte big-endian length header, then that many response bytes.
#
# The socket is always closed before returning or raising, so failed
# exchanges do not leak descriptors.
#
# @param bert_request [String] the already-encoded request
# @return [String] the raw response payload (without the length header)
# @raise [ConnectionError] when the connection is refused
# @raise [ReadTimeoutError] when Errno::EAGAIN is raised during the exchange
# @raise [ReadError] when Errno::ECONNRESET is raised during the exchange
# @raise [ProtocolError] when the header or the body is missing
def transaction(bert_request)
  timeout = @svc.timeout && Float(@svc.timeout)
  sock = connect_to(@svc.host, @svc.port, timeout)
  options = @req.options || {}

  if options[:cache] && options[:cache][0] == :validation
    token = options[:cache][1]
    info_bert = encode_ruby_request(t[:info, :cache, [:validation, token]])
    write(sock, info_bert)
  end

  if options[:stream]
    info_bert = encode_ruby_request(t[:info, :stream, []])
    write(sock, info_bert)
  end

  write(sock, bert_request)

  case stream = options[:stream]
  when IO
    while blob = stream.read(1024)
      write(sock, blob)
    end
    # An empty chunk marks the end of the stream.
    write(sock, "")
  when String
    File.open(stream) do |f|
      while blob = f.read(1024)
        write(sock, blob)
      end
      write(sock, "")
    end
  end

  lenheader = read(sock, 4, timeout)
  raise ProtocolError.new(ProtocolError::NO_HEADER) unless lenheader
  len = lenheader.unpack('N').first
  bert_response = read(sock, len, timeout)
  raise ProtocolError.new(ProtocolError::NO_DATA) unless bert_response
  bert_response
rescue Errno::ECONNREFUSED
  raise ConnectionError.new(@svc.host, @svc.port)
rescue Errno::EAGAIN
  raise ReadTimeoutError.new(@svc.host, @svc.port, @svc.timeout)
rescue Errno::ECONNRESET
  raise ReadError.new(@svc.host, @svc.port)
ensure
  # sock is nil when connect_to itself raised.
  sock.close if sock && !sock.closed?
end

#write(sock, bert) ⇒ Object

private



21
22
23
24
# File 'lib/bertrpc/action.rb', line 21

# Sends one length-prefixed packet: a 4-byte big-endian unsigned length
# header followed by the payload itself.
#
# The header carries the payload's size in bytes (not characters), so a
# payload containing multibyte characters is framed correctly.
#
# @param sock the object to write to (must respond to #write)
# @param bert [String] the payload to send
# @return the result of the final sock.write call
def write(sock, bert)
  sock.write([bert.bytesize].pack("N"))
  sock.write(bert)
end