Class: GRPC::RpcDesc
Overview
RpcDesc is a Descriptor of an RPC method.
Defined Under Namespace
Classes: Stream
Constant Summary
Core::StatusCodes::ABORTED, Core::StatusCodes::ALREADY_EXISTS, Core::StatusCodes::CANCELLED, Core::StatusCodes::DATA_LOSS, Core::StatusCodes::DEADLINE_EXCEEDED, Core::StatusCodes::FAILED_PRECONDITION, Core::StatusCodes::INTERNAL, Core::StatusCodes::INVALID_ARGUMENT, Core::StatusCodes::NOT_FOUND, Core::StatusCodes::OK, Core::StatusCodes::OUT_OF_RANGE, Core::StatusCodes::PERMISSION_DENIED, Core::StatusCodes::RESOURCE_EXHAUSTED, Core::StatusCodes::UNAUTHENTICATED, Core::StatusCodes::UNAVAILABLE, Core::StatusCodes::UNIMPLEMENTED, Core::StatusCodes::UNKNOWN
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#arity_error(mth, want, msg) ⇒ Object
-
#assert_arity_matches(mth) ⇒ Object
-
#bidi_streamer? ⇒ Boolean
-
#client_streamer? ⇒ Boolean
-
#handle_bidi_streamer(active_call, mth, inter_ctx) ⇒ Object
-
#handle_client_streamer(active_call, mth, inter_ctx) ⇒ Object
-
#handle_request_response(active_call, mth, inter_ctx) ⇒ Object
-
#handle_server_streamer(active_call, mth, inter_ctx) ⇒ Object
-
#marshal_proc ⇒ Proc
A marshal proc { |instance| marshalled(instance) }.
-
#request_response? ⇒ Boolean
-
#run_server_method(active_call, mth, inter_ctx = InterceptionContext.new) ⇒ Object
-
#send_status(active_client, code, details, metadata = {}) ⇒ Object
-
#server_streamer? ⇒ Boolean
-
#unmarshal_proc(target) ⇒ Proc
An unmarshal proc { |marshalled(instance)| instance }.
Instance Attribute Details
#input ⇒ Object
Returns the value of attribute input
20
21
22
|
# File 'src/ruby/lib/grpc/generic/rpc_desc.rb', line 20
# Reader for the @input instance variable. The streaming predicates in this
# class test the value with is_a?(Stream), and unmarshal_proc(:input) uses it
# (or its #type when it is a Stream) as the class to unmarshal into.
def input
@input
end
|
#marshal_method ⇒ Object
Returns the value of attribute marshal_method
20
21
22
|
# File 'src/ruby/lib/grpc/generic/rpc_desc.rb', line 20
# Reader for the @marshal_method instance variable. marshal_proc sends this
# value, as a method name, to the class of the object being marshalled.
def marshal_method
@marshal_method
end
|
#name ⇒ Object
Returns the value of attribute name
20
21
22
|
# File 'src/ruby/lib/grpc/generic/rpc_desc.rb', line 20
# Reader for the @name instance variable.
# NOTE(review): presumably the name of the described RPC method -- confirm
# against the place where the descriptor is constructed.
def name
@name
end
|
#output ⇒ Object
Returns the value of attribute output
20
21
22
|
# File 'src/ruby/lib/grpc/generic/rpc_desc.rb', line 20
# Reader for the @output instance variable. The streaming predicates in this
# class test the value with is_a?(Stream), and unmarshal_proc(:output) uses it
# (or its #type when it is a Stream) as the class to unmarshal into.
def output
@output
end
|
#unmarshal_method ⇒ Object
Returns the value of attribute unmarshal_method
20
21
22
|
# File 'src/ruby/lib/grpc/generic/rpc_desc.rb', line 20
# Reader for the @unmarshal_method instance variable. The proc built by
# unmarshal_proc sends this value, as a method name, to the unmarshal class.
def unmarshal_method
@unmarshal_method
end
|
Instance Method Details
#arity_error(mth, want, msg) ⇒ Object
191
192
193
|
# File 'src/ruby/lib/grpc/generic/rpc_desc.rb', line 191
# Builds the message used when a handler method has the wrong arity.
#
# @param mth [#name, #arity] the handler method that was inspected
# @param want [Integer] the arity that was expected
# @param msg [String] a hint describing the expected signature
# @return [String] the formatted error description
def arity_error(mth, want, msg)
  handler = "##{mth.name}"
  counts = "got:#{mth.arity}, want:#{want}"
  "#{handler}: bad arg count; #{counts}, #{msg}"
