Class: WampClient::Session

Inherits:
Object
  • Object
show all
Includes:
Check
Defined in:
lib/wamp_client/session.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Check

included

Constructor Details

#initialize(transport, options = {}) ⇒ Session

Constructor

Parameters:

  • transport (WampClient::Transport::Base)

    The transport that the session will use

  • options (Hash) (defaults to: {})

    Hash containing different session options

Options Hash (options):

  • :authid (String)

    The authentication ID

  • :authmethods (Array)

    Different auth methods that this client supports



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/wamp_client/session.rb', line 180

def initialize(transport, options={})

  # Parameters
  self.id = nil
  self.realm = nil
  self.options = options || {}
  self.verbose = self.options[:verbose]

  # Outstanding Requests
  self._requests = {
      publish: {},
      subscribe: {},
      unsubscribe: {},
      call: {},
      register: {},
      unregister: {}
  }

  # Init Subs and Regs in place
  self._subscriptions = {}
  self._registrations = {}
  self._defers = {}

  # Setup Transport
  self.transport = transport
  self.transport.on_message do |msg|
    self._receive_message(msg)
  end

  # Other parameters
  self._goodbye_sent = false

  # Setup session callbacks
  @on_join = nil
  @on_leave = nil

end

Instance Attribute Details

#_defersObject

Private attributes



173
174
175
# File 'lib/wamp_client/session.rb', line 173

def _defers
  @_defers
end

#_goodbye_sentObject

Private attributes



173
174
175
# File 'lib/wamp_client/session.rb', line 173

def _goodbye_sent
  @_goodbye_sent
end

#_registrationsObject

Private attributes



173
174
175
# File 'lib/wamp_client/session.rb', line 173

def _registrations
  @_registrations
end

#_requestsObject

Private attributes



173
174
175
# File 'lib/wamp_client/session.rb', line 173

def _requests
  @_requests
end

#_subscriptionsObject

Private attributes



173
174
175
# File 'lib/wamp_client/session.rb', line 173

def _subscriptions
  @_subscriptions
end

#idObject

Returns the value of attribute id.



170
171
172
# File 'lib/wamp_client/session.rb', line 170

def id
  @id
end

#optionsObject

Returns the value of attribute options.



170
171
172
# File 'lib/wamp_client/session.rb', line 170

def options
  @options
end

#realmObject

Returns the value of attribute realm.



170
171
172
# File 'lib/wamp_client/session.rb', line 170

def realm
  @realm
end

#transportObject

Returns the value of attribute transport.



170
171
172
# File 'lib/wamp_client/session.rb', line 170

def transport
  @transport
end

#verboseObject

Returns the value of attribute verbose.



170
171
172
# File 'lib/wamp_client/session.rb', line 170

def verbose
  @verbose
end

Instance Method Details

#_error_to_hash(msg) ⇒ Object

Converts and error message to a hash

Parameters:



271
272
273
274
275
276
277
# File 'lib/wamp_client/session.rb', line 271

def _error_to_hash(msg)
  {
      error: msg.error,
      args: msg.arguments,
      kwargs: msg.argumentskw
  }
end

#_generate_idObject

Generates an ID according to the specification (Section 5.1.2)



265
266
267
# File 'lib/wamp_client/session.rb', line 265

def _generate_id
  rand(0..9007199254740992)
end

#_process_CALL_error(msg) ⇒ Object

Processes an error from a call request

Parameters:



946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
# File 'lib/wamp_client/session.rb', line 946

def _process_CALL_error(msg)

  # Remove the pending publish and inform the caller of the failure
  call = self._requests[:call].delete(msg.request_request)
  if call

    details = msg.details || {}
    details[:procedure] = call[:p] unless details[:procedure]
    details[:type] = 'call'
    details[:session] = self

    c = call[:c]
    c.call(nil, self._error_to_hash(msg), details) if c
  end

end

#_process_EVENT(msg) ⇒ Object

Processes and event from the broker

Parameters:



458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
# File 'lib/wamp_client/session.rb', line 458

