Class: Jabber::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/xmpp4r/stream.rb

Overview

The stream class manages a connection stream (a file descriptor through which XML messages are read and sent).

You may register callbacks for the three Jabber stanzas (message, presence and iq) and use the send and send_with_id methods.

To ensure the order of received stanzas, callback blocks are launched in the parser thread. If further blocking operations are intended in those callbacks, run your own thread there.

Direct Known Subclasses

Connection

Defined Under Namespace

Classes: ThreadBlock

Constant Summary collapse

DISCONNECTED =
1
CONNECTED =
2

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeStream

Initialize a new stream



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/xmpp4r/stream.rb', line 43

def initialize
  @fd = nil
  @status = DISCONNECTED
  @xmlcbs = CallbackList.new
  @stanzacbs = CallbackList.new
  @messagecbs = CallbackList.new
  @iqcbs = CallbackList.new
  @presencecbs = CallbackList.new
  @send_lock = Mutex.new
  @last_send = Time.now
  @exception_block = nil
  @tbcbmutex = Mutex.new
  @threadblocks = []
  @wakeup_thread = nil
  @streamid = nil
  @streamns = 'jabber:client'
  @features_sem = Semaphore.new
  @parser_thread = nil
  @processing = 0
end

Instance Attribute Details

#fdObject (readonly)

file descriptor used



33
34
35
# File 'lib/xmpp4r/stream.rb', line 33

def fd
  @fd
end

#processingObject (readonly)

number of stanzas currently being processed



39
40
41
# File 'lib/xmpp4r/stream.rb', line 39

def processing
  @processing
end

#statusObject (readonly)

connection status



36
37
38
# File 'lib/xmpp4r/stream.rb', line 36

def status
  @status
end

Instance Method Details

#add_iq_callback(priority = 0, ref = nil, &block) ⇒ Object

Adds a callback block to process received Iqs

priority
Integer

The callback’s priority, the higher, the sooner

ref
String

The callback’s reference

&block
Block

The optional block



552
553
554
555
556
# File 'lib/xmpp4r/stream.rb', line 552

def add_iq_callback(priority = 0, ref = nil, &block)
  @tbcbmutex.synchronize do
    @iqcbs.add(priority, ref, block)
  end
end

#add_message_callback(priority = 0, ref = nil, &block) ⇒ Object

Adds a callback block to process received Messages

priority
Integer

The callback’s priority, the higher, the sooner

ref
String

The callback’s reference

&block
Block

The optional block



486
487
488
489
490
# File 'lib/xmpp4r/stream.rb', line 486

def add_message_callback(priority = 0, ref = nil, &block)
  @tbcbmutex.synchronize do
    @messagecbs.add(priority, ref, block)
  end
end

#add_presence_callback(priority = 0, ref = nil, &block) ⇒ Object

Adds a callback block to process received Presences

priority
Integer

The callback’s priority, the higher, the sooner

ref
String

The callback’s reference

&block
Block

The optional block



530
531
532
533
534
# File 'lib/xmpp4r/stream.rb', line 530

def add_presence_callback(priority = 0, ref = nil, &block)
  @tbcbmutex.synchronize do
    @presencecbs.add(priority, ref, block)
  end
end

#add_stanza_callback(priority = 0, ref = nil, &block) ⇒ Object

Adds a callback block to process received Stanzas

priority
Integer

The callback’s priority, the higher, the sooner

ref
String

The callback’s reference

&block
Block

The optional block



508
509
510
511
512
# File 'lib/xmpp4r/stream.rb', line 508

def add_stanza_callback(priority = 0, ref = nil, &block)
  @tbcbmutex.synchronize do
    @stanzacbs.add(priority, ref, block)
  end
end

#add_xml_callback(priority = 0, ref = nil, &block) ⇒ Object

Adds a callback block to process received XML messages, these will be handled before any blocks given to Stream#send or other callbacks.

priority
Integer

The callback’s priority, the higher, the sooner

ref
String

The callback’s reference

&block
Block

The optional block



464
465
466
467
468
# File 'lib/xmpp4r/stream.rb', line 464

def add_xml_callback(priority = 0, ref = nil, &block)
  @tbcbmutex.synchronize do
    @xmlcbs.add(priority, ref, block)
  end
end

#closeObject

Closes the connection to the Jabber service



570
571
572
# File 'lib/xmpp4r/stream.rb', line 570

def close
  close!
