Class: Qpid::Proton::Handler::MessagingAdapter
- Inherits:
-
Adapter
- Object
- Adapter
- Qpid::Proton::Handler::MessagingAdapter
show all
- Defined in:
- lib/handler/messaging_adapter.rb
Overview
Class Method Summary
collapse
Instance Method Summary
collapse
Methods inherited from Adapter
adapt, adapter, #forward, #initialize, #proton_adapter_class
Class Method Details
.open_close(endpoint) ⇒ Object
Define repetative on_xxx_open/close methods for each endpoint type
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
# File 'lib/handler/messaging_adapter.rb', line 42
def self.open_close(endpoint)
Module.new do
define_method(:"on_#{endpoint}_remote_open") do |event|
begin
delegate(:"on_#{endpoint}_open", event.context)
event.context.open if event.context.local_uninit?
rescue StopAutoResponse
end
end
define_method(:"on_#{endpoint}_remote_close") do |event|
delegate_error(:"on_#{endpoint}_error", event.context) if event.context.condition
begin
delegate(:"on_#{endpoint}_close", event.context)
event.context.close if event.context.local_active?
rescue StopAutoResponse
end
end
end
end
|
Instance Method Details
#add_credit(event) ⇒ Object
119
120
121
122
123
124
|
# File 'lib/handler/messaging_adapter.rb', line 119
def add_credit(event)
return unless (r = event.receiver)
if r.open? && (r.drained == 0) && r.credit_window && (r.credit_window > r.credit)
r.flow(r.credit_window - r.credit)
end
end
|
#delegate(method, *args) ⇒ Object
26
27
28
|
# File 'lib/handler/messaging_adapter.rb', line 26
def delegate(method, *args)
forward(method, *args) or forward(:on_unhandled, method, *args)
end
|
#delegate_error(method, context) ⇒ Object
30
31
32
33
34
35
36
|
# File 'lib/handler/messaging_adapter.rb', line 30
def delegate_error(method, context)
unless forward(method, context) || forward(:on_error, context.condition)
forward(:on_unhandled, method, context)
context.connection.close(context.condition)
end
end
|
#on_container_start(container) ⇒ Object
38
|
# File 'lib/handler/messaging_adapter.rb', line 38
def on_container_start(container) delegate(:on_container_start, container); end
|
#on_container_stop(container) ⇒ Object
39
|
# File 'lib/handler/messaging_adapter.rb', line 39
def on_container_stop(container) delegate(:on_container_stop, container); end
|
#on_delivery(event) ⇒ Object
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
# File 'lib/handler/messaging_adapter.rb', line 79
def on_delivery(event)
if event.link.receiver? d = event.delivery
if d.aborted?
delegate(:on_delivery_abort, d)
elsif d.complete?
if d.link.local_closed? && d.receiver.auto_accept
d.release else
begin
delegate(:on_message, d, d.message)
d.accept if d.receiver.auto_accept && !d.settled?
rescue Reject
d.reject unless d.settled?
rescue Release
d.release unless d.settled?
end
end
end
delegate(:on_delivery_settle, d) if d.settled?
add_credit(event)
else t = event.tracker
case t.state
when Delivery::ACCEPTED then delegate(:on_tracker_accept, t)
when Delivery::REJECTED then delegate(:on_tracker_reject, t)
when Delivery::RELEASED then delegate(:on_tracker_release, t)
when Delivery::MODIFIED then delegate(:on_tracker_modify, t)
end
delegate(:on_tracker_settle, t) if t.settled?
t.settle if t.sender.auto_settle
end
end
|
#on_link_flow(event) ⇒ Object
113
114
115
116
117
|
# File 'lib/handler/messaging_adapter.rb', line 113
def on_link_flow(event)
add_credit(event)
sender = event.sender
delegate(:on_sendable, sender) if sender && sender.open? && sender.credit > 0
end
|
#on_link_local_open(event) ⇒ Object
Add flow control for link opening events
75
|
# File 'lib/handler/messaging_adapter.rb', line 75
def on_link_local_open(event) add_credit(event); end
|
#on_link_remote_open(event) ⇒ Object
76
|
# File 'lib/handler/messaging_adapter.rb', line 76
def on_link_remote_open(event) super; add_credit(event); end
|
#on_transport_closed(event) ⇒ Object
70
71
72
|
# File 'lib/handler/messaging_adapter.rb', line 70
def on_transport_closed(event)
delegate(:on_transport_close, event.context) rescue StopAutoResponse
end
|
#on_transport_error(event) ⇒ Object
66
67
68
|
# File 'lib/handler/messaging_adapter.rb', line 66
def on_transport_error(event)
delegate_error(:on_transport_error, event.context)
end
|