Class: Wamp::Client::Session

Inherits:
Object
  • Object
show all
Includes:
Check
Defined in:
lib/wamp/client/session.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Check

included

Constructor Details

#initialize(transport, options = {}) ⇒ Session

Constructor

Parameters:

  • transport (Wamp::Client::Transport::Base)

    The transport that the session will use

  • options (Hash) (defaults to: {})

    Hash containing different session options

Options Hash (options):

  • :authid (String)

    The authentication ID

  • :authmethods (Array)

    Different auth methods that this client supports



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/wamp/client/session.rb', line 215

def initialize(transport, options={})

  # Parameters
  self.id = nil
  self.realm = nil
  self.options = options || {}

  # Outstanding Requests
  self._requests = {
      publish: {},
      subscribe: {},
      unsubscribe: {},
      call: {},
      register: {},
      unregister: {}
  }

  # Init Subs and Regs in place
  self._subscriptions = {}
  self._registrations = {}
  self._defers = {}

  # Setup Transport
  self.transport = transport
  self.transport.on_message do |msg|
    self._receive_message(msg)
  end

  # Other parameters
  self._goodbye_sent = false

  # Setup session callbacks
  @on_join = nil
  @on_leave = nil
  @on_challenge = nil

end

Instance Attribute Details

#_defersObject

Private attributes



208
209
210
# File 'lib/wamp/client/session.rb', line 208

def _defers
  @_defers
end

#_goodbye_sentObject

Private attributes



208
209
210
# File 'lib/wamp/client/session.rb', line 208

def _goodbye_sent
  @_goodbye_sent
end

#_registrationsObject

Private attributes



208
209
210
# File 'lib/wamp/client/session.rb', line 208

def _registrations
  @_registrations
end

#_requestsObject

Private attributes



208
209
210
# File 'lib/wamp/client/session.rb', line 208

def _requests
  @_requests
end

#_subscriptionsObject

Private attributes



208
209
210
# File 'lib/wamp/client/session.rb', line 208

def _subscriptions
  @_subscriptions
end

#idObject

Returns the value of attribute id.



205
206
207
# File 'lib/wamp/client/session.rb', line 205

def id
  @id
end

#optionsObject

Returns the value of attribute options.



205
206
207
# File 'lib/wamp/client/session.rb', line 205

def options
  @options
end

#realmObject

Returns the value of attribute realm.



205
206
207
# File 'lib/wamp/client/session.rb', line 205

def realm
  @realm
end

#transportObject

Returns the value of attribute transport.



205
206
207
# File 'lib/wamp/client/session.rb', line 205

def transport
  @transport
end

Instance Method Details

#_error_to_hash(msg) ⇒ Object

Converts and error message to a hash

Parameters:



306
307
308
309
310
311
312
# File 'lib/wamp/client/session.rb', line 306

def _error_to_hash(msg)
  {
      error: msg.error,
      args: msg.arguments,
      kwargs: msg.argumentskw
  }
end

#_generate_idObject

Generates an ID according to the specification (Section 5.1.2)



300
301
302
# File 'lib/wamp/client/session.rb', line 300

def _generate_id
  rand(0..9007199254740992)
end

#_process_CALL_error(msg) ⇒ Object

Processes an error from a call request

Parameters:



966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
# File 'lib/wamp/client/session.rb', line 966

def _process_CALL_error(msg)

  # Remove the pending publish and inform the caller of the failure
  call = self._requests[:call].delete(msg.request_request)
  if call

    details = msg.details || {}
    details[:procedure] = call[:p] unless details[:procedure]
    details[:type] = 'call'
    details[:session] = self

    c = call[:c]
    c.call(nil, self._error_to_hash(msg), details) if c
  end

end

#_process_EVENT(msg) ⇒ Object

Processes and event from the broker

Parameters:



477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
# File 'lib/wamp/client/session.rb', line 477