def _process_EVENT(msg)

  args = msg.publish_arguments || []
  kwargs = msg.publish_argumentskw || {}

  s = self._subscriptions[msg.subscribed_subscription]
  if s
    details = msg.details || {}
    details[:publication] = msg.published_publication
    details[:session] = self

    h = s.handler
    h.call(args, kwargs, details) if h
  end

end

#_process_INTERRUPT(msg) ⇒ Object

Processes the interrupt

Parameters:



783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
# File 'lib/wamp_client/session.rb', line 783

def _process_INTERRUPT(msg)

  request = msg.invocation_request
  mode = msg.options[:mode]

  defer = self._defers[request]
  if defer
    r = self._registrations[defer.registration]
    if r
      # If it exists, call the interrupt handler to inform it of the interrupt
      i = r.i_handler
      error = nil
      if i
        begin
          error = i.call(request, mode)
        rescue Exception => e
          error = e
        end
      end

      error ||= 'interrupt'

      # Send the error back to the client
      self._send_INVOCATION_error(request, error, true)
    end

    # Delete the defer
    self._defers.delete(request)
  end

end

#_process_INVOCATION(msg) ⇒ Object

Processes and event from the broker

Parameters:



724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
# File 'lib/wamp_client/session.rb', line 724

def _process_INVOCATION(msg)

  request = msg.request
  args = msg.call_arguments || []
  kwargs = msg.call_argumentskw || {}

  details = msg.details || {}
  details[:request] = request
  details[:session] = self

  r = self._registrations[msg.registered_registration]
  if r
    h = r.handler
    if h
      begin
        value = h.call(args, kwargs, details)

        # If a defer was returned, handle accordingly
        if value.is_a? WampClient::Defer::CallDefer
          value.request = request
          value.registration = msg.registered_registration

          # Store the defer
          self._defers[request] = value

          # On complete, send the result
          value.on_complete do |defer, result|
            self.yield(defer.request, result, {}, true)
            self._defers.delete(defer.request)
          end

          # On error, send the error
          value.on_error do |defer, error|
            self._send_INVOCATION_error(defer.request, error, true)
            self._defers.delete(defer.request)
          end

          # For progressive, return the progress
          if value.is_a? WampClient::Defer::ProgressiveCallDefer
            value.on_progress do |defer, result|
              self.yield(defer.request, result, {progress: true}, true)
            end
          end

        # Else it was a normal response
        else
          self.yield(request, value)
        end

      rescue Exception => error
        self._send_INVOCATION_error(request, error)
      end

    end
  end
end

#_process_PUBLISH_error(msg) ⇒ Object

Processes an error from a publish request

Parameters:



590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
# File 'lib/wamp_client/session.rb', line 590

def _process_PUBLISH_error(msg)

  # Remove the pending publish and inform the caller of the failure
  s = self._requests[:publish].delete(msg.request_request)
  if s

    details = msg.details || {}
    details[:topic] = s[:t] unless details[:topic]
    details[:type] = 'publish'
    details[:session] = self

    c = s[:c]
    c.call(nil, self._error_to_hash(msg), details) if c
  end

end

#_process_PUBLISHED(msg) ⇒ Object

Processes the response to a publish request

Parameters:



570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
# File 'lib/wamp_client/session.rb', line 570

def _process_PUBLISHED(msg)

  # Remove the pending publish and alert the callback
  p = self._requests[:publish].delete(msg.publish_request)
  if p

    details = {}
    details[:topic] = p[:t]
    details[:type] = 'publish'
    details[:publication] = msg.publication
    details[:session] = self

    c = p[:c]
    c.call(p, nil, details) if c
  end

end

#_process_REGISTER_error(msg) ⇒ Object

Processes an error from a request

Parameters:



659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
# File 'lib/wamp_client/session.rb', line 659

def _process_REGISTER_error(msg)

  # Remove the pending registration and inform the caller of the failure
  r = self._requests[:register].delete(msg.request_request)
  if r

    details = msg.details || {}
    details[:procedure] = r[:p] unless details[:procedure]
    details[:type] = 'register'
    details[:session] = self

    c = r[:c]
    c.call(nil, self._error_to_hash(msg), details) if c
  end

