Class: Jabber::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/vendor/xmpp4r/lib/xmpp4r/stream.rb

Overview

The stream class manages a connection stream (a file descriptor using which XML messages are read and sent)

You may register callbacks for the three Jabber stanzas (message, presence and iq) and use the send and send_with_id methods.

To ensure the order of received stanzas, callback blocks are launched in the parser thread. If further blocking operations are intended in those callbacks, run your own thread there.

Direct Known Subclasses

Connection

Defined Under Namespace

Classes: ThreadBlock

Constant Summary collapse

DISCONNECTED =
1
CONNECTED =
2

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeStream

Initialize a new stream



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 40

def initialize
  @fd = nil
  @status = DISCONNECTED
  @xmlcbs = CallbackList.new
  @stanzacbs = CallbackList.new
  @messagecbs = CallbackList.new
  @iqcbs = CallbackList.new
  @presencecbs = CallbackList.new
  @send_lock = Mutex.new
  @last_send = Time.now
  @exception_block = nil
  @threadblocks = []
  @wakeup_thread = nil
  @streamid = nil
  @streamns = 'jabber:client'
  @features_sem = Semaphore.new
  @parser_thread = nil
end

Instance Attribute Details

#fdObject (readonly)

file descriptor used



33
34
35
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 33

def fd
  @fd
end

#statusObject (readonly)

connection status



36
37
38
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 36

def status
  @status
end

Instance Method Details

#add_iq_callback(priority = 0, ref = nil, &block) ⇒ Object

Adds a callback block to process received Iqs

priority
Integer

The callback’s priority, the higher, the sooner

ref
String

The callback’s reference

&block
Block

The optional block



473
474
475
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 473

def add_iq_callback(priority = 0, ref = nil, &block)
  @iqcbs.add(priority, ref, block)
end

#add_message_callback(priority = 0, ref = nil, &block) ⇒ Object

Adds a callback block to process received Messages

priority
Integer

The callback’s priority, the higher, the sooner

ref
String

The callback’s reference

&block
Block

The optional block



419
420
421
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 419

def add_message_callback(priority = 0, ref = nil, &block)
  @messagecbs.add(priority, ref, block)
end

#add_presence_callback(priority = 0, ref = nil, &block) ⇒ Object

Adds a callback block to process received Presences

priority
Integer

The callback’s priority, the higher, the sooner

ref
String

The callback’s reference

&block
Block

The optional block



455
456
457
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 455

def add_presence_callback(priority = 0, ref = nil, &block)
  @presencecbs.add(priority, ref, block)
end

#add_stanza_callback(priority = 0, ref = nil, &block) ⇒ Object

Adds a callback block to process received Stanzas

priority
Integer

The callback’s priority, the higher, the sooner

ref
String

The callback’s reference

&block
Block

The optional block



437
438
439
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 437

def add_stanza_callback(priority = 0, ref = nil, &block)
  @stanzacbs.add(priority, ref, block)
end

#add_xml_callback(priority = 0, ref = nil, &block) ⇒ Object

Adds a callback block to process received XML messages

priority
Integer

The callback’s priority, the higher, the sooner

ref
String

The callback’s reference

&block
Block

The optional block



401
402
403
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 401

def add_xml_callback(priority = 0, ref = nil, &block)
  @xmlcbs.add(priority, ref, block)
end

#closeObject

Closes the connection to the Jabber service



487
488
489
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 487

def close
  close!
end

#close!Object



491
492
493
494
495
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 491

def close!
  @parser_thread.kill if @parser_thread
  @fd.close if @fd and !@fd.closed?
  @status = DISCONNECTED
end

#delete_iq_callback(ref) ⇒ Object

Delete an Iq callback

ref
String

The reference of the callback to delete



482
483
484
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 482

def delete_iq_callback(ref)
  @iqcbs.delete(ref)
end

#delete_message_callback(ref) ⇒ Object

Delete an Message callback

ref
String

The reference of the callback to delete



427
428
429
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 427

def delete_message_callback(ref)
  @messagecbs.delete(ref)
end

#delete_presence_callback(ref) ⇒ Object

Delete a Presence callback

ref
String

The reference of the callback to delete



463
464
465
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 463

def delete_presence_callback(ref)
  @presencecbs.delete(ref)
end

#delete_stanza_callback(ref) ⇒ Object

Delete a Stanza callback

ref
String