end
|
#assert_arity_matches(mth) ⇒ Object
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
|
# File 'src/ruby/lib/grpc/generic/rpc_desc.rb', line 156
# Checks that a handler method takes the number of arguments required by the
# kind of RPC this descriptor represents.
#
# Accepted arities, as enforced below:
# * bidi streamer: 1 or 2
# * request/response and server streamer: 2
# * anything else (the client streamer case): 1
#
# @param mth [#name, #arity] the handler method to check
# @return [nil] when the arity is acceptable
# @raise [RuntimeError] carrying the arity_error message otherwise
def assert_arity_matches(mth)
  if bidi_streamer?
    return if [1, 2].include?(mth.arity)

    fail arity_error(mth, 2, "should be #{mth.name}(req, call) or " \
                             "#{mth.name}(req)")
  end

  if request_response? || server_streamer?
    return if mth.arity == 2

    fail arity_error(mth, 2, "should be #{mth.name}(req, call)")
  end

  return if mth.arity == 1

  fail arity_error(mth, 1, "should be #{mth.name}(call)")
end
|
#bidi_streamer? ⇒ Boolean
187
188
189
|
# File 'src/ruby/lib/grpc/generic/rpc_desc.rb', line 187
# Tells whether both the input and the output of this RPC are Streams.
#
# @return [Boolean] true only when input and output are both Stream instances
def bidi_streamer?
  return false unless input.is_a?(Stream)

  output.is_a?(Stream)
end
|
#client_streamer? ⇒ Boolean
179
180
181
|
# File 'src/ruby/lib/grpc/generic/rpc_desc.rb', line 179
# Tells whether only the input of this RPC is a Stream.
#
# @return [Boolean] true when input is a Stream instance and output is not
def client_streamer?
  return false unless input.is_a?(Stream)

  !output.is_a?(Stream)
end
|
#handle_bidi_streamer(active_call, mth, inter_ctx) ⇒ Object
105
106
107
108
|
# File 'src/ruby/lib/grpc/generic/rpc_desc.rb', line 105
# Runs a bidirectional-streaming handler and then reports an OK status.
#
# The call's output metadata is read only after run_server_bidi has returned
# and is passed to send_status as the status metadata.
#
# @param active_call [#run_server_bidi, #output_metadata] the call to serve
# @param mth [Object] the handler, forwarded untouched to run_server_bidi
# @param inter_ctx [Object] the interception context, forwarded untouched
# @return [Object] whatever send_status returns
def handle_bidi_streamer(active_call, mth, inter_ctx)
  active_call.run_server_bidi(mth, inter_ctx)
  trailing = active_call.output_metadata
  send_status(active_call, OK, 'OK', trailing)
end
|
#handle_client_streamer(active_call, mth, inter_ctx) ⇒ Object
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'src/ruby/lib/grpc/generic/rpc_desc.rb', line 68
# Serves a client-streaming RPC: the handler receives the call's
# multi-request view and its return value is sent as the single response.
#
# The handler invocation and the response are wrapped in the interception
# context under the :client_streamer type. Output metadata is read after the
# handler has returned and is sent as trailing metadata.
#
# @param active_call [#multi_req_view, #server_unary_response,
#   #output_metadata] the call to serve
# @param mth [#call] the handler, invoked with the multi-request view
# @param inter_ctx [#intercept!] the interception context
# @return [Object] whatever inter_ctx.intercept! returns
def handle_client_streamer(active_call, mth, inter_ctx)
  view = active_call.multi_req_view
  inter_ctx.intercept!(:client_streamer, method: mth, call: view) do
    reply = mth.call(view)
    active_call.server_unary_response(
      reply, trailing_metadata: active_call.output_metadata
    )
  end