end

#_process_REGISTERED(msg) ⇒ Object

Processes the response to a register request

Parameters:



638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
# File 'lib/wamp_client/session.rb', line 638

def _process_REGISTERED(msg)

  # Remove the pending subscription, add it to the registered ones, and inform the caller
  r = self._requests[:register].delete(msg.register_request)
  if r
    n_r = Registration.new(r[:p], r[:h], r[:o], r[:i], self, msg.registration)
    self._registrations[msg.registration] = n_r

    details = {}
    details[:procedure] = r[:p]
    details[:type] = 'register'
    details[:session] = self

    c = r[:c]
    c.call(n_r, nil, details) if c
  end

end

#_process_RESULT(msg) ⇒ Object

Processes the response to a publish request

Parameters:



924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
# File 'lib/wamp_client/session.rb', line 924

def _process_RESULT(msg)

  details = msg.details || {}

  call = self._requests[:call][msg.call_request]

  # Don't remove if progress is true and the options had receive_progress true
  self._requests[:call].delete(msg.call_request) unless (details[:progress] and (call and call[:o][:receive_progress]))

  if call
    details[:procedure] = call[:p] unless details[:procedure]
    details[:type] = 'call'
    details[:session] = self

    c = call[:c]
    c.call(CallResult.new(msg.yield_arguments, msg.yield_argumentskw), nil, details) if c
  end

end

#_process_SUBSCRIBE_error(msg) ⇒ Object

Processes an error from a request

Parameters:



439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
# File 'lib/wamp_client/session.rb', line 439

def _process_SUBSCRIBE_error(msg)

  # Remove the pending subscription and inform the caller of the failure
  s = self._requests[:subscribe].delete(msg.request_request)
  if s

    details = msg.details || {}
    details[:topic] = s[:t] unless details[:topic]
    details[:type] = 'subscribe'
    details[:session] = self

    c = s[:c]
    c.call(nil, self._error_to_hash(msg), details) if c
  end

end

#_process_SUBSCRIBED(msg) ⇒ Object

Processes the response to a subscribe request

Parameters:



418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
# File 'lib/wamp_client/session.rb', line 418

def _process_SUBSCRIBED(msg)

  # Remove the pending subscription, add it to the registered ones, and inform the caller
  s = self._requests[:subscribe].delete(msg.subscribe_request)
  if s

    details = {}
    details[:topic] = s[:t] unless details[:topic]
    details[:type] = 'subscribe'
    details[:session] = self

    n_s = Subscription.new(s[:t], s[:h], s[:o], self, msg.subscription)
    self._subscriptions[msg.subscription] = n_s
    c = s[:c]
    c.call(n_s, nil, details) if c
  end

end

#_process_UNREGISTER_error(msg) ⇒ Object

Processes an error from a request

Parameters:



861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
# File 'lib/wamp_client/session.rb', line 861

def _process_UNREGISTER_error(msg)

  # Remove the pending subscription and inform the caller of the failure
  r = self._requests[:unregister].delete(msg.request_request)
  if r

    details = msg.details || {}
    details[:procedure] = r[:r].procedure unless details[:procedure]
    details[:type] = 'unregister'
    details[:session] = self

    c = r[:c]
    c.call(nil, self._error_to_hash(msg), details) if c
  end

end

#_process_UNREGISTERED(msg) ⇒ Object

Processes the response to a unregister request

Parameters:



840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
# File 'lib/wamp_client/session.rb', line 840

def _process_UNREGISTERED(msg)

  # Remove the pending unregistration, add it to the registered ones, and inform the caller
  r = self._requests[:unregister].delete(msg.unregister_request)
  if r
    r_s = r[:r]
    self._registrations.delete(r_s.id)

    details = {}
    details[:procedure] = r_s.procedure
    details[:type] = 'unregister'
    details[:session] = self

    c = r[:c]
    c.call(r_s, nil, details) if c
  end

