Class: Stomper::Receivers::Threaded
- Inherits: Object
- Inheritance chain: Object, Stomper::Receivers::Threaded
- Defined in: lib/stomper/receivers/threaded.rb
Overview
Basic threaded receiver
Defined Under Namespace
Classes: StopReceiver
Instance Attribute Summary
- #running ⇒ true, false (also: #running?) (readonly)
  Returns true if the receiver is currently running, false otherwise.
Instance Method Summary
- #initialize(connection) ⇒ Threaded (constructor)
  Creates a new threaded receiver for the supplied Connection.
- #start ⇒ self
  Starts the receiver by creating a new thread to continually poll the connection for new Stomp frames.
- #stop ⇒ self
  Stops the receiver by shutting down the polling thread.
Constructor Details
#initialize(connection) ⇒ Threaded
Creates a new threaded receiver for the supplied Connection. Invoking #start on this receiver will create a new thread that will continually call receive on the connection. Stopping this receiver with #stop will terminate the thread.
    # File 'lib/stomper/receivers/threaded.rb', line 21
    def initialize(connection)
      @connection = connection
      @running = false
      @run_mutex = ::Mutex.new
      @run_thread = nil
    end
Instance Attribute Details
#running ⇒ true, false (readonly) Also known as: running?
Returns true if the receiver is currently running, false otherwise. If the polling thread is terminated due to a raised exception, this attribute will be false.
    # File 'lib/stomper/receivers/threaded.rb', line 12
    def running
      @running
    end
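The way the running flag reacts to a failure in the polling thread can be illustrated without Stomper itself. The following is only a sketch: `FlakyConnection` and the local `running` variable are hypothetical stand-ins for a real connection and for `@running`, and the loop mirrors the general shape of the polling loop rather than reproducing the library.

```ruby
# Hypothetical stand-in for a connection: hands out two frames, then fails
# the way a read on a closed stream would.
class FlakyConnection
  def initialize
    @frames = [:frame_a, :frame_b]
  end

  def receive
    @frames.shift || raise(IOError, 'stream closed')
  end
end

connection = FlakyConnection.new
running = true # plays the role of @running

poller = Thread.new do
  Thread.current.report_on_exception = false # keep stderr quiet for the demo
  begin
    # Poll until the connection returns nil (it never does here; it raises).
    until connection.receive.nil?
    end
  rescue Exception => ex # broad rescue, mirroring the shape of the loop in #start
    running = false      # the flag is cleared before the error propagates
    raise ex
  end
  running = false
end

# Thread#join re-raises the exception that terminated the thread.
error = begin
  poller.join
  nil
rescue IOError => e
  e
end
```

After the join, `running` is false and `error` holds the IOError that ended the thread, which is the situation the attribute description above refers to.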
Instance Method Details
#start ⇒ self
Starts the receiver by creating a new thread to continually poll the connection for new Stomp frames. If an error is raised while calling Connection#receive, the polling thread will terminate, and #running? will return false.
    # File 'lib/stomper/receivers/threaded.rb', line 33
    def start
      is_starting = @run_mutex.synchronize { @running = true unless @running }
      if is_starting
        @run_thread = Thread.new do
          begin
            until @connection.receive.nil?
            end
          rescue ::Stomper::Receivers::Threaded::StopReceiver
          rescue Exception => ex
            @running = false
            raise ex
          end
          @running = false
        end
      end
      self
    end
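The first line of #start relies on a small Ruby idiom: an assignment with a trailing `unless` evaluates to the assigned value when it runs and to nil when it is skipped. A minimal sketch of that guard in isolation, assuming a hypothetical `StartGuard` class that is not part of Stomper:

```ruby
# Hypothetical class that isolates the "start only once" guard.
class StartGuard
  def initialize
    @running = false
    @run_mutex = Mutex.new
  end

  # Returns true only for the caller that flips @running from false to true.
  # Returns nil when the flag was already set, because the modifier `unless`
  # skips the assignment and the expression evaluates to nil.
  def claim
    @run_mutex.synchronize { @running = true unless @running }
  end
end

guard = StartGuard.new
first  = guard.claim # wins the right to start the polling thread
second = guard.claim # flag already set, so nothing is started
```

Because the check and the assignment happen inside one `synchronize` block, two threads calling `claim` at the same time cannot both receive true, so a repeated call to a start method built this way would not spawn a second thread.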
#stop ⇒ self
Stops the receiver by shutting down the polling thread. If an error was raised within the thread, this method will generally re-raise it. The one exception is an IOError (the source also rescues SystemCallError) raised while Connection#connected? returns false; that error is ignored. The reason is that reading from a closed stream raises an IOError, and when a connection and its receiver are shut down together, the polling thread is likely to be blocked on a read and raise exactly such an error.
    # File 'lib/stomper/receivers/threaded.rb', line 61
    def stop
      stopped = @run_mutex.synchronize { @run_thread.nil? }
      unless stopped
        @run_thread.raise(::Stomper::Receivers::Threaded::StopReceiver.new)
        begin
          @run_thread.join
        rescue ::IOError, ::SystemCallError
          raise if @connection.connected?
        rescue ::Stomper::Receivers::Threaded::StopReceiver => ex
        end
        @run_thread = nil
      end
      self
    end
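Two standard-library behaviors underpin #stop: `Thread#raise` interrupts a thread that is blocked, and `Thread#join` re-raises whatever exception terminated the thread. The sketch below shows both in isolation. `StopSignal`, the queue-based worker, and the `connected` flag are hypothetical stand-ins for StopReceiver, the polling thread, and Connection#connected?.

```ruby
# Hypothetical stop signal, standing in for StopReceiver.
class StopSignal < StandardError; end

# Part 1: interrupt a worker that is blocked, as a polling thread might be
# while waiting on a read.
queue = Queue.new
worker = Thread.new do
  begin
    loop { queue.pop } # blocks forever; nothing is ever pushed
  rescue StopSignal
    :stopped           # the signal is absorbed, so the thread exits cleanly
  end
end

sleep 0.01 until worker.status == 'sleep' # wait until the worker is blocked
worker.raise(StopSignal.new)
worker.join
result = worker.value

# Part 2: a thread that died from an IOError re-raises it on join. The caller
# ignores it only when the (hypothetical) connection is no longer connected.
failing = Thread.new do
  Thread.current.report_on_exception = false # keep stderr quiet for the demo
  raise IOError, 'closed stream'
end

connected = false # stand-in for Connection#connected?
ignored = begin
  failing.join
  false
rescue IOError, SystemCallError
  raise if connected # a live connection would make the error significant
  true
end
```

With `connected` set to true, the bare `raise` would propagate the IOError to the caller, which corresponds to the re-raise case described above.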