end

#close!Object



574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
# File 'lib/xmpp4r/stream.rb', line 574

def close!
  pr = 1
  n = 0
  # In some cases, we might lost count of some stanzas
  # (for example, if the handler raises an exception)
  # so we can't block forever.
  while pr > 0 and n <= 20
    @tbcbmutex.synchronize { pr = @processing }
    if pr > 0
      n += 1
      Jabber::debuglog("TRYING TO CLOSE, STILL PROCESSING #{pr} STANZAS")
      #puts("TRYING TO CLOSE, STILL PROCESSING #{pr} STANZAS")
      sleep 0.1
    end
  end

  # Order Matters here! If this method is called from within 
  # @parser_thread then killing @parser_thread first would 
  # mean the other parts of the method fail to execute. 
  # That would be bad. So kill parser_thread last
  @fd.close if @fd and !@fd.closed?
  @status = DISCONNECTED
  @parser_thread.kill if @parser_thread
end

#delete_iq_callback(ref) ⇒ Object

Delete an Iq callback

ref
String

The reference of the callback to delete



563
564
565
566
567
# File 'lib/xmpp4r/stream.rb', line 563

def delete_iq_callback(ref)
  @tbcbmutex.synchronize do
    @iqcbs.delete(ref)
  end
end

#delete_message_callback(ref) ⇒ Object

Delete a Message callback

ref
String

The reference of the callback to delete



496
497
498
499
500
# File 'lib/xmpp4r/stream.rb', line 496

def delete_message_callback(ref)
  @tbcbmutex.synchronize do
    @messagecbs.delete(ref)
  end
end

#delete_presence_callback(ref) ⇒ Object

Delete a Presence callback

ref
String

The reference of the callback to delete



540
541
542
543
544
# File 'lib/xmpp4r/stream.rb', line 540

def delete_presence_callback(ref)
  @tbcbmutex.synchronize do
    @presencecbs.delete(ref)
  end
end

#delete_stanza_callback(ref) ⇒ Object

Delete a Stanza callback

ref
String

The reference of the callback to delete



518
519
520
521
522
# File 'lib/xmpp4r/stream.rb', line 518

def delete_stanza_callback(ref)
  @tbcbmutex.synchronize do
    @stanzacbs.delete(ref)
  end
end

#delete_xml_callback(ref) ⇒ Object

Delete an XML-messages callback

ref
String

The reference of the callback to delete



474
475
476
477
478
# File 'lib/xmpp4r/stream.rb', line 474

def delete_xml_callback(ref)
  @tbcbmutex.synchronize do
    @xmlcbs.delete(ref)
  end
end

#iq_callbacksObject

Get the list of iq callbacks.



289
290
291
# File 'lib/xmpp4r/stream.rb', line 289

def iq_callbacks
  @iqcbs
end

#is_connected?Boolean

Returns if this connection is connected to a Jabber service

return
Boolean

Connection status

Returns:

  • (Boolean)


160
161
162
# File 'lib/xmpp4r/stream.rb', line 160

def is_connected?
  return @status == CONNECTED
end

#is_disconnected?Boolean

Returns if this connection is NOT connected to a Jabber service

return
Boolean

Connection status

Returns:

  • (Boolean)


168
169
170
# File 'lib/xmpp4r/stream.rb', line 168

def is_disconnected?
  return @status == DISCONNECTED
end

#message_callbacksObject

Get the list of message callbacks.



295
296
297
# File 'lib/xmpp4r/stream.rb', line 295

def message_callbacks
  @messagecbs
end

#on_exception(&block) ⇒ Object

Mounts a block to handle exceptions if they occur during the poll send. This will likely be the first indication that the socket dropped in a Jabber Session.

The block has to take three arguments:

  • the Exception

  • the Jabber::Stream object (self)

  • a symbol telling where it happened, namely :start, :parser, :sending, :close (passed when the stream ends) and :disconnected



117
118
119
# File 'lib/xmpp4r/stream.rb', line 117

def on_exception(&block)
  @exception_block = block
end

#parse_failure(e) ⇒ Object

This method is called by the parser when a failure occurs



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/xmpp4r/stream.rb', line 123