def _process_EVENT(msg)

  args = msg.publish_arguments || []
  kwargs = msg.publish_argumentskw || {}

  s = self._subscriptions[msg.subscribed_subscription]
  if s
    details = msg.details || {}
    details[:publication] = msg.published_publication
    details[:session] = self

    h = s.handler
    h.call(args, kwargs, details) if h
  end

end

#_process_INTERRUPT(msg) ⇒ Object

Processes the interrupt

Parameters:



803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
# File 'lib/wamp/client/session.rb', line 803

def _process_INTERRUPT(msg)

  request = msg.invocation_request
  mode = msg.options[:mode]

  defer = self._defers[request]
  if defer
    r = self._registrations[defer.registration]
    if r
      # If it exists, call the interrupt handler to inform it of the interrupt
      i = r.i_handler
      error = nil
      if i
        begin
          error = i.call(request, mode)
        rescue Exception => e
          error = e
        end
      end

      error ||= 'interrupt'

      # Send the error back to the client
      self._send_INVOCATION_error(request, error, true)
    end

    # Delete the defer
    self._defers.delete(request)
  end

end

#_process_INVOCATION(msg) ⇒ Object

Processes and event from the broker

Parameters:



744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
# File 'lib/wamp/client/session.rb', line 744

def _process_INVOCATION(msg)

  request = msg.request
  args = msg.call_arguments || []
  kwargs = msg.call_argumentskw || {}

  details = msg.details || {}
  details[:request] = request
  details[:session] = self

  r = self._registrations[msg.registered_registration]
  if r
    h = r.handler
    if h
      begin
        value = h.call(args, kwargs, details)

        # If a defer was returned, handle accordingly
        if value.is_a? Wamp::Client::Defer::CallDefer
          value.request = request
          value.registration = msg.registered_registration

          # Store the defer
          self._defers[request] = value

          # On complete, send the result
          value.on_complete do |defer, result|
            self.yield(defer.request, result, {}, true)
            self._defers.delete(defer.request)
          end

          # On error, send the error
          value.on_error do |defer, error|
            self._send_INVOCATION_error(defer.request, error, true)
            self._defers.delete(defer.request)
          end

          # For progressive, return the progress
          if value.is_a? Wamp::Client::Defer::ProgressiveCallDefer
            value.on_progress do |defer, result|
              self.yield(defer.request, result, {progress: true}, true)
            end
          end

          # Else it was a normal response
        else
          self.yield(request, value)
        end

      rescue Exception => error
        self._send_INVOCATION_error(request, error)
      end

    end
  end
end

#_process_PUBLISH_error(msg) ⇒ Object

Processes an error from a publish request

Parameters:



609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
# File 'lib/wamp/client/session.rb', line 609

def _process_PUBLISH_error(msg)

  # Remove the pending publish and inform the caller of the failure
  s = self._requests[:publish].delete(msg.request_request)
  if s

    details = msg.details || {}
    details[:topic] = s[:t] unless details[:topic]
    details[:type] = 'publish'
    details[:session] = self

    c = s[:c]
    c.call(nil, self._error_to_hash(msg), details) if c
  end

end

#_process_PUBLISHED(msg) ⇒ Object

Processes the response to a publish request

Parameters:



589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
# File 'lib/wamp/client/session.rb', line 589

def _process_PUBLISHED(msg)

  # Remove the pending publish and alert the callback
  p = self._requests[:publish].delete(msg.publish_request)
  if p

    details = {}
    details[:topic] = p[:t]
    details[:type] = 'publish'
    details[:publication] = msg.publication
    details[:session] = self

    c = p[:c]
    c.call(p, nil, details) if c
  end

end

#_process_REGISTER_error(msg) ⇒ Object

Processes an error from a request

Parameters:



678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
# File 'lib/wamp/client/session.rb', line 678

def _process_REGISTER_error(msg)

  # Remove the pending registration and inform the caller of the failure
  r = self._requests[:register].delete(msg.request_request)
  if r

    details = msg.details || {}
    details[:procedure] = r[:p] unless details[:procedure]
    details[:type] = 'register'
    details[:session] = self

    c = r[:c]
    c.call(nil, self._error_to_hash(msg), details) if c
  end