end

#_process_UNSUBSCRIBE_error(msg) ⇒ Object

Processes an error from a request

Parameters:



522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
# File 'lib/wamp_client/session.rb', line 522

def _process_UNSUBSCRIBE_error(msg)

  # Remove the pending subscription and inform the caller of the failure
  s = self._requests[:unsubscribe].delete(msg.request_request)
  if s

    details = msg.details || {}
    details[:topic] = s[:s].topic unless details[:topic]
    details[:type] = 'unsubscribe'
    details[:session] = self

    c = s[:c]
    c.call(nil, self._error_to_hash(msg), details) if c
  end

end

#_process_UNSUBSCRIBED(msg) ⇒ Object

Processes the response to a unsubscribe request

Parameters:



500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
# File 'lib/wamp_client/session.rb', line 500

def _process_UNSUBSCRIBED(msg)

  # Remove the pending unsubscription, add it to the registered ones, and inform the caller
  s = self._requests[:unsubscribe].delete(msg.unsubscribe_request)
  if s
    n_s = s[:s]
    self._subscriptions.delete(n_s.id)

    details = {}
    details[:topic] = s[:s].topic
    details[:type] = 'unsubscribe'
    details[:session] = self

    c = s[:c]
    c.call(n_s, nil, details) if c
  end

end

#_receive_message(msg) ⇒ Object

Processes received messages

Parameters:

  • msg (Array)


288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
# File 'lib/wamp_client/session.rb', line 288

def _receive_message(msg)

  message = WampClient::Message::Base.parse(msg)

  if self.verbose
    puts 'RX: ' + message.to_s if message
    puts 'RX(non-wamp): ' + msg.to_s unless message
  end

  # WAMP Session is not open
  if self.id.nil?

    # Parse the welcome message
    if message.is_a? WampClient::Message::Welcome
      self.id = message.session
      @on_join.call(message.details) unless @on_join.nil?
    elsif message.is_a? WampClient::Message::Challenge

      if @on_challenge
        signature, extra = @on_challenge.call(message.authmethod, message.extra)
      else
        signature = nil
        extra = nil
      end

      signature ||= ''
      extra ||= {}

      authenticate = WampClient::Message::Authenticate.new(signature, extra)
      self._send_message(authenticate)

    elsif message.is_a? WampClient::Message::Abort
      @on_leave.call(message.reason, message.details) unless @on_leave.nil?
    end

  # Wamp Session is open
  else

    # If goodbye, close the session
    if message.is_a? WampClient::Message::Goodbye

      # If we didn't send the goodbye, respond
      unless self._goodbye_sent
        goodbye = WampClient::Message::Goodbye.new({}, 'wamp.error.goodbye_and_out')
        self._send_message(goodbye)
      end

      # Close out session
      self.id = nil
      self.realm = nil
      self._goodbye_sent = false
      @on_leave.call(message.reason, message.details) unless @on_leave.nil?

    else

      # Process Errors
      if message.is_a? WampClient::Message::Error
        if message.request_type == WampClient::Message::Types::SUBSCRIBE
          self._process_SUBSCRIBE_error(message)
        elsif message.request_type == WampClient::Message::Types::UNSUBSCRIBE
          self._process_UNSUBSCRIBE_error(message)
        elsif message.request_type == WampClient::Message::Types::PUBLISH
          self._process_PUBLISH_error(message)
        elsif message.request_type == WampClient::Message::Types::REGISTER
          self._process_REGISTER_error(message)
        elsif message.request_type == WampClient::Message::Types::UNREGISTER
          self._process_UNREGISTER_error(message)
        elsif message.request_type == WampClient::Message::Types::CALL
          self._process_CALL_error(message)
        else
          # TODO: Some Error??  Not Implemented yet
        end

      # Process Messages
      else
        if message.is_a? WampClient::Message::Subscribed
          self._process_SUBSCRIBED(message)
        elsif message.is_a? WampClient::Message::Unsubscribed
          self._process_UNSUBSCRIBED(message)
        elsif message.is_a? WampClient::Message::Published
          self._process_PUBLISHED(message)
        elsif message.is_a? WampClient::Message::Event
          self._process_EVENT(message)
        elsif message.is_a? WampClient::Message::Registered
          self._process_REGISTERED(message)
        elsif message.is_a? WampClient::Message::Unregistered
          self._process_UNREGISTERED(message)
        elsif message.is_a? WampClient::Message::Invocation
          self._process_INVOCATION(message)
        elsif message.is_a? WampClient::Message::Interrupt
          self._process_INTERRUPT(message)
        elsif message.is_a? WampClient::Message::Result
          self._process_RESULT(message)
        else
          # TODO: Some Error??  Not Implemented yet
        end
      end

    end
  end

