Class: WampRails::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/wamp_rails/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = nil) ⇒ Client

Constructor for creating a client. Options are

Parameters:

  • options (Hash) (defaults to: nil)

    The different options to pass to the connection

Options Hash (options):

  • :name (String)
    • The name of the WAMP Client

  • :wamp (WampClient::Connection)
    • Allows a different WAMP to be passed in

  • :uri (String)

    The uri of the WAMP router to connect to

  • :realm (String)

    The realm to connect to

  • :protocol (String, nil)

    The protocol (default if wamp.2.json)

  • :authid (String, nil)

    The id to authenticate with

  • :authmethods (Array, nil)

    The different auth methods that the client supports

  • :headers (Hash)

    Custom headers to include during the connection

  • :serializer (WampClient::Serializer::Base)

    The serializer to use (default is json)



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/wamp_rails/client.rb', line 40

def initialize(options=nil)
  self.options = options || {}
  self.cmd_queue = Queue.new
  self.registrations = []
  self.subscriptions = []
  self.name = self.options[:name] || 'default'
  self.wamp = self.options[:wamp] || WampClient::Connection.new(self.options)
  self.verbose = self.options[:verbose]
  self.active = false

  # WAMP initialization.   Note that all callbacks are called on the reactor thread
  self.wamp.on_connect do
    puts "WAMP Rails Client #{self.name} connection established" if self.verbose
  end

  self.wamp.on_join do |session, details|
    puts "WAMP Rails Client #{self.name} was established" if self.verbose
    self.active = true

    # Register the procedures
    self.registrations.each do |registration|
      self._queue_command(registration)
    end

    # Subscribe to the topics
    self.subscriptions.each do |subscription|
      self._queue_command(subscription)
    end
  end

  self.wamp.on_leave do |reason, details|
    puts "WAMP Rails Client #{self.name} left because '#{reason}'" if self.verbose
    self.active = false
  end

  self.wamp.on_disconnect do |reason|
    puts "WAMP Rails Client #{self.name} disconnected because '#{reason}'" if self.verbose
    self.active = false
  end

  self.wamp.on_challenge do |authmethod, extra|
    if @on_challenge
      @on_challenge.call(authmethod, extra)
    else
      puts "WAMP Rails Client #{self.name} auth challenge was received but no method was implemented." if self.verbose
      nil
    end
  end

  # Catch SIGINT
  Signal.trap('INT') { self.close }
  Signal.trap('TERM') { self.close }
end

Instance Attribute Details

#activeObject

Returns the value of attribute active.



7
8
9
# File 'lib/wamp_rails/client.rb', line 7

def active
  @active
end

#cmd_queueObject

Returns the value of attribute cmd_queue.



7
8
9
# File 'lib/wamp_rails/client.rb', line 7

def cmd_queue
  @cmd_queue
end

#nameObject

Returns the value of attribute name.



7
8
9
# File 'lib/wamp_rails/client.rb', line 7

def name
  @name
end

#optionsObject

Returns the value of attribute options.



7
8
9
# File 'lib/wamp_rails/client.rb', line 7

def options
  @options
end

#registrationsObject

Returns the value of attribute registrations.



8
9
10
# File 'lib/wamp_rails/client.rb', line 8

def registrations
  @registrations
end

#subscriptionsObject

Returns the value of attribute subscriptions.



8
9
10
# File 'lib/wamp_rails/client.rb', line 8

def subscriptions
  @subscriptions
end

#threadObject

Returns the value of attribute thread.



7
8
9
# File 'lib/wamp_rails/client.rb', line 7

def thread
  @thread
end

#verboseObject

Returns the value of attribute verbose.



7
8
9
# File 'lib/wamp_rails/client.rb', line 7

def verbose
  @verbose
end

#wampObject

Returns the value of attribute wamp.



7
8
9
# File 'lib/wamp_rails/client.rb', line 7

def wamp
  @wamp
end

Instance Method Details

#_execute_command(command) ⇒ Object

Executes the command

Parameters:

  • - (WampRails::Command::Base)

    The command to execute



191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/wamp_rails/client.rb', line 191

def _execute_command(command)
  begin
    unless self.is_active?
      raise WampRails::Error.new("WAMP Rails Client #{self.name} is currently not active.")
    end

    command.execute
  rescue Exception => e
    puts e.to_s if self.verbose
    command.callback(nil, {error: 'wamp_rails.error', args: [e.to_s], kwargs: nil}, nil)
  end
end