end

#_process_REGISTERED(msg) ⇒ Object

Processes the response to a register request

Parameters:



657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
# File 'lib/wamp/client/session.rb', line 657

def _process_REGISTERED(msg)

  # Remove the pending subscription, add it to the registered ones, and inform the caller
  r = self._requests[:register].delete(msg.register_request)
  if r
    n_r = Registration.new(r[:p], r[:h], r[:o], r[:i], self, msg.registration)
    self._registrations[msg.registration] = n_r

    details = {}
    details[:procedure] = r[:p]
    details[:type] = 'register'
    details[:session] = self

    c = r[:c]
    c.call(n_r, nil, details) if c
  end

end

#_process_RESULT(msg) ⇒ Object

Processes the response to a publish request

Parameters:



944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
# File 'lib/wamp/client/session.rb', line 944

def _process_RESULT(msg)

  details = msg.details || {}

  call = self._requests[:call][msg.call_request]

  # Don't remove if progress is true and the options had receive_progress true
  self._requests[:call].delete(msg.call_request) unless (details[:progress] and (call and call[:o][:receive_progress]))

  if call
    details[:procedure] = call[:p] unless details[:procedure]
    details[:type] = 'call'
    details[:session] = self

    c = call[:c]
    c.call(CallResult.new(msg.yield_arguments, msg.yield_argumentskw), nil, details) if c
  end

end

#_process_SUBSCRIBE_error(msg) ⇒ Object

Processes an error from a request

Parameters:



458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
# File 'lib/wamp/client/session.rb', line 458

def _process_SUBSCRIBE_error(msg)

  # Remove the pending subscription and inform the caller of the failure
  s = self._requests[:subscribe].delete(msg.request_request)
  if s

    details = msg.details || {}
    details[:topic] = s[:t] unless details[:topic]
    details[:type] = 'subscribe'
    details[:session] = self

    c = s[:c]
    c.call(nil, self._error_to_hash(msg), details) if c
  end

end

#_process_SUBSCRIBED(msg) ⇒ Object

Processes the response to a subscribe request

Parameters:



437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
# File 'lib/wamp/client/session.rb', line 437

def _process_SUBSCRIBED(msg)

  # Remove the pending subscription, add it to the registered ones, and inform the caller
  s = self._requests[:subscribe].delete(msg.subscribe_request)
  if s

    details = {}
    details[:topic] = s[:t] unless details[:topic]
    details[:type] = 'subscribe'
    details[:session] = self

    n_s = Subscription.new(s[:t], s[:h], s[:o], self, msg.subscription)
    self._subscriptions[msg.subscription] = n_s
    c = s[:c]
    c.call(n_s, nil, details) if c
  end

end

#_process_UNREGISTER_error(msg) ⇒ Object

Processes an error from a request

Parameters:



881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
# File 'lib/wamp/client/session.rb', line 881

def _process_UNREGISTER_error(msg)

  # Remove the pending subscription and inform the caller of the failure
  r = self._requests[:unregister].delete(msg.request_request)
  if r

    details = msg.details || {}
    details[:procedure] = r[:r].procedure unless details[:procedure]
    details[:type] = 'unregister'
    details[:session] = self

    c = r[:c]
    c.call(nil, self._error_to_hash(msg), details) if c
  end

end

#_process_UNREGISTERED(msg) ⇒ Object

Processes the response to a unregister request

Parameters:



860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
# File 'lib/wamp/client/session.rb', line 860

def _process_UNREGISTERED(msg)

  # Remove the pending unregistration, add it to the registered ones, and inform the caller
  r = self._requests[:unregister].delete(msg.unregister_request)
  if r
    r_s = r[:r]
    self._registrations.delete(r_s.id)

    details = {}
    details[:procedure] = r_s.procedure
    details[:type] = 'unregister'
    details[:session] = self

    c = r[:c]
    c.call(r_s, nil, details) if c
  end

