Class: XRBP::WebSocket::Plugins::MessageDispatcher
- Inherits: PluginBase
  - Object
  - PluginBase
  - XRBP::WebSocket::Plugins::MessageDispatcher
- Defined in: lib/xrbp/websocket/plugins/message_dispatcher.rb
Overview
Dispatches messages and waits for responses, with an optional timeout. This plugin lets the client track messages sent to the server and wait for each response up to a maximum time. An overridable callback method, match_message, is provided to match responses to messages. Most end users will not use this plugin directly but through CommandDispatcher, which inherits from it and extends it to issue and track structured commands.
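The "send, then wait up to a maximum time" idea described above can be sketched in plain Ruby. This is only an illustration under stated assumptions: PendingMessage, wait(timeout) and complete are hypothetical names, not part of XRBP, and the XRBP Message class may well implement waiting differently.

```ruby
# Hypothetical stand-in for a tracked message; not the XRBP Message class.
class PendingMessage
  attr_reader :result

  def initialize
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
    @done  = false
  end

  # Block the caller until a response arrives or the timeout elapses.
  # Returns true if a response was delivered, false on timeout.
  def wait(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      until @done
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return false if remaining <= 0
        @cond.wait(@mutex, remaining)
      end
      true
    end
  end

  # Deliver a response and wake the waiting caller.
  def complete(result)
    @mutex.synchronize do
      @result = result
      @done = true
      @cond.signal
    end
  end
end

# A message that is answered by another thread before the timeout.
answered = PendingMessage.new
Thread.new { answered.complete("pong") }
answered_in_time = answered.wait(1.0)

# A message that never receives a response.
unanswered = PendingMessage.new
timed_out = !unanswered.wait(0.05)
```

The loop around the condition variable guards against spurious wakeups and against the response arriving before the caller starts waiting.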
Direct Known Subclasses
Constant Summary
- DEFAULT_TIMEOUT = 10
Instance Attribute Summary
- #message_timeout ⇒ Object
  Returns the value of attribute message_timeout.
- #messages ⇒ Object (readonly)
  Returns the value of attribute messages.
Attributes inherited from PluginBase
Instance Method Summary
- #added ⇒ Object
- #cancel_all_messages ⇒ Object
- #cancel_message(msg) ⇒ Object
  FIXME: There appears to be an issue causing a deadlock at process termination when subsequent pages in paginated commands time out.
- #closed ⇒ Object
- #initialize(connection) ⇒ MessageDispatcher (constructor)
  A new instance of MessageDispatcher.
- #match_message(msg) ⇒ Object
  Should be overridden in a subclass to return the request message and the formatted response for a given raw response.
- #message(res) ⇒ Object
- #opened ⇒ Object
- #parsing_plugins ⇒ Object
- #try_next(msg) ⇒ Object
- #unlock!(req, res) ⇒ Object
  Returns a boolean indicating whether the message/response pair is ready to be unlocked and returned to the client.
Constructor Details
#initialize(connection) ⇒ MessageDispatcher
Returns a new instance of MessageDispatcher.
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 26
def initialize(connection)
  super(connection)
  @message_timeout = DEFAULT_TIMEOUT
  @messages = []
end
Instance Attribute Details
#message_timeout ⇒ Object
Returns the value of attribute message_timeout.
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 24
def message_timeout
  @message_timeout
end
#messages ⇒ Object (readonly)
Returns the value of attribute messages.
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 23
def messages
  @messages
end
Instance Method Details
#added ⇒ Object
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 32
def added
  plugin = self

  connection.define_instance_method(:message_timeout=) do |t|
    plugin.message_timeout = t

    connections.each{ |c|
      c.plugin(MessageDispatcher)
       .message_timeout = t
    } if self.kind_of?(MultiConnection)
  end

  connection.define_instance_method(:msg) do |msg, &bl|
    return next_connection.msg msg, &bl if self.kind_of?(MultiConnection)

    msg = Message.new(msg) unless msg.kind_of?(Message)
    msg.connection = self
    msg.time = Time.now
    msg.bl = bl if bl

    unless self.open?
      if plugin.try_next(msg)
        return nil if bl
        msg.wait
        return msg.result

      else
        msg.bl.call nil if bl
        return nil
      end
    end

    plugin.messages << msg

    send_data msg.to_s

    return nil if bl
    msg.wait
    msg.result
  end

  connection.on :close do
    plugin.cancel_all_messages
  end unless connection.kind_of?(MultiConnection)
end
#cancel_all_messages ⇒ Object
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 136
def cancel_all_messages
  # copy array as we modify original during iteration
  Array.new(messages).each { |msg|
    cancel_message(msg)
  }
end
#cancel_message(msg) ⇒ Object
FIXME: There appears to be an issue causing a deadlock at process
termination when subsequent pages in paginated commands
time out. When messages are retrieved synchronously,
the first message's block is used to wait for the
results, but on timeout cancel_message is called with
the _latest_ message, so the wait block is never
unlocked.
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 129
def cancel_message(msg)
  connection.state_mutex.synchronize {
    messages.delete(msg)
    msg.signal
  }
end
#closed ⇒ Object
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 171
def closed
  terminate!
end
#match_message(msg) ⇒ Object
Should be overridden in a subclass to return the request message and the formatted response for a given raw response. The default implementation returns nil, so no response is ever matched.
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 81
def match_message(msg)
  nil
end
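A subclass override might look roughly like the following. This is a standalone sketch, not XRBP code: SketchDispatcher, IdDispatcher and Request are hypothetical, and the assumption that responses are JSON documents carrying an "id" field is made purely for illustration. What it does take from the source is the contract that match_message returns a request/response pair, or nil when nothing matches.

```ruby
require "json"

# Hypothetical base class mirroring only the documented default behavior.
class SketchDispatcher
  attr_reader :messages

  def initialize
    @messages = []
  end

  # Default: no response is ever matched.
  def match_message(_raw)
    nil
  end
end

# Hypothetical request type; XRBP's own Message class is not used here.
Request = Struct.new(:id, :payload)

class IdDispatcher < SketchDispatcher
  # Assumption for illustration: responses are JSON with an "id" field
  # that echoes the id of the request they answer.
  def match_message(raw)
    parsed = JSON.parse(raw)
    req = messages.find { |m| m.id == parsed["id"] }
    return nil unless req
    [req, parsed]
  rescue JSON::ParserError
    nil
  end
end

dispatcher = IdDispatcher.new
dispatcher.messages << Request.new(7, "ping")

req, res = dispatcher.match_message('{"id": 7, "result": "pong"}')
no_match = dispatcher.match_message('{"id": 99}')
```

Returning nil for an unmatched response works with destructuring (`req, res = nil` leaves both nil), which is how the message handler on this page detects "no matching request".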
#message(res) ⇒ Object
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 93
def message(res)
  req, res = match_message(res)
  return unless req
  messages.delete(req)

  return unless unlock!(req, res)

  begin
    res = parse_result(res, req)
  rescue Exception => e
    if try_next(req)
      return

    else
      res = nil
    end
  end

  req.bl.call(res)
end
#opened ⇒ Object
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 145
def opened
  connection.add_work do
    # XXX remove force_quit? condition check from this loop,
    #     so we're sure messages always timeout, even on force quit.
    #     Always ensure close! is called after websocket is no longer
    #     being used!
    until terminate? || connection.closed?
      now = Time.now
      tmsgs = Array.new(messages)
      tmsgs.each { |msg|
        if now - msg.time > @message_timeout
          connection.emit :timeout, msg
          cancel_message(msg) unless try_next(msg)

          # XXX manually close the connection as
          #     a broken pipe will not stop websocket polling
          connection.async_close!
        end
      }

      connection.rsleep(0.1)
    end
  end
end
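The timeout check inside the loop above can be isolated as a single sweep over the pending messages. The sketch below is a simplified model, not the plugin's code: Tracked and sweep are hypothetical names, and the retry and connection-closing steps are left out. It keeps two details from the source: the comparison `now - msg.time > timeout` (Time subtraction yields seconds) and iterating over a copy because the original list is modified.

```ruby
# Hypothetical message record; only the `time` attribute mirrors the source.
Tracked = Struct.new(:name, :time)

# One pass of a timeout sweep: removes expired messages from `pending`
# and returns them.
def sweep(pending, timeout, now = Time.now)
  expired = []
  # Iterate over a copy, since expired entries are deleted from the original.
  pending.dup.each do |msg|
    next unless now - msg.time > timeout
    pending.delete(msg)
    expired << msg
  end
  expired
end

now = Time.now
pending = [Tracked.new("old", now - 15), Tracked.new("fresh", now - 1)]
expired = sweep(pending, 10, now)
```

Passing `now` explicitly keeps the sweep deterministic, which makes the expiry rule easy to check in isolation.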
#parsing_plugins ⇒ Object
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 19
def parsing_plugins
  connection.plugins
end
#try_next(msg) ⇒ Object
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 114
def try_next(msg)
  conn = connection.next_connection(msg.connection)
  return false unless !!conn
  messages.delete(msg)
  conn.msg(msg, &msg.bl)
  true
end
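The failover pattern in try_next, asking for another connection and resending there, can be modeled with a toy pool. Everything below is hypothetical (SketchConnection, SketchPool, and this particular selection rule in next_connection); how XRBP's next_connection actually chooses a connection is not shown on this page.

```ruby
# Hypothetical connection that just records what was sent through it.
class SketchConnection
  attr_reader :name, :sent

  def initialize(name, open)
    @name = name
    @open = open
    @sent = []
  end

  def open?
    @open
  end

  def msg(payload)
    @sent << payload
  end
end

# Hypothetical pool; the selection rule is an assumption for illustration.
class SketchPool
  def initialize(conns)
    @conns = conns
  end

  # Returns the first open connection other than `failed`, or nil.
  def next_connection(failed)
    @conns.find { |c| !c.equal?(failed) && c.open? }
  end
end

# Resend the payload on another connection; false if none is available.
def try_next(pool, failed, payload)
  conn = pool.next_connection(failed)
  return false unless conn
  conn.msg(payload)
  true
end

a = SketchConnection.new("a", false)
b = SketchConnection.new("b", true)
rerouted = try_next(SketchPool.new([a, b]), a, "ping")
stranded = try_next(SketchPool.new([a]), a, "ping")
```

The boolean return value is what lets callers such as the timeout loop decide whether to cancel the message or leave it to the next connection.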
#unlock!(req, res) ⇒ Object
Returns a boolean indicating whether the message/response pair is ready to be unlocked and returned to the client. This allows other plugins to block message unlocking.
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 87
def unlock!(req, res)
  !connection.plugins.any? { |plg|
    plg != self && plg.respond_to?(:unlock!) && !plg.unlock!(req, res)
  }
end
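The veto logic in unlock! can be exercised on its own. In this sketch the boolean expression is copied from the method above, while HoldUntilFinal, NoOpinion, the `final` flag and the unlocked? helper are hypothetical and exist only to show how another plugin might hold a response back.

```ruby
# Hypothetical plugin that blocks unlocking until a response is marked final.
class HoldUntilFinal
  def unlock!(_req, res)
    res[:final] == true
  end
end

# Hypothetical plugin without an unlock! method; it is ignored by the check.
class NoOpinion; end

# Same expression as in unlock!, with the plugin list passed in explicitly.
def unlocked?(plugins, me, req, res)
  !plugins.any? { |plg|
    plg != me && plg.respond_to?(:unlock!) && !plg.unlock!(req, res)
  }
end

me = Object.new
plugins = [me, NoOpinion.new, HoldUntilFinal.new]

partial = unlocked?(plugins, me, :req, { final: false })
final   = unlocked?(plugins, me, :req, { final: true })
```

A single plugin returning false is enough to keep the response from reaching the client; plugins that do not define unlock! have no say.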