Class: Qpid::Proton::Handler::MessagingAdapter

Inherits:
Adapter
  • Object
show all
Defined in:
lib/handler/messaging_adapter.rb

Overview

Adapt raw proton events to MessagingHandler events.

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Adapter

adapt, adapter, #forward, #initialize, #proton_adapter_class

Constructor Details

This class inherits a constructor from Qpid::Proton::Handler::Adapter

Class Method Details

.open_close(endpoint) ⇒ Object

Define repetative on_xxx_open/close methods for each endpoint type



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/handler/messaging_adapter.rb', line 42

def self.open_close(endpoint)
  Module.new do
    define_method(:"on_#{endpoint}_remote_open") do |event|
      begin
        delegate(:"on_#{endpoint}_open", event.context)
        event.context.open if event.context.local_uninit?
      rescue StopAutoResponse
      end
    end

    define_method(:"on_#{endpoint}_remote_close") do |event|
      delegate_error(:"on_#{endpoint}_error", event.context) if event.context.condition
      begin
        delegate(:"on_#{endpoint}_close", event.context)
        event.context.close if event.context.local_active?
      rescue StopAutoResponse
      end
    end
  end
end

Instance Method Details

#add_credit(event) ⇒ Object



119
120
121
122
123
124
# File 'lib/handler/messaging_adapter.rb', line 119

def add_credit(event)
  return unless (r = event.receiver)
  if r.open? && (r.drained == 0) && r.credit_window && (r.credit_window > r.credit)
    r.flow(r.credit_window - r.credit)
  end
end

#delegate(method, *args) ⇒ Object



26
27
28
# File 'lib/handler/messaging_adapter.rb', line 26

def delegate(method, *args)
  forward(method, *args) or forward(:on_unhandled, method, *args)
end

#delegate_error(method, context) ⇒ Object



30
31
32
33
34
35
36
# File 'lib/handler/messaging_adapter.rb', line 30

def delegate_error(method, context)
  unless forward(method, context) || forward(:on_error, context.condition)
    forward(:on_unhandled, method, context)
    # Close the whole connection on an un-handled error
    context.connection.close(context.condition)
  end
end

#on_container_start(container) ⇒ Object



38
# File 'lib/handler/messaging_adapter.rb', line 38

def on_container_start(container) delegate(:on_container_start, container); end

#on_container_stop(container) ⇒ Object



39
# File 'lib/handler/messaging_adapter.rb', line 39

def on_container_stop(container) delegate(:on_container_stop, container); end

#on_delivery(event) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/handler/messaging_adapter.rb', line 79

def on_delivery(event)
  if event.link.receiver?       # Incoming message
    d = event.delivery
    if d.aborted?
      delegate(:on_delivery_abort, d)
    elsif d.complete?
      if d.link.local_closed? && d.receiver.auto_accept
        d.release         # Auto release after close
      else
        begin
          delegate(:on_message, d, d.message)
          d.accept if d.receiver.auto_accept  && !d.settled?
        rescue Reject
          d.reject unless d.settled?
        rescue Release
          d.release unless d.settled?
        end
      end
    end
    delegate(:on_delivery_settle, d) if d.settled?
    add_credit(event)
  else                      # Outgoing message
    t = event.tracker
    case t.state
    when Delivery::ACCEPTED then delegate(:on_tracker_accept, t)
    when Delivery::REJECTED then delegate(:on_tracker_reject, t)
    when Delivery::RELEASED then delegate(:on_tracker_release, t)
    when Delivery::MODIFIED then delegate(:on_tracker_modify, t)
    end
    delegate(:on_tracker_settle, t) if t.settled?
    t.settle if t.sender.auto_settle
  end
end


113
114
115
116
117
# File 'lib/handler/messaging_adapter.rb', line 113

def on_link_flow(event)
  add_credit(event)
  sender = event.sender
  delegate(:on_sendable, sender) if sender && sender.open? && sender.credit > 0
end

Add flow control for link opening events



75
# File 'lib/handler/messaging_adapter.rb', line 75

def on_link_local_open(event) add_credit(event); end


76
# File 'lib/handler/messaging_adapter.rb', line 76

def on_link_remote_open(event) super; add_credit(event); end

#on_transport_closed(event) ⇒ Object



70
71
72
# File 'lib/handler/messaging_adapter.rb', line 70

def on_transport_closed(event)
  delegate(:on_transport_close, event.context) rescue StopAutoResponse
end

#on_transport_error(event) ⇒ Object



66
67
68
# File 'lib/handler/messaging_adapter.rb', line 66

def on_transport_error(event)
  delegate_error(:on_transport_error, event.context)
end