end

#_process_UNSUBSCRIBE_error(msg) ⇒ Object

Processes an error from a request

Parameters:



541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
# File 'lib/wamp/client/session.rb', line 541

def _process_UNSUBSCRIBE_error(msg)

  # Remove the pending subscription and inform the caller of the failure
  s = self._requests[:unsubscribe].delete(msg.request_request)
  if s

    details = msg.details || {}
    details[:topic] = s[:s].topic unless details[:topic]
    details[:type] = 'unsubscribe'
    details[:session] = self

    c = s[:c]
    c.call(nil, self._error_to_hash(msg), details) if c
  end

end

#_process_UNSUBSCRIBED(msg) ⇒ Object

Processes the response to a unsubscribe request

Parameters:



519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
# File 'lib/wamp/client/session.rb', line 519

def _process_UNSUBSCRIBED(msg)

  # Remove the pending unsubscription, add it to the registered ones, and inform the caller
  s = self._requests[:unsubscribe].delete(msg.unsubscribe_request)
  if s
    n_s = s[:s]
    self._subscriptions.delete(n_s.id)

    details = {}
    details[:topic] = s[:s].topic
    details[:type] = 'unsubscribe'
    details[:session] = self

    c = s[:c]
    c.call(n_s, nil, details) if c
  end

end

#_receive_message(msg) ⇒ Object

Processes received messages

Parameters:

  • msg (Array)


326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
# File 'lib/wamp/client/session.rb', line 326

def _receive_message(msg)

  # Print the raw message
  logger.debug("#{self.class.name} RX(raw): #{msg.to_s}")

  # Parse the WAMP message
  message = Wamp::Client::Message.parse(msg)

  # Print the parsed WAMP message
  logger.debug("#{self.class.name} RX: #{message.to_s}")

  # WAMP Session is not open
  if self.id.nil?

    # Parse the welcome message
    if message.is_a? Wamp::Client::Message::Welcome
      # Get the session ID
      self.id = message.session

      # Log joining the session
      logger.info("#{self.class.name} joined session with realm '#{message.details[:realm]}'")

      # Call the callback if it is set
      @on_join.call(message.details) unless @on_join.nil?
    elsif message.is_a? Wamp::Client::Message::Challenge
      # Log challenge received
      logger.debug("#{self.class.name} auth challenge '#{message.authmethod}', extra: #{message.extra}")

      # Call the callback if set
      if @on_challenge
        signature, extra = @on_challenge.call(message.authmethod, message.extra)
      else
        signature = nil
        extra = nil
      end

      signature ||= ''
      extra ||= {}

      authenticate = Wamp::Client::Message::Authenticate.new(signature, extra)
      self._send_message(authenticate)

    elsif message.is_a? Wamp::Client::Message::Abort
      # Log leaving the session
      logger.info("#{self.class.name} left session '#{message.reason}'")

      # Call the callback if it is set
      @on_leave.call(message.reason, message.details) unless @on_leave.nil?
    end

    # Wamp Session is open
  else

    # If goodbye, close the session
    if message.is_a? Wamp::Client::Message::Goodbye

      # If we didn't send the goodbye, respond
      unless self._goodbye_sent
        goodbye = Wamp::Client::Message::Goodbye.new({}, 'wamp.error.goodbye_and_out')
        self._send_message(goodbye)
      end

      # Close out session
      self.id = nil
      self.realm = nil
      self._goodbye_sent = false
      @on_leave.call(message.reason, message.details) unless @on_leave.nil?

    else

      # Else this is a normal message.  Lookup the handler and call it
      type = message.is_a?(Message::Error) ? message.request_type : message.class.type
      handler = HANDLER_LOOKUP[type]

      if handler != nil
        handler.call(self, message)
      else
        logger.error("#{self.class.name} unknown message type '#{type}'")
      end
    end
  end

end

#_send_INVOCATION_error(request, error, check_defer = false) ⇒ Object