The reference of the callback to delete



445
446
447
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 445

def delete_stanza_callback(ref)
  @stanzacbs.delete(ref)
end

#delete_xml_callback(ref) ⇒ Object

Delete an XML-messages callback

ref
String

The reference of the callback to delete



409
410
411
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 409

def delete_xml_callback(ref)
  @xmlcbs.delete(ref)
end

#is_connected?Boolean

Returns if this connection is connected to a Jabber service

return
Boolean

Connection status

Returns:

  • (Boolean)


155
156
157
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 155

def is_connected?
  return @status == CONNECTED
end

#is_disconnected?Boolean

Returns if this connection is NOT connected to a Jabber service

return
Boolean

Connection status

Returns:

  • (Boolean)


163
164
165
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 163

def is_disconnected?
  return @status == DISCONNECTED
end

#on_exception(&block) ⇒ Object

Mounts a block to handle exceptions if they occur during the poll send. This will likely be the first indication that the socket dropped in a Jabber Session.

The block has to take three arguments:

  • the Exception

  • the Jabber::Stream object (self)

  • a symbol where it happened, namely :start, :parser, :sending and :end



112
113
114
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 112

def on_exception(&block)
  @exception_block = block
end

#parse_failure(e) ⇒ Object

This method is called by the parser when a failure occurs



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 118

def parse_failure(e)
  Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

  # A new thread has to be created because close will cause the thread
  # to commit suicide(???)
  if @exception_block
    # New thread, because close will kill the current thread
    Thread.new do
      Thread.current.abort_on_exception = true
      close
      @exception_block.call(e, self, :parser)
    end
  else
    Jabber::debuglog "Stream#parse_failure was called by XML parser. Dumping " +
      "backtrace...\n" + e.exception + "\n#{e.backtrace.join("\n")}"
    close
    raise
  end
end

#parser_endObject

This method is called by the parser upon receiving </stream:stream>



140
141
142
143
144
145
146
147
148
149
150
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 140

def parser_end
  if @exception_block
    Thread.new do
      Thread.current.abort_on_exception = true
      close
      @exception_block.call(nil, self, :close)
    end
  else
    close
  end
end

#receive(element) ⇒ Object

Processes a received REXML::Element and executes registered thread blocks and filters against it.

element
REXML::Element

The received element



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 172

def receive(element)
  Jabber::debuglog("RECEIVED:\n#{element.to_s}")

  if element.namespace('').to_s == '' # REXML namespaces are always strings
    element.add_namespace(@streamns)
  end

  case element.prefix
  when 'stream'
    case element.name
      when 'stream'
        stanza = element
        @streamid = element.attributes['id']
        @streamns = element.namespace('') if element.namespace('')

        # Hack: component streams are basically client streams.
        # Someday we may want to create special stanza classes
        # for components/s2s deriving from normal stanzas but
        # posessing these namespaces
        @streamns = 'jabber:client' if @streamns == 'jabber:component:accept'

        unless element.attributes['version']  # isn't XMPP compliant, so
          Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features")
          @features_sem.run                   # don't wait for <stream:features/>
        end
      when 'features'
        stanza = element
        element.each { |e|
          if e.name == 'mechanisms' and e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl'
            e.each_element('mechanism') { |mech|
              @stream_mechanisms.push(mech.text)
            }
          else
            @stream_features[e.name] = e.namespace
          end
        }
        Jabber::debuglog("FEATURES: received")
        @features_sem.run
      else
        stanza = element
    end
  else
    # Any stanza, classes are registered by XMPPElement::name_xmlns
    begin
      stanza = XMPPStanza::import(element)
    rescue NoNameXmlnsRegistered
      stanza = element
    end
  end

  # Iterate through blocked threads (= waiting for an answer)
  #
  # We're dup'ping the @threadblocks here, so that we won't end up in an
  # endless loop if Stream#send is being nested. That means, the nested
  # threadblock won't receive the stanza currently processed, but the next
  # one.
  threadblocks = @threadblocks.dup
  threadblocks.each { |threadblock|
    exception = nil
    r = false
    begin
      r = threadblock.call(stanza)
    rescue Exception => e
      exception = e
    end

    if r == true
      @threadblocks.delete(threadblock)
      threadblock.wakeup
      return
    elsif exception
      @threadblocks.delete(threadblock)
      threadblock.raise(exception)
    end
  }

  Jabber::debuglog("PROCESSING:\n#{stanza.to_s} (#{stanza.class})")
  return true if @xmlcbs.process(stanza)
  return true if @stanzacbs.process(stanza)
  case stanza
  when Message
    return true if @messagecbs.process(stanza)
  when Iq
    return true if @iqcbs.process(stanza)
  when Presence
    return true if @presencecbs.process(stanza)
  end