#_queue_command(command, callback = nil) ⇒ Object

Queues the command and blocks it the callback is not nil

Parameters:

  • - (WampRails::Command::Base)

    The command to queue

  • - (Block)

    The block to call when complete



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/wamp_rails/client.rb', line 173

def _queue_command(command, callback=nil)

  # If the current thread is the EM thread, execute the command.  Else put it in the queue
  if self.thread == Thread.current
    self._execute_command(command)
  else
    self.cmd_queue.push(command)
  end

  # If the callback is defined, block until it finishes
  if callback
    callback_args = command.queue.pop
    callback.call(callback_args.result, callback_args.error, callback_args.details)
  end
end

#add_procedure(procedure, klass, options = nil) ⇒ Object

Adds a procedure to the client

Raises:



121
122
123
124
125
# File 'lib/wamp_rails/client.rb', line 121

def add_procedure(procedure, klass, options=nil)
  options ||= {}
  raise WampRails::Error.new('"add_procedure" must be called BEFORE "open"') if self.thread
  self.registrations << WampRails::Command::Register.new(procedure, klass, options, self)
end

#add_subscription(topic, klass, options = nil) ⇒ Object

Adds a subscription to the client

Raises:



128
129
130
131
132
# File 'lib/wamp_rails/client.rb', line 128

def add_subscription(topic, klass, options=nil)
  options ||= {}
  raise WampRails::Error.new('"add_subscription" must be called BEFORE "open"') if self.thread
  self.subscriptions << WampRails::Command::Subscribe.new(topic, klass, options, self)
end

#call(procedure, args = nil, kwargs = nil, options = {}, &callback) ⇒ Object

Note:

This method is blocking if the callback is not nil

Performs a WAMP call



140
141
142
143
# File 'lib/wamp_rails/client.rb', line 140

def call(procedure, args=nil, kwargs=nil, options={}, &callback)
  command = WampRails::Command::Call.new(procedure, args, kwargs, options, self)
  self._queue_command(command, callback)
end

#closeObject

Closes the connection



109
110
111
# File 'lib/wamp_rails/client.rb', line 109

def close
  self.wamp.close
end

#is_active?Boolean

Returns true if the connection is active

Returns:

  • (Boolean)


19
20
21
# File 'lib/wamp_rails/client.rb', line 19

def is_active?
  self.active
end

#on_challenge(&on_challenge) ⇒ Object



14
15
16
# File 'lib/wamp_rails/client.rb', line 14

def on_challenge(&on_challenge)
  @on_challenge = on_challenge
end

#openObject

Opens the connection



95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/wamp_rails/client.rb', line 95

def open
  # Create the background thread
  self.thread = Thread.new do
    EM.tick_loop do
      unless self.cmd_queue.empty?
        command = self.cmd_queue.pop
        self._execute_command(command)
      end
    end
    self.wamp.open
  end
end

#publish(topic, args = nil, kwargs = nil, options = {}, &callback) ⇒ Object

Note:

This method is blocking if the callback is not nil

Performs a WAMP publish



147
148
149
150
# File 'lib/wamp_rails/client.rb', line 147

def publish(topic, args=nil, kwargs=nil, options={}, &callback)
  command = WampRails::Command::Publish.new(topic, args, kwargs, options, self)
  self._queue_command(command, callback)
end

#register(procedure, klass, options = {}, &callback) ⇒ Object

Note:

This method is blocking if the callback is not nil

Performs a WAMP register



154
155
156
157
# File 'lib/wamp_rails/client.rb', line 154

def register(procedure, klass, options={}, &callback)
  command = WampRails::Command::Register.new(procedure, klass, options, self)
  self._queue_command(command, callback)
end

#routes(&block) ⇒ Object

Used to configure the routes for the client



116
117
118
# File 'lib/wamp_rails/client.rb', line 116

def routes(&block)
  self.instance_eval(&block)
end

#subscribe(topic, klass, options = {}, &callback) ⇒ Object

Note:

This method is blocking if the callback is not nil

Performs a WAMP subscribe



161
162
163
164
# File 'lib/wamp_rails/client.rb', line 161

def subscribe(topic, klass, options={}, &callback)
  command = WampRails::Command::Subscribe.new(topic, klass, options, self)
  self._queue_command(command, callback)
end

#wait_for_activeObject

Waits for the session to become active



24
25
26
27
# File 'lib/wamp_rails/client.rb', line 24

def wait_for_active
  until self.is_active?
  end
end