Sends an error back to the caller

Parameters:

  • request (Integer)
    • The request ID

  • error


698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
# File 'lib/wamp/client/session.rb', line 698

def _send_INVOCATION_error(request, error, check_defer=false)
  # Prevent responses for defers that have already completed or had an error
  if check_defer and not self._defers[request]
    return
  end

  if error.nil?
    error = CallError.new('wamp.error.runtime')
  elsif not error.is_a?(CallError)
    backtrace = error.is_a?(Exception) ? error.backtrace : nil
    error = CallError.new('wamp.error.runtime', [error.to_s], { backtrace: backtrace })
  end

  error_msg = Wamp::Client::Message::Error.new(Wamp::Client::Message::Types::INVOCATION, request, {}, error.error, error.args, error.kwargs)
  self._send_message(error_msg)
end

#_send_message(msg) ⇒ Object

Sends a message

Parameters:



316
317
318
319
320
321
322
# File 'lib/wamp/client/session.rb', line 316

def _send_message(msg)
  # Log the message
  logger.debug("#{self.class.name} TX: #{msg.to_s}")

  # Send it to the transport
  self.transport.send_message(msg.payload)
end

#call(procedure, args = nil, kwargs = nil, options = {}, &callback) ⇒ Call

Publishes and event to a topic

Parameters:

  • procedure (String)

    The procedure to invoke

  • args (Array) (defaults to: nil)

    The arguments

  • kwargs (Hash) (defaults to: nil)

    The keyword arguments

  • options (Hash) (defaults to: {})

    The options for the call

  • callback (block)

    The callback(result, error, details) called to signal if the call was a success or not

Returns:

  • (Call)

    An object representing the call



909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
# File 'lib/wamp/client/session.rb', line 909

def call(procedure, args=nil, kwargs=nil, options={}, &callback)
  unless is_open?
    raise RuntimeError, "Session must be open to call 'call'"
  end

  self.class.check_uri('procedure', procedure)
  self.class.check_dict('options', options)
  self.class.check_list('args', args, true)
  self.class.check_dict('kwargs', kwargs, true)

  # Create a new call request
  request = self._generate_id
  self._requests[:call][request] = {p: procedure, a: args, k: kwargs, o: options, c: callback}

  # Send the message
  msg = Wamp::Client::Message::Call.new(request, options, procedure, args, kwargs)
  self._send_message(msg)

  call = Call.new(self, request)

  # Timeout Logic
  if options[:timeout] and options[:timeout] > 0
    self.transport.add_timer(options[:timeout]) do
      # Once the timer expires, if the call hasn't completed, cancel it
      if self._requests[:call][call.id]
        call.cancel
      end
    end
  end

  call
end

#cancel(call, mode = 'skip') ⇒ Object

Cancels a call

Parameters:

  • call (Call)
    • The call object

  • mode (String) (defaults to: 'skip')
    • The mode of the skip. Options are ‘skip’, ‘kill’, ‘killnowait’



990
991
992
993
994
995
996
997
998
999
1000
# File 'lib/wamp/client/session.rb', line 990

def cancel(call, mode='skip')
  unless is_open?
    raise RuntimeError, "Session must be open to call 'cancel'"
  end

  self.class.check_nil('call', call, false)

  # Send the message
  cancel = Wamp::Client::Message::Cancel.new(call.id, { mode: mode })
  self._send_message(cancel)
end

#is_open?Boolean

Returns ‘true’ if the session is open

Returns:

  • (Boolean)


254
255
256
# File 'lib/wamp/client/session.rb', line 254

def is_open?
  !self.id.nil?
end

#join(realm) ⇒ Object

Joins the WAMP Router

Parameters:

  • realm (String)

    The name of the realm



260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
# File 'lib/wamp/client/session.rb', line 260

