Module: RStyx::Client::StyxClient
- Defined in:
- lib/rstyx/client.rb
Overview
Message receiving module for the Styx client. The client will assemble all inbound messages.
Instance Attribute Summary collapse
-
#keyringauth ⇒ Object
Returns the value of attribute keyringauth.
-
#sentmessages ⇒ Object
Returns the value of attribute sentmessages.
Instance Method Summary collapse
-
#disconnect ⇒ Object
Disconnect from the remote server.
- #post_init ⇒ Object
-
#receive_data(data) ⇒ Object
Receive data from the network connection, called by EventMachine.
-
#send_message(message, timeout = 0) ⇒ Object
Send a message synchronously.
-
#send_message_async(message, &block) ⇒ Object
Send a message asynchronously.
Instance Attribute Details
#keyringauth ⇒ Object
Returns the value of attribute keyringauth.
41 42 43 |
# File 'lib/rstyx/client.rb', line 41 def keyringauth @keyringauth end |
#sentmessages ⇒ Object
Returns the value of attribute sentmessages.
41 42 43 |
# File 'lib/rstyx/client.rb', line 41 def @sentmessages end |
Instance Method Details
#disconnect ⇒ Object
Disconnect from the remote server.
178 179 180 181 182 183 184 185 |
# File 'lib/rstyx/client.rb', line 178 def disconnect # flush all outstanding messages before disconnect .each_key do |tag| rflush = (Message::Tflush.new(:oldtag => tag)) end EventMachine::stop_event_loop end |
#post_init ⇒ Object
43 44 45 46 47 48 |
# File 'lib/rstyx/client.rb', line 43 def post_init @msgbuffer = "" @lock = Mutex.new @sentmessages = {} @keyringauth = nil end |
#receive_data(data) ⇒ Object
Receive data from the network connection, called by EventMachine.
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/rstyx/client.rb', line 117 def receive_data(data) # If we are in keyring authentication mode, write any data received # into the @keyringauth's buffer, and simply return. unless (@keyringauth.nil?) @keyringauth << data return end @msgbuffer << data DEBUG > 1 && puts(" << #{data.unpack("H*").inspect}") while @msgbuffer.length > 4 length = @msgbuffer.unpack("V")[0] # Break out if there is not enough data in the message # buffer to construct a message. if @msgbuffer.length < length break end # Decode the received data , @msgbuffer = @msgbuffer.unpack("a#{length}a*") styxmsg = Message::StyxMessage.from_bytes() DEBUG > 0 && puts(" << #{styxmsg.to_s}") # and look for its tag in the hash of received messages tmsg, cb = @lock.synchronize do @sentmessages.delete(styxmsg.tag) end if tmsg.nil? # Discard unrecognized messages. next end if styxmsg.class == Message::Rflush # We need to delete the oldtag as well, and send the # rflush to the original sender if possible, so they # don't wait forever. if tmsg.respond_to?(:oldtag) otmsg, ocb = @lock.synchronize do @sentmessages.delete(tmsg.oldtag) end end if !otmsg.nil? && !ocb.nil? ocb.call(otmsg, styxmsg) end end # Now, activate the callback block. if !(tmsg.nil? || cb.nil?) cb.call(tmsg, styxmsg) end # after all this is done, there may still be enough data in # the message buffer for more messages so keep looping. end # If we get here, we don't have enough data in the buffer to # build a new message. end |
#send_message(message, timeout = 0) ⇒ Object
Send a message synchronously. If an error occurs, a StyxException is raised.
message
-
The Styx message to send.
timeout
-
optional timeout for receiving the response.
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/rstyx/client.rb', line 88 def (, timeout=0) # The queue here holds the response message, and is used # to communicate with the receive_data thread, which ultimately # calls the block passed to send_message_async. queue = Queue.new () do |tx,rx| # Enqueue the response message -- this runs in the # receive_data thread queue << rx end Timeout::timeout(timeout, StyxException.new("timeout waiting for a reply to #{.to_s}")) do # This will block until the queue contains something resp = queue.shift # Check the response to see if it is the response to # the transmitted message. if resp.class == Message::Rerror raise StyxException.new("#{resp.ename}") end if resp.ident != .ident + 1 raise StyxException.new("Unexpected #{resp.to_s} received in response to #{.to_s}") end return(resp) end end |
#send_message_async(message, &block) ⇒ Object
Send a message asynchronously.
message
- StyxMessage
-
the message to be sent
block
- Proc
-
the callback to use
- return
- Fixnum
-
the tag number used.
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/rstyx/client.rb', line 57 def (, &block) # store the message and callback indexed by tag @lock.synchronize do if .tag.nil? # If a tag has not been explicitly specified, get # a new tag for the message. We use the current # thread's object ID as the base and use what # amounts to a linear probing algorithm to # determine a new tag in case of collisions. tag = Thread.current.object_id % MAX_TAG while @sentmessages.has_key?(tag) tag += 1 end .tag = tag end @sentmessages[.tag] = [, block] end DEBUG > 0 && puts(" >> #{.to_s}") DEBUG > 1 && puts(" >> #{.to_bytes.unpack("H*").inspect}") send_data(.to_bytes) return(.tag) end |