end

#_send_INVOCATION_error(request, error, check_defer = false) ⇒ Object

Sends an error back to the caller

Parameters:

  • request (Integer)
    • The request ID

  • error


679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
# File 'lib/wamp_client/session.rb', line 679

def _send_INVOCATION_error(request, error, check_defer=false)
  # Prevent responses for defers that have already completed or had an error
  if check_defer and not self._defers[request]
    return
  end

  if error.nil?
    error = CallError.new('wamp.error.runtime')
  elsif not error.is_a?(CallError)
    error = CallError.new('wamp.error.runtime', [error.to_s])
  end

  error_msg = WampClient::Message::Error.new(WampClient::Message::Types::INVOCATION, request, {}, error.error, error.args, error.kwargs)
  self._send_message(error_msg)
end

#_send_message(msg) ⇒ Object

Sends a message

Parameters:



281
282
283
284
# File 'lib/wamp_client/session.rb', line 281

def _send_message(msg)
  puts 'TX: ' + msg.to_s if self.verbose
  self.transport.send_message(msg.payload)
end

#call(procedure, args = nil, kwargs = nil, options = {}, &callback) ⇒ Call

Publishes and event to a topic

Parameters:

  • procedure (String)

    The procedure to invoke

  • args (Array) (defaults to: nil)

    The arguments

  • kwargs (Hash) (defaults to: nil)

    The keyword arguments

  • options (Hash) (defaults to: {})

    The options for the call

  • callback (block)

    The callback(result, error, details) called to signal if the call was a success or not

Returns:

  • (Call)

    An object representing the call



889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
# File 'lib/wamp_client/session.rb', line 889

def call(procedure, args=nil, kwargs=nil, options={}, &callback)
  unless is_open?
    raise RuntimeError, "Session must be open to call 'call'"
  end

  self.class.check_uri('procedure', procedure)
  self.class.check_dict('options', options)
  self.class.check_list('args', args, true)
  self.class.check_dict('kwargs', kwargs, true)

  # Create a new call request
  request = self._generate_id
  self._requests[:call][request] = {p: procedure, a: args, k: kwargs, o: options, c: callback}

  # Send the message
  msg = WampClient::Message::Call.new(request, options, procedure, args, kwargs)
  self._send_message(msg)

  call = Call.new(self, request)

  # Timeout Logic
  if options[:timeout] and options[:timeout] > 0
    self.transport.timer(options[:timeout]) do
      # Once the timer expires, if the call hasn't completed, cancel it
      if self._requests[:call][call.id]
        call.cancel
      end
    end
  end

  call
end

#cancel(call, mode = 'skip') ⇒ Object

Cancels a call

Parameters:

  • call (Call)
    • The call object

  • mode (String) (defaults to: 'skip')
    • The mode of the skip. Options are ‘skip’, ‘kill’, ‘killnowait’



970
971
972
973
974
975
976
977
978
979
980
# File 'lib/wamp_client/session.rb', line 970

def cancel(call, mode='skip')
  unless is_open?
    raise RuntimeError, "Session must be open to call 'cancel'"
  end

  self.class.check_nil('call', call, false)

  # Send the message
  cancel = WampClient::Message::Cancel.new(call.id, { mode: mode })
  self._send_message(cancel)
end