def parse_failure(e)
  Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

  # A new thread has to be created because close will cause the thread
  # to commit suicide(???)
  if @exception_block
    # New thread, because close will kill the current thread
    Thread.new do
      Thread.current.abort_on_exception = true
      close
      @exception_block.call(e, self, :parser)
    end
  else
    Jabber::warnlog "Stream#parse_failure was called by XML parser. Dumping " +
      "backtrace...\n" + e.exception + "\n#{e.backtrace.join("\n")}"
    close
    raise
  end
end

#parser_endObject

This method is called by the parser upon receiving </stream:stream>



145
146
147
148
149
150
151
152
153
154
155
# File 'lib/xmpp4r/stream.rb', line 145

def parser_end
  if @exception_block
    Thread.new do
      Thread.current.abort_on_exception = true
      close
      @exception_block.call(nil, self, :close)
    end
  else
    close
  end
end

#presence_callbacksObject

Get the list of presence callbacks.



301
302
303
# File 'lib/xmpp4r/stream.rb', line 301

def presence_callbacks
  @presencecbs
end

#receive(element) ⇒ Object

Processes a received REXML::Element and executes registered thread blocks and filters against it.

element
REXML::Element

The received element



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/xmpp4r/stream.rb', line 177

def receive(element)
  @tbcbmutex.synchronize { @processing += 1 }
  Jabber::debuglog("RECEIVED:\n#{element.to_s}")

  if element.namespace('').to_s == '' # REXML namespaces are always strings
    element.add_namespace(@streamns)
  end

  case element.prefix
  when 'stream'
    case element.name
      when 'stream'
        stanza = element
        @streamid = element.attributes['id']
        @streamns = element.namespace('') if element.namespace('')

        # Hack: component streams are basically client streams.
        # Someday we may want to create special stanza classes
        # for components/s2s deriving from normal stanzas but
        # posessing these namespaces
        @streamns = 'jabber:client' if @streamns == 'jabber:component:accept'

        unless element.attributes['version']  # isn't XMPP compliant, so
          Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features")
          @features_sem.run                   # don't wait for <stream:features/>
        end
      when 'features'
        stanza = element
        element.each { |e|
          if e.name == 'mechanisms' and e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl'
            e.each_element('mechanism') { |mech|
              @stream_mechanisms.push(mech.text)
            }
          else
            @stream_features[e.name] = e.namespace
          end
        }
        Jabber::debuglog("FEATURES: received")
        @features_sem.run
      else
        stanza = element
    end
  else
    # Any stanza, classes are registered by XMPPElement::name_xmlns
    begin
      stanza = XMPPStanza::import(element)
    rescue NoNameXmlnsRegistered
      stanza = element
    end
  end

  if @xmlcbs.process(stanza)
    @tbcbmutex.synchronize { @processing -= 1 }
    return true
  end

  # Iterate through blocked threads (= waiting for an answer)
  #
  # We're dup'ping the @threadblocks here, so that we won't end up in an
  # endless loop if Stream#send is being nested. That means, the nested
  # threadblock won't receive the stanza currently processed, but the next
  # one.
  threadblocks = nil
  @tbcbmutex.synchronize do
    threadblocks = @threadblocks.dup
  end
  threadblocks.each { |threadblock|
    exception = nil
    r = false
    begin
      r = threadblock.call(stanza)
    rescue Exception => e
      exception = e
    end

    if r == true
      @tbcbmutex.synchronize do
        @threadblocks.delete(threadblock)
      end
      threadblock.wakeup
      @tbcbmutex.synchronize { @processing -= 1 }
      return true
    elsif exception
      @tbcbmutex.synchronize do
        @threadblocks.delete(threadblock)
      end
      threadblock.raise(exception)
    end
  }

  Jabber::debuglog("PROCESSING:\n#{stanza.to_s} (#{stanza.class})")
  Jabber::debuglog("TRYING stanzacbs...")
  if @stanzacbs.process(stanza)
      @tbcbmutex.synchronize { @processing -= 1 }
      return true
  end
  r = false
  Jabber::debuglog("TRYING message/iq/presence/cbs...")
  case stanza
  when Message
    r = @messagecbs.process(stanza)
  when Iq
    r = @iqcbs.process(stanza)
  when Presence
    r = @presencecbs.process(stanza)
  end
  @tbcbmutex.synchronize { @processing -= 1 }
  return r
end

#send(xml, &block) ⇒ Object

Sends XML data to the socket and (optionally) waits to process received data.

Do not invoke this in a callback but in a separate thread, because we must not suspend the parser thread (in whose context callbacks are executed).

xml
String

The xml data to send

&block
Block

