Class: Wamp::Worker::Runner::Main
- Defined in:
- lib/wamp/worker/runner.rb
Overview
This class is the main runner
Instance Attribute Summary collapse
-
#challenge ⇒ Object
readonly
Returns the value of attribute challenge.
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#descriptor_queue ⇒ Object
readonly
Returns the value of attribute descriptor_queue.
-
#queue_monitor ⇒ Object
readonly
Returns the value of attribute queue_monitor.
Attributes inherited from Base
Instance Method Summary collapse
-
#_start ⇒ Object
Starts the run loop.
-
#_stop ⇒ Object
Stops the run loop.
- #challenge_handler(authmethod, extra) ⇒ Object
-
#initialize(name = nil, **options) ⇒ Main
constructor
Constructor.
- #join_handler(session, details) ⇒ Object
- #leave_handler(reason, details) ⇒ Object
-
#tick_handler ⇒ Object
This method periodically checks if any work has come in from the queues.
Methods inherited from Base
#active?, #logger, #start, #stop
Constructor Details
#initialize(name = nil, **options) ⇒ Main
Constructor
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/wamp/worker/runner.rb', line 102 def initialize(name=nil, **) super name # Combine the options = Wamp::Worker.config.connection(self.name).merge # Setup different options @challenge = [:challenge] @client = [:client] || Wamp::Client::Connection.new() @active = false # Create a queue for passing messages to the main runner @descriptor_queue = ::Queue.new # Note: since the queue monitor is attached to the same worker, # we need to lock the UUIDs together. This will make sure they # delegate background tasks correctly uuid = self.dispatcher.uuid # Create a command queue monitor @queue_monitor = Background.new(self.name, uuid: uuid) do |runner| descriptor = runner.dispatcher.check_queues self.descriptor_queue.push(descriptor) if descriptor end # Add the tick loop handler self.client.transport_class.add_tick_loop { self.tick_handler } # Initialize the last tick @last_tick = Time.now.to_i # Catch SIGINT Signal.trap('INT') { self.stop } Signal.trap('TERM') { self.stop } end |
Instance Attribute Details
#challenge ⇒ Object (readonly)
Returns the value of attribute challenge.
98 99 100 |
# File 'lib/wamp/worker/runner.rb', line 98 def challenge @challenge end |
#client ⇒ Object (readonly)
Returns the value of attribute client.
98 99 100 |
# File 'lib/wamp/worker/runner.rb', line 98 def client @client end |
#descriptor_queue ⇒ Object (readonly)
Returns the value of attribute descriptor_queue.
98 99 100 |
# File 'lib/wamp/worker/runner.rb', line 98 def descriptor_queue @descriptor_queue end |
#queue_monitor ⇒ Object (readonly)
Returns the value of attribute queue_monitor.
98 99 100 |
# File 'lib/wamp/worker/runner.rb', line 98 def queue_monitor @queue_monitor end |
Instance Method Details
#_start ⇒ Object
Starts the run loop
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/wamp/worker/runner.rb', line 140 def _start # On join, we need to subscribe and register the different handlers self.client.on :join do |session, details| self.join_handler session, details end # On leave, we will print a message self.client.on :leave do |reason, details| self.leave_handler(reason, details) end # On challenge, we will run the users challenge code self.client.on :challenge do |authmethod, details| self.challenge_handler(authmethod, details) end # Start the monitors self.queue_monitor.start # Log info logger.info("#{self.class.name} '#{self.name}' started") # Start the connection self.client.open end |
#_stop ⇒ Object
Stops the run loop
169 170 171 172 173 174 175 176 |
# File 'lib/wamp/worker/runner.rb', line 169 def _stop # Stop the other threads self.queue_monitor.stop # Stop the event machine self.client.close end |
#challenge_handler(authmethod, extra) ⇒ Object
198 199 200 201 202 203 204 205 206 207 |
# File 'lib/wamp/worker/runner.rb', line 198 def challenge_handler(authmethod, extra) logger.info("#{self.class.name} runner '#{self.name}' challenge") if self.challenge self.challenge.call(authmethod, extra) else self.stop raise(ArgumentError, "client asked for '#{authmethod}' challenge, but no ':challenge' option was provided") end end |
#join_handler(session, details) ⇒ Object
178 179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/wamp/worker/runner.rb', line 178 def join_handler(session, details) logger.info("#{self.class.name} runner '#{self.name}' joined session with realm '#{details[:realm]}'") # Set the session self.dispatcher.session = session # Register for the procedures Wamp::Worker.register_procedures(self.name, self.dispatcher, session) # Subscribe to the topics Wamp::Worker.subscribe_topics(self.name, self.dispatcher, session) end |
#leave_handler(reason, details) ⇒ Object
191 192 193 194 195 196 |
# File 'lib/wamp/worker/runner.rb', line 191 def leave_handler(reason, details) logger.info("#{self.class.name} runner '#{self.name}' left session: #{reason}") # Clear the session self.dispatcher.session = nil end |
#tick_handler ⇒ Object
This method periodically checks if any work has come in from the queues
211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 |
# File 'lib/wamp/worker/runner.rb', line 211 def tick_handler # This code will implement the ticker every second. This tells the # requestors that the worker is alive current_time = Time.now.to_i if current_time > @last_tick self.dispatcher.increment_ticker @last_tick = current_time end # Loop until the queue is empty until self.descriptor_queue.empty? do # Pop the value and process it descriptor = self.descriptor_queue.pop self.dispatcher.process(descriptor) end end |