end
|
#handle_request_response(active_call, mth, inter_ctx) ⇒ Object
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
# File 'src/ruby/lib/grpc/generic/rpc_desc.rb', line 50
# Serves a unary RPC: reads the single request, passes it with the call's
# single-request view to the handler, and sends the handler's return value
# as the response.
#
# The handler invocation and the response are wrapped in the interception
# context under the :request_response type. Output metadata is read after
# the handler has returned and is sent as trailing metadata.
#
# @param active_call [#read_unary_request, #single_req_view,
#   #server_unary_response, #output_metadata] the call to serve
# @param mth [#call] the handler, invoked as mth.call(request, view)
# @param inter_ctx [#intercept!] the interception context
# @return [Object] whatever inter_ctx.intercept! returns
def handle_request_response(active_call, mth, inter_ctx)
  request = active_call.read_unary_request
  view = active_call.single_req_view
  inter_ctx.intercept!(:request_response,
                       method: mth, call: view, request: request) do
    reply = mth.call(request, view)
    active_call.server_unary_response(
      reply, trailing_metadata: active_call.output_metadata
    )
  end
end
|
#handle_server_streamer(active_call, mth, inter_ctx) ⇒ Object
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
# File 'src/ruby/lib/grpc/generic/rpc_desc.rb', line 84
# Serves a server-streaming RPC: reads the single request, calls the handler
# and sends every element of the handler's result to the remote side,
# followed by an OK status.
#
# The handler invocation, the sends and the status are wrapped in the
# interception context under the :server_streamer type. Output metadata is
# read after all replies have been sent.
#
# @param active_call [#read_unary_request, #single_req_view, #remote_send,
#   #output_metadata] the call to serve
# @param mth [#call] the handler; its return value must respond to #each
# @param inter_ctx [#intercept!] the interception context
# @return [Object] whatever inter_ctx.intercept! returns
def handle_server_streamer(active_call, mth, inter_ctx)
  request = active_call.read_unary_request
  view = active_call.single_req_view
  inter_ctx.intercept!(:server_streamer,
                       method: mth, call: view, request: request) do
    mth.call(request, view).each do |reply|
      active_call.remote_send(reply)
    end
    send_status(active_call, OK, 'OK', active_call.output_metadata)
  end
end
|
#marshal_proc ⇒ Proc
Returns a marshal proc { |instance| marshalled(instance) }.
34
35
36
|
# File 'src/ruby/lib/grpc/generic/rpc_desc.rb', line 34
# Builds a proc that marshals an object to a String.
#
# The proc sends marshal_method to the object's class, passing the object
# itself, and converts the result with #to_s. marshal_method is looked up
# each time the proc runs, not when the proc is created.
#
# @return [Proc] { |instance| marshalled(instance) }
def marshal_proc
  proc do |instance|
    marshalled = instance.class.send(marshal_method, instance)
    marshalled.to_s
  end
end
|
#request_response? ⇒ Boolean
175
176
177
|
# File 'src/ruby/lib/grpc/generic/rpc_desc.rb', line 175
# Tells whether neither the input nor the output of this RPC is a Stream.
#
# @return [Boolean] true when input and output are both non-Stream values
def request_response?
  !(input.is_a?(Stream) || output.is_a?(Stream))