The optional block



361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
# File 'lib/xmpp4r/stream.rb', line 361

def send(xml, &block)
  Jabber::debuglog("SENDING:\n#{xml}")
  if block
    threadblock = ThreadBlock.new(block)
    @tbcbmutex.synchronize do
      @threadblocks.unshift(threadblock)
    end
  end
  begin
    # Temporarily remove stanza's namespace to
    # reduce bandwidth consumption
    if xml.kind_of? XMPPStanza and xml.namespace == 'jabber:client' and
        xml.prefix != 'stream' and xml.name != 'stream'
      xml.delete_namespace
      send_data(xml.to_s)
      xml.add_namespace(@streamns)
    else
      send_data(xml.to_s)
    end
  rescue Exception => e
    Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

    if @exception_block
      Thread.new do
        Thread.current.abort_on_exception = true
        close!
        @exception_block.call(e, self, :sending)
      end
    else
      Jabber::warnlog "Exception caught while sending! (#{e.class})\n#{e.backtrace.join("\n")}"
      close!
      raise
    end
  end
  # The parser thread might be running this (think of a callback running send())
  # If this is the case, we mustn't stop (or we would cause a deadlock)
  if block and Thread.current != @parser_thread
    threadblock.wait
  elsif block
    Jabber::warnlog("WARNING:\nCannot stop current thread in Jabber::Stream#send because it is the parser thread!")
  end
end

#send_data(data) ⇒ Object



343
344
345
346
347
348
349
# File 'lib/xmpp4r/stream.rb', line 343

def send_data(data)
  @send_lock.synchronize do
    @last_send = Time.now
    @fd << data
    @fd.flush
  end
end

#send_with_id(xml, &block) ⇒ Object

Send an XMPP stanza with a Jabber::XMPPStanza#id. The id will be generated by Jabber::IdGenerator if not already set.

The block will be called once: when receiving a stanza with the same Jabber::XMPPStanza#id. There is no need to return true to complete this! Instead the return value of the block will be returned. This is a direct result of unique request/response stanza identification via the id attribute.

The block may be omitted. Then, the result will be the response stanza.

Be aware that if a stanza with type='error' is received, the function does not yield but raises a ServerError with the corresponding error element.

Please see Stream#send for some implementational details.

Please read the note about nesting at Stream#send

xml
XMPPStanza


425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
# File 'lib/xmpp4r/stream.rb', line 425

def send_with_id(xml, &block)
  if xml.id.nil?
    xml.id = Jabber::IdGenerator.instance.generate_id
  end

  res = nil
  error = nil
  send(xml) do |received|
    if received.kind_of? XMPPStanza and received.id == xml.id
      if received.type == :error
        error = (received.error ? received.error : ErrorResponse.new)
        true
      elsif block_given?
        res = yield(received)
        true
      else
        res = received
        true
      end
    else
      false
    end
  end

  unless error.nil?
    raise ServerError.new(error)
  end

  res
end

#stanza_callbacksObject

Get the list of stanza callbacks.



307
308
309
# File 'lib/xmpp4r/stream.rb', line 307

def stanza_callbacks
  @stanzacbs
end

#start(fd) ⇒ Object

Start the XML parser on the fd



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/xmpp4r/stream.rb', line 66

def start(fd)
  @stream_mechanisms = []
  @stream_features = {}

  @fd = fd
  @parser = StreamParser.new(@fd, self)
  @parser_thread = Thread.new do
    Thread.current.abort_on_exception = true
    begin
      @parser.parse
      Jabber::debuglog("DISCONNECTED\n")

      if @exception_block
        Thread.new { close!; @exception_block.call(nil, self, :disconnected) }
      else
        close!
      end
    rescue Exception => e
      Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

      if @exception_block
        Thread.new do
          Thread.current.abort_on_exception = true
          close
          @exception_block.call(e, self, :start)
        end
      else
        Jabber::warnlog "Exception caught in Parser thread! (#{e.class})\n#{e.backtrace.join("\n")}"
        close!
        raise
      end
    end
  end

  @status = CONNECTED
end

#stopObject



103
104
105
106
# File 'lib/xmpp4r/stream.rb', line 103

def stop
  @parser_thread.kill
  @parser = nil
end

#xml_callbacksObject

Get the list of xml callbacks.



313
314
315
# File 'lib/xmpp4r/stream.rb', line 313

def xml_callbacks
  @xmlcbs
end