def join(realm)
  if is_open?
    raise RuntimeError, "Session must be closed to call 'join'"
  end

  self.class.check_uri('realm', realm)

  self.realm = realm

  details = {}
  details[:roles] = WAMP_FEATURES
  details[:agent] = "Ruby-Wamp::Client-#{Wamp::Client::VERSION}"
  details[:authid] = self.options[:authid] if self.options[:authid]
  details[:authmethods] = self.options[:authmethods] if self.options[:authmethods]

  # Send Hello message
  hello = Wamp::Client::Message::Hello.new(realm, details)
  self._send_message(hello)
end

#leave(reason = 'wamp.close.normal', message = 'user initiated') ⇒ Object

Leaves the WAMP Router

Parameters:

  • reason (String) (defaults to: 'wamp.close.normal')

    URI signalling the reason for leaving



282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/wamp/client/session.rb', line 282

def leave(reason='wamp.close.normal', message='user initiated')
  unless is_open?
    raise RuntimeError, "Session must be opened to call 'leave'"
  end

  self.class.check_uri('reason', reason, true)
  self.class.check_string('message', message, true)

  details = {}
  details[:message] = message

  # Send Goodbye message
  goodbye = Wamp::Client::Message::Goodbye.new(details, reason)
  self._send_message(goodbye)
  self._goodbye_sent = true
end

#on(event, &callback) ⇒ Object

Simple setter for callbacks



192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/wamp/client/session.rb', line 192

def on(event, &callback)
  case event
  when :join
    self.on_join(&callback)
  when :challenge
    self.on_challenge(&callback)
  when :leave
    self.on_leave(&callback)
  else
    raise RuntimeError, "Unknown on(event) '#{event}'"
  end
end

#on_challenge(&on_challenge) ⇒ Object



187
188
189
# File 'lib/wamp/client/session.rb', line 187

def on_challenge(&on_challenge)
  @on_challenge = on_challenge
end

#on_join(&on_join) ⇒ Object



169
170
171
# File 'lib/wamp/client/session.rb', line 169

def on_join(&on_join)
  @on_join = on_join
end

#on_leave(&on_leave) ⇒ Object



177
178
179
# File 'lib/wamp/client/session.rb', line 177

def on_leave(&on_leave)
  @on_leave = on_leave
end

#publish(topic, args = nil, kwargs = nil, options = {}, &callback) ⇒ Object

Publishes and event to a topic

Parameters:

  • topic (String)

    The topic to publish the event to

  • args (Array) (defaults to: nil)

    The arguments

  • kwargs (Hash) (defaults to: nil)

    The keyword arguments

  • options (Hash) (defaults to: {})

    The options for the publish

  • callback (block)

    The callback(publish, error, details) called to signal if the publish was a success or not



568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
# File 'lib/wamp/client/session.rb', line 568

def publish(topic, args=nil, kwargs=nil, options={}, &callback)
  unless is_open?
    raise RuntimeError, "Session must be open to call 'publish'"
  end

  self.class.check_uri('topic', topic)
  self.class.check_dict('options', options)
  self.class.check_list('args', args, true)
  self.class.check_dict('kwargs', kwargs, true)

  # Create a new publish request
  request = self._generate_id
  self._requests[:publish][request] = {t: topic, a: args, k: kwargs, o: options, c: callback} if options[:acknowledge]

  # Send the message
  publish = Wamp::Client::Message::Publish.new(request, options, topic, args, kwargs)
  self._send_message(publish)
end

#register(procedure, handler, options = nil, interrupt = nil, &callback) ⇒ Object

Register to a procedure

Parameters:

  • procedure (String)

    The procedure to register for

  • handler (lambda)

    The handler(args, kwargs, details) when an invocation is received

  • options (Hash, nil) (defaults to: nil)

    The options for the registration

  • interrupt (lambda) (defaults to: nil)

    The handler(request, mode) when an interrupt is received

  • callback (block)

    The callback(registration, error, details) called to signal if the registration was a success or not



636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
# File 'lib/wamp/client/session.rb', line 636

def register(procedure, handler, options=nil, interrupt=nil, &callback)
  unless is_open?
    raise RuntimeError, "Session must be open to call 'register'"
  end

  options ||= {}

  self.class.check_uri('procedure', procedure)
  self.class.check_nil('handler', handler, false)

  # Create a new registration request
  request = self._generate_id
  self._requests[:register][request] = {p: procedure, h: handler, i: interrupt, o: options, c: callback}

  # Send the message
  register = Wamp::Client::Message::Register.new(request, options, procedure)
  self._send_message(register)
end

#subscribe(topic, handler, options = {}, &callback) ⇒ Object

Subscribes to a topic

Parameters:

  • topic (String)

    The topic to subscribe to

  • handler (lambda)

    The handler(args, kwargs, details) when an event is received

  • options (Hash) (defaults to: {})

    The options for the subscription

  • callback (block)

    The callback(subscription, error) called to signal if the subscription was a success or not



417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
# File 'lib/wamp/client/session.rb', line 417

def subscribe(topic, handler, options={}, &callback)
  unless is_open?
    raise RuntimeError, "Session must be open to call 'subscribe'"
  end

  self.class.check_uri('topic', topic)
  self.class.check_dict('options', options)
  self.class.check_nil('handler', handler, false)

  # Create a new subscribe request
  request = self._generate_id
  self._requests[:subscribe][request] = {t: topic, h: handler, o: options, c: callback}

  # Send the message
  subscribe = Wamp::Client::Message::Subscribe.new(request, options, topic)
  self._send_message(subscribe)
end

#unregister(registration, &callback) ⇒ Object

Unregisters from a procedure

Parameters:

  • registration (Registration)

    The registration object from when the registration was created

  • callback (block)

    The callback(registration, error, details) called to signal if the unregistration was a success or not



842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
# File 'lib/wamp/client/session.rb', line 842

def unregister(registration, &callback)
  unless is_open?
    raise RuntimeError, "Session must be open to call 'unregister'"
  end

  self.class.check_nil('registration', registration, false)

  # Create a new unsubscribe request
  request = self._generate_id
  self._requests[:unregister][request] = { r: registration, c: callback }

  # Send the message
  unregister = Wamp::Client::Message::Unregister.new(request, registration.id)
  self._send_message(unregister)
end

#unsubscribe(subscription, &callback) ⇒ Object

Unsubscribes from a subscription

Parameters:

  • subscription (Subscription)

    The subscription object from when the subscription was created

  • callback (block)

    The callback(subscription, error, details) called to signal if the subscription was a success or not



501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
# File 'lib/wamp/client/session.rb', line 501

def unsubscribe(subscription, &callback)
  unless is_open?
    raise RuntimeError, "Session must be open to call 'unsubscribe'"
  end

  self.class.check_nil('subscription', subscription, false)

  # Create a new unsubscribe request
  request = self._generate_id
  self._requests[:unsubscribe][request] = { s: subscription, c: callback }

  # Send the message
  unsubscribe = Wamp::Client::Message::Unsubscribe.new(request, subscription.id)
  self._send_message(unsubscribe)
end

#yield(request, result, options = {}, check_defer = false) ⇒ Object

Sends a result for the invocation

Parameters:

  • request (Integer)
    • The id of the request

  • result (CallError, CallResult, anything)
    • If it is a CallError, the error will be returned

  • options (Hash) (defaults to: {})
    • The options to be sent with the yield



719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
# File 'lib/wamp/client/session.rb', line 719

def yield(request, result, options={}, check_defer=false)
  # Prevent responses for defers that have already completed or had an error
  if check_defer and not self._defers[request]
    return
  end

  if result.nil?
    result = CallResult.new
  elsif result.is_a?(CallError)
    # Do nothing
  elsif not result.is_a?(CallResult)
    result = CallResult.new([result])
  end

  if result.is_a?(CallError)
    self._send_INVOCATION_error(request, result)
  else
    yield_msg = Wamp::Client::Message::Yield.new(request, options, result.args, result.kwargs)
    self._send_message(yield_msg)
  end
end