#is_open?Boolean

Returns ‘true’ if the session is open

Returns:

  • (Boolean)


219
220
221
# File 'lib/wamp_client/session.rb', line 219

def is_open?
  !self.id.nil?
end

#join(realm) ⇒ Object

Joins the WAMP Router

Parameters:

  • realm (String)

    The name of the realm



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/wamp_client/session.rb', line 225

def join(realm)
  if is_open?
    raise RuntimeError, "Session must be closed to call 'join'"
  end

  self.class.check_uri('realm', realm)

  self.realm = realm

  details = {}
  details[:roles] = WAMP_FEATURES
  details[:agent] = "Ruby-WampClient-#{WampClient::VERSION}"
  details[:authid] = self.options[:authid] if self.options[:authid]
  details[:authmethods] = self.options[:authmethods] if self.options[:authmethods]

  # Send Hello message
  hello = WampClient::Message::Hello.new(realm, details)
  self._send_message(hello)
end

#leave(reason = 'wamp.close.normal', message = 'user initiated') ⇒ Object

Leaves the WAMP Router

Parameters:

  • reason (String) (defaults to: 'wamp.close.normal')

    URI signalling the reason for leaving



247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/wamp_client/session.rb', line 247

def leave(reason='wamp.close.normal', message='user initiated')
  unless is_open?
    raise RuntimeError, "Session must be opened to call 'leave'"
  end

  self.class.check_uri('reason', reason, true)
  self.class.check_string('message', message, true)

  details = {}
  details[:message] = message

  # Send Goodbye message
  goodbye = WampClient::Message::Goodbye.new(details, reason)
  self._send_message(goodbye)
  self._goodbye_sent = true
end

#on_challenge(&on_challenge) ⇒ Object



166
167
168
# File 'lib/wamp_client/session.rb', line 166

def on_challenge(&on_challenge)
  @on_challenge = on_challenge
end

#on_join(&on_join) ⇒ Object



148
149
150
# File 'lib/wamp_client/session.rb', line 148

def on_join(&on_join)
  @on_join = on_join
end

#on_leave(&on_leave) ⇒ Object



156
157
158
# File 'lib/wamp_client/session.rb', line 156

def on_leave(&on_leave)
  @on_leave = on_leave
end

#publish(topic, args = nil, kwargs = nil, options = {}, &callback) ⇒ Object

Publishes and event to a topic

Parameters:

  • topic (String)

    The topic to publish the event to

  • args (Array) (defaults to: nil)

    The arguments

  • kwargs (Hash) (defaults to: nil)

    The keyword arguments

  • options (Hash) (defaults to: {})

    The options for the publish

  • callback (block)

    The callback(publish, error, details) called to signal if the publish was a success or not



549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
# File 'lib/wamp_client/session.rb', line 549

def publish(topic, args=nil, kwargs=nil, options={}, &callback)
  unless is_open?
    raise RuntimeError, "Session must be open to call 'publish'"
  end

  self.class.check_uri('topic', topic)
  self.class.check_dict('options', options)
  self.class.check_list('args', args, true)
  self.class.check_dict('kwargs', kwargs, true)

  # Create a new publish request
  request = self._generate_id
  self._requests[:publish][request] = {t: topic, a: args, k: kwargs, o: options, c: callback} if options[:acknowledge]

  # Send the message
  publish = WampClient::Message::Publish.new(request, options, topic, args, kwargs)
  self._send_message(publish)
end

#register(procedure, handler, options = nil, interrupt = nil, &callback) ⇒ Object

Register to a procedure

Parameters:

  • procedure (String)

    The procedure to register for

  • handler (lambda)

    The handler(args, kwargs, details) when an invocation is received

  • options (Hash, nil) (defaults to: nil)

    The options for the registration

  • interrupt (lambda) (defaults to: nil)

    The handler(request, mode) when an interrupt is received

  • callback (block)

    The callback(registration, error, details) called to signal if the registration was a success or not



617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
# File 'lib/wamp_client/session.rb', line 617