end
|
#run_server_method(active_call, mth, inter_ctx = InterceptionContext.new) ⇒ Object
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
# File 'src/ruby/lib/grpc/generic/rpc_desc.rb', line 117
# Dispatches a call to the handler matching this descriptor's RPC kind and
# translates errors raised while serving it into a status for the caller.
#
# Error handling, in rescue order:
# * BadStatus: logged at debug level; its code, details and metadata are sent.
# * Core::CallError: logged as a warning only; no status is sent.
# * Core::OutOfTime: logged; DEADLINE_EXCEEDED with details 'late' is sent.
# * StandardError or NotImplementedError: logged; UNKNOWN is sent with
#   "<error class>: <message>" as details.
#
# @param active_call [Object] the call being served
# @param mth [Object] the handler, forwarded to the selected handle_* method
# @param inter_ctx [Object] the interception context, forwarded likewise
# @return [Object] the selected handler's result, or the value of the last
#   expression of whichever rescue clause ran
def run_server_method(active_call, mth, inter_ctx = InterceptionContext.new)
  handler =
    if request_response? then :handle_request_response
    elsif client_streamer? then :handle_client_streamer
    elsif server_streamer? then :handle_server_streamer
    else :handle_bidi_streamer
    end
  send(handler, active_call, mth, inter_ctx)
rescue BadStatus => status
  GRPC.logger.debug(
    "app err:#{active_call}, status:#{status.code}:#{status.details}"
  )
  send_status(active_call, status.code, status.details, status.metadata)
rescue Core::CallError => call_err
  # Deliberately no send_status here: the failure is only logged.
  GRPC.logger.warn("failed call: #{active_call}\n#{call_err}")
rescue Core::OutOfTime
  GRPC.logger.warn("late call: #{active_call}")
  send_status(active_call, DEADLINE_EXCEEDED, 'late')
rescue StandardError, NotImplementedError => err
  # NotImplementedError is listed explicitly because it is a ScriptError,
  # not a StandardError.
  GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
  GRPC.logger.warn(err)
  send_status(active_call, UNKNOWN, "#{err.class}: #{err.message}")
end
|
#send_status(active_client, code, details, metadata = {}) ⇒ Object
195
196
197
198
199
200
201
202
|
# File 'src/ruby/lib/grpc/generic/rpc_desc.rb', line 195
# Sends a status on the given call, logging instead of raising when the
# send fails.
#
# A nil details value is replaced with 'Not sure why'. The third argument
# passed on to active_client.send_status is true only when code equals OK.
# Any StandardError raised while logging or sending is logged as a warning
# and swallowed.
#
# @param active_client [#send_status] the object the status is sent through
# @param code [Object] the status code, compared against OK
# @param details [String, nil] human-readable status details
# @param metadata [Hash] metadata passed through as the metadata: keyword
# @return [Object] the result of active_client.send_status, or of the last
#   logger call when the send failed
def send_status(active_client, code, details, metadata = {})
  details = details.nil? ? 'Not sure why' : details
  GRPC.logger.debug("Sending status #{code}:#{details}")
  active_client.send_status(code, details, code == OK, metadata: metadata)
rescue StandardError => err
  GRPC.logger.warn("Could not send status #{code}:#{details}")
  GRPC.logger.warn(err)
end
|
#server_streamer? ⇒ Boolean
183
184
185
|
# File 'src/ruby/lib/grpc/generic/rpc_desc.rb', line 183
# Tells whether only the output of this RPC is a Stream.
#
# @return [Boolean] true when output is a Stream instance and input is not
def server_streamer?
  return false if input.is_a?(Stream)

  output.is_a?(Stream)
end
|
#unmarshal_proc(target) ⇒ Proc
Returns an unmarshal proc { |marshalled(instance)| instance }.
43
44
45
46
47
48
|
# File 'src/ruby/lib/grpc/generic/rpc_desc.rb', line 43
# Builds a proc that unmarshals data into the input or output type.
#
# The unmarshal class is the value of the selected attribute, or that
# value's #type when it is a Stream; it is resolved once, when this method
# is called. unmarshal_method is looked up each time the proc runs.
#
# @param target [Symbol] either :input or :output
# @return [Proc] an unmarshal proc { |marshalled(instance)| instance }
# @raise [ArgumentError] if target is neither :input nor :output
def unmarshal_proc(target)
  fail ArgumentError unless %i[input output].include?(target)

  declared = send(target)
  unmarshal_class = declared.is_a?(Stream) ? declared.type : declared
  proc { |bytes| unmarshal_class.send(unmarshal_method, bytes) }
end
|