end

#send(xml, &block) ⇒ Object

Sends XML data to the socket and (optionally) waits to process received data.

Do not invoke this in a callback but in a seperate thread because we may not suspend the parser-thread (in whose context callbacks are executed).

xml
String

The xml data to send

&block
Block

The optional block



306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 306

def send(xml, &block)
  Jabber::debuglog("SENDING:\n#{xml}")
  @threadblocks.unshift(threadblock = ThreadBlock.new(block)) if block
  begin
    # Temporarily remove stanza's namespace to
    # reduce bandwidth consumption
    if xml.kind_of? XMPPStanza and xml.namespace == 'jabber:client'
      xml.delete_namespace
      send_data(xml.to_s)
      xml.add_namespace(@streamns)
    else
      send_data(xml.to_s)
    end
  rescue Exception => e
    Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

    if @exception_block
      Thread.new do
        Thread.current.abort_on_exception = true
        close!
        @exception_block.call(e, self, :sending)
      end
    else
      Jabber::debuglog "Exception caught while sending! (#{e.class})\n#{e.backtrace.join("\n")}"
      close!
      raise
    end
  end
  # The parser thread might be running this (think of a callback running send())
  # If this is the case, we mustn't stop (or we would cause a deadlock)
  if block and Thread.current != @parser_thread
    threadblock.wait
  elsif block
    Jabber::debuglog("WARNING:\nCannot stop current thread in Jabber::Stream#send because it is the parser thread!")
  end
end

#send_data(data) ⇒ Object



288
289
290
291
292
293
294
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 288

def send_data(data)
  @send_lock.synchronize do
    @last_send = Time.now
    @fd << data
    @fd.flush
  end
end

#send_with_id(xml, &block) ⇒ Object

Send an XMMP stanza with an Jabber::XMPPStanza#id. The id will be generated by Jabber::IdGenerator if not already set.

The block will be called once: when receiving a stanza with the same Jabber::XMPPStanza#id. There is no need to return true to complete this! Instead the return value of the block will be returned. This is a direct result of unique request/response stanza identification via the id attribute.

The block may be omitted. Then, the result will be the response stanza.

Be aware that if a stanza with type='error' is received the function does not yield but raises an ServerError with the corresponding error element.

Please see Stream#send for some implementational details.

Please read the note about nesting at Stream#send

xml
XMPPStanza


364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 364

def send_with_id(xml, &block)
  if xml.id.nil?
    xml.id = Jabber::IdGenerator.instance.generate_id
  end

  res = nil
  error = nil
  send(xml) do |received|
    if received.kind_of? XMPPStanza and received.id == xml.id
      if received.type == :error
        error = (received.error ? received.error : ErrorResponse.new)
        true
      elsif block_given?
        res = yield(received)
        true
      else
        res = received
        true
      end
    else
      false
    end
  end

  unless error.nil?
    raise ServerError.new(error)
  end

  res
end

#start(fd) ⇒ Object

Start the XML parser on the fd



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 61

def start(fd)
  @stream_mechanisms = []
  @stream_features = {}

  @fd = fd
  @parser = StreamParser.new(@fd, self)
  @parser_thread = Thread.new do
    Thread.current.abort_on_exception = true
    begin
      @parser.parse
      Jabber::debuglog("DISCONNECTED\n")

      if @exception_block
        Thread.new { close!; @exception_block.call(nil, self, :disconnected) }
      else
        close!
      end
    rescue Exception => e
      Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

      if @exception_block
        Thread.new do
          Thread.current.abort_on_exception = true
          close
          @exception_block.call(e, self, :start)
        end
      else
        Jabber::debuglog "Exception caught in Parser thread! (#{e.class})\n#{e.backtrace.join("\n")}"
        close!
        raise
      end
    end
  end

  @status = CONNECTED
end

#stopObject



98
99
100
101
# File 'lib/vendor/xmpp4r/lib/xmpp4r/stream.rb', line 98

def stop
  @parser_thread.kill
  @parser = nil
end