def register(procedure, handler, options=nil, interrupt=nil, &callback)
  unless is_open?
    raise RuntimeError, "Session must be open to call 'register'"
  end

  options ||= {}

  self.class.check_uri('procedure', procedure)
  self.class.check_nil('handler', handler, false)

  # Create a new registration request
  request = self._generate_id
  self._requests[:register][request] = {p: procedure, h: handler, i: interrupt, o: options, c: callback}

  # Send the message
  register = WampClient::Message::Register.new(request, options, procedure)
  self._send_message(register)
end

#subscribe(topic, handler, options = {}, &callback) ⇒ Object

Subscribes to a topic

Parameters:

  • topic (String)

    The topic to subscribe to

  • handler (lambda)

    The handler(args, kwargs, details) when an event is received

  • options (Hash) (defaults to: {})

    The options for the subscription

  • callback (block)

    The callback(subscription, error) called to signal if the subscription was a success or not



398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
# File 'lib/wamp_client/session.rb', line 398

def subscribe(topic, handler, options={}, &callback)
  unless is_open?
    raise RuntimeError, "Session must be open to call 'subscribe'"
  end

  self.class.check_uri('topic', topic)
  self.class.check_dict('options', options)
  self.class.check_nil('handler', handler, false)

  # Create a new subscribe request
  request = self._generate_id
  self._requests[:subscribe][request] = {t: topic, h: handler, o: options, c: callback}

  # Send the message
  subscribe = WampClient::Message::Subscribe.new(request, options, topic)
  self._send_message(subscribe)
end

#unregister(registration, &callback) ⇒ Object

Unregisters from a procedure

Parameters:

  • registration (Registration)

    The registration object from when the registration was created

  • callback (block)

    The callback(registration, error, details) called to signal if the unregistration was a success or not



822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
# File 'lib/wamp_client/session.rb', line 822

def unregister(registration, &callback)
  unless is_open?
    raise RuntimeError, "Session must be open to call 'unregister'"
  end

  self.class.check_nil('registration', registration, false)

  # Create a new unsubscribe request
  request = self._generate_id
  self._requests[:unregister][request] = { r: registration, c: callback }

  # Send the message
  unregister = WampClient::Message::Unregister.new(request, registration.id)
  self._send_message(unregister)
end

#unsubscribe(subscription, &callback) ⇒ Object

Unsubscribes from a subscription

Parameters:

  • subscription (Subscription)

    The subscription object from when the subscription was created

  • callback (block)

    The callback(subscription, error, details) called to signal if the subscription was a success or not



482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
# File 'lib/wamp_client/session.rb', line 482

def unsubscribe(subscription, &callback)
  unless is_open?
    raise RuntimeError, "Session must be open to call 'unsubscribe'"
  end

  self.class.check_nil('subscription', subscription, false)

  # Create a new unsubscribe request
  request = self._generate_id
  self._requests[:unsubscribe][request] = { s: subscription, c: callback }

  # Send the message
  unsubscribe = WampClient::Message::Unsubscribe.new(request, subscription.id)
  self._send_message(unsubscribe)
end

#yield(request, result, options = {}, check_defer = false) ⇒ Object

Sends a result for the invocation

Parameters:

  • request (Integer)
    • The id of the request

  • result (CallError, CallResult, anything)
    • If it is a CallError, the error will be returned

  • options (Hash) (defaults to: {})
    • The options to be sent with the yield



699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
# File 'lib/wamp_client/session.rb', line 699

def yield(request, result, options={}, check_defer=false)
  # Prevent responses for defers that have already completed or had an error
  if check_defer and not self._defers[request]
    return
  end

  if result.nil?
    result = CallResult.new
  elsif result.is_a?(CallError)
    # Do nothing
  elsif not result.is_a?(CallResult)
    result = CallResult.new([result])
  end

  if result.is_a?(CallError)
    self._send_INVOCATION_error(request, result)
  else
    yield_msg = WampClient::Message::Yield.new(request, options, result.args, result.kwargs)
    self._send_message(yield_msg)
  end
end