Class: Wamp::Worker::Runner::Main

Inherits:
Base
  • Object
show all
Defined in:
lib/wamp/worker/runner.rb

Overview

This class is the main runner

Instance Attribute Summary collapse

Attributes inherited from Base

#dispatcher, #name

Instance Method Summary collapse

Methods inherited from Base

#active?, #logger, #start, #stop

Constructor Details

#initialize(name = nil, **options) ⇒ Main

Constructor



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/wamp/worker/runner.rb', line 102

# Builds the main runner: resolves the connection options, obtains the
# client, and wires up the queue monitor, the tick loop and the signal traps.
#
# @param name [Object, nil] runner name, handed unchanged to the Base constructor
# @param options [Hash] overrides merged on top of
#   Wamp::Worker.config.connection(self.name).  :challenge and :client are
#   read here; when no :client is given the merged hash is passed to
#   Wamp::Client::Connection.new
def initialize(name=nil, **options)
  super(name)

  # Caller-supplied options win over the configured connection.  The
  # inherited reader (self.name) is used here rather than the raw argument.
  settings = Wamp::Worker.config.connection(self.name).merge(options)

  @challenge = settings[:challenge]
  @client = settings[:client] || Wamp::Client::Connection.new(settings)
  @active = false

  # Descriptors found by the queue monitor are handed over through this queue
  @descriptor_queue = ::Queue.new

  # The queue monitor is attached to the same worker, so it is given this
  # runner's dispatcher UUID in order to delegate background tasks correctly
  shared_uuid = dispatcher.uuid

  @queue_monitor = Background.new(self.name, uuid: shared_uuid) do |runner|
    found = runner.dispatcher.check_queues
    descriptor_queue.push(found) if found
  end

  # Have the transport invoke tick_handler from its tick loop
  client.transport_class.add_tick_loop { tick_handler }

  # Second (epoch time) of the most recent ticker increment
  @last_tick = Time.now.to_i

  # Stop the runner on both SIGINT and SIGTERM
  %w[INT TERM].each do |signal|
    Signal.trap(signal) { stop }
  end
end

Instance Attribute Details

#challenge ⇒ Object (readonly)

Returns the value of attribute challenge.



98
99
100
# File 'lib/wamp/worker/runner.rb', line 98

# Reader for @challenge, which the constructor takes from the :challenge
# option.  When set, challenge_handler invokes it with
# call(authmethod, extra); it may be nil.
def challenge
  @challenge
end

#client ⇒ Object (readonly)

Returns the value of attribute client.



98
99
100
# File 'lib/wamp/worker/runner.rb', line 98

# Reader for @client: the :client option when one was supplied to the
# constructor, otherwise the Wamp::Client::Connection built there from the
# merged options.
def client
  @client
end

#descriptor_queue ⇒ Object (readonly)

Returns the value of attribute descriptor_queue.



98
99
100
# File 'lib/wamp/worker/runner.rb', line 98

# Reader for @descriptor_queue, the ::Queue created in the constructor.
# The queue monitor pushes descriptors onto it and tick_handler pops and
# processes them.
def descriptor_queue
  @descriptor_queue
end

#queue_monitor ⇒ Object (readonly)

Returns the value of attribute queue_monitor.



98
99
100
# File 'lib/wamp/worker/runner.rb', line 98

# Reader for @queue_monitor, the Background instance created in the
# constructor; it is started by _start and stopped by _stop.
def queue_monitor
  @queue_monitor
end

Instance Method Details

#_start ⇒ Object

Starts the run loop



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/wamp/worker/runner.rb', line 140

# Starts the run loop: hooks the session events onto the client, starts the
# queue monitor, logs the start and finally opens the connection.
#
# @return [Object] whatever client.open returns
def _start
  connection = client

  # Forward each session event to the matching handler method
  connection.on(:join) { |session, details| join_handler(session, details) }
  connection.on(:leave) { |reason, details| leave_handler(reason, details) }
  connection.on(:challenge) { |authmethod, details| challenge_handler(authmethod, details) }

  # The monitor is started before the connection is opened
  queue_monitor.start

  logger.info("#{self.class.name} '#{name}' started")

  connection.open
end

#_stop ⇒ Object

Stops the run loop



169
170
171
172
173
174
175
176
# File 'lib/wamp/worker/runner.rb', line 169

# Stops the run loop: the queue monitor is stopped first, then the client
# connection is closed.
#
# @return [Object] whatever client.close returns
def _stop
  queue_monitor.stop
  client.close
end

#challenge_handler(authmethod, extra) ⇒ Object



198
199
200
201
202
203
204
205
206
207
# File 'lib/wamp/worker/runner.rb', line 198

# Answers an authentication challenge by delegating to the configured
# challenge callable.
#
# @param authmethod [Object] authentication method named in the challenge
# @param extra [Object] additional challenge data, passed through untouched
# @return [Object] the result of challenge.call(authmethod, extra)
# @raise [ArgumentError] when no challenge callable is configured; the
#   runner is stopped before the error is raised
def challenge_handler(authmethod, extra)
  logger.info("#{self.class.name} runner '#{name}' challenge")

  responder = challenge
  return responder.call(authmethod, extra) if responder

  # Nothing can answer the challenge: shut down, then report the problem
  stop
  raise ArgumentError, "client asked for '#{authmethod}' challenge, but no ':challenge' option was provided"
end

#join_handler(session, details) ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/wamp/worker/runner.rb', line 178

# Handles a session join: logs it, stores the session on the dispatcher,
# then registers the procedures and subscribes to the topics for this runner.
#
# @param session [Object] the session that was joined
# @param details [#[]] join details; only details[:realm] is read (for the log)
# @return [Object] whatever Wamp::Worker.subscribe_topics returns
def join_handler(session, details)
  realm = details[:realm]
  logger.info("#{self.class.name} runner '#{name}' joined session with realm '#{realm}'")

  # The dispatcher receives the session before anything is registered
  worker = dispatcher
  worker.session = session

  # Procedures first, topics second
  Wamp::Worker.register_procedures(name, worker, session)
  Wamp::Worker.subscribe_topics(name, worker, session)
end

#leave_handler(reason, details) ⇒ Object



191
192
193
194
195
196
# File 'lib/wamp/worker/runner.rb', line 191

# Handles a session leave: logs the reason and clears the dispatcher's
# session.
#
# @param reason [Object] reason for leaving, interpolated into the log line
# @param details [Object] leave details (not used here)
# @return [nil]
def leave_handler(reason, details)
  message = "#{self.class.name} runner '#{name}' left session: #{reason}"
  logger.info(message)

  # The dispatcher no longer has a session
  dispatcher.session = nil
end

#tick_handler ⇒ Object

This method periodically checks if any work has come in from the queues



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/wamp/worker/runner.rb', line 211

# Invoked from the transport's tick loop.  Increments the dispatcher's ticker
# at most once per wall-clock second (the ticker tells requestors that the
# worker is alive) and then processes every descriptor waiting in
# descriptor_queue.
#
# @return [nil]
def tick_handler
  # Tick whenever the wall-clock second differs from the last recorded one.
  # Comparing with != rather than > keeps the ticker running if the system
  # clock is stepped backwards; with > it would stop until the clock caught
  # up with the old value again.
  current_time = Time.now.to_i
  if current_time != @last_tick
    dispatcher.increment_ticker
    @last_tick = current_time
  end

  # Drain the queue: pop each pending descriptor and hand it to the dispatcher
  pending = descriptor_queue
  dispatcher.process(pending.pop) until pending.empty?
end