Class: OnStomp::Components::ThreadedProcessor

Inherits:
Object
Defined in:
lib/onstomp/components/threaded_processor.rb

Overview

An IO processor that does its work on its own thread.

Instance Method Summary

Constructor Details

#initialize(client) ⇒ ThreadedProcessor

Creates a new processor for the client

Parameters:

  • client: the client whose connection this processor services

# File 'lib/onstomp/components/threaded_processor.rb', line 7

def initialize client
  @client = client
  @run_thread = nil
  @closing = false
end

Instance Method Details

#join ⇒ self

Blocks the calling thread, yielding to other threads with Thread.pass, until the processor is no longer running, and then joins the IO thread.

Returns:

  • (self)


# File 'lib/onstomp/components/threaded_processor.rb', line 53

def join
  Thread.pass while running?
  @run_thread && @run_thread.join
  self
end
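
The waiting pattern in #join can be tried outside of OnStomp. The following is a minimal sketch, not the library's API: wait_for is a hypothetical helper that takes a plain Thread in place of the processor's @run_thread.

```ruby
# Hypothetical helper mirroring the body of #join for an arbitrary thread.
def wait_for(thread)
  # Yield the scheduler to other threads while the worker is still alive.
  Thread.pass while thread && thread.alive?
  # Join the finished thread (a no-op when no thread was given).
  thread && thread.join
  thread
end

worker = Thread.new { 6 * 7 }
puts wait_for(worker).value  # prints 42
```

Returning the receiver, as #join does with self, is what allows the call to be chained.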

#prepare_to_close ⇒ Object

Prepares the connection for closing by flushing its write buffer.



# File 'lib/onstomp/components/threaded_processor.rb', line 40

def prepare_to_close
  if running?
    @closing = true
    Thread.pass until @run_thread.stop?
    @client.connection.flush_write_buffer
    @closing = false
    @run_thread.wakeup
  end
end
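
The listing above relies on a small handshake: a flag asks the IO thread to park itself with Thread.stop, the caller waits until Thread#stop? reports that it has, does its exclusive work, and then calls Thread#wakeup. The sketch below reproduces that handshake with plain threads. The method name flush_while_parked and the symbols in the log are illustrative assumptions, and the :flushed entry stands in for the call to flush_write_buffer.

```ruby
# Hypothetical demonstration of the park / work / wakeup handshake.
def flush_while_parked
  log = []
  closing = true                     # plays the role of @closing

  worker = Thread.new do
    log << :io_pass                  # one pass of IO work
    Thread.stop if closing           # park until the caller wakes us
    log << :resumed
  end

  Thread.pass until worker.stop?     # wait for the worker to park
  log << :flushed                    # exclusive work happens here
  closing = false
  worker.wakeup                      # let the worker continue
  worker.join
  log
end

p flush_while_parked  # prints [:io_pass, :flushed, :resumed]
```

Because the worker is asleep while the caller works, the two threads never touch the shared state at the same time, which is presumably why the flush happens between the stop? check and the wakeup.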

#running? ⇒ true, false

Returns true if its IO thread has been created and is alive; otherwise returns a falsy value (nil if no thread has been created, false if the thread has finished).

Returns:

  • (true, false)


# File 'lib/onstomp/components/threaded_processor.rb', line 16

def running?
  @run_thread && @run_thread.alive?
end
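
The expression in the listing above can be evaluated against plain threads to see what it returns in each state. This is a minimal sketch; running_state is a hypothetical helper, not a method of the library.

```ruby
# Hypothetical helper using the same expression as #running?.
def running_state(thread)
  thread && thread.alive?
end

finished = Thread.new { :done }
finished.join                       # the thread has now terminated
sleeping = Thread.new { sleep }     # stays alive until killed

p running_state(nil)       # prints nil   (no thread created yet)
p running_state(finished)  # prints false (thread has terminated)
p running_state(sleeping)  # prints true  (thread is alive)

sleeping.kill
sleeping.join
```

Both nil and false are falsy in Ruby, so callers that only branch on the result behave the same in either case.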

#start ⇒ self

Starts the processor by creating a new thread that continually invokes OnStomp::Connections::Base#io_process while the client is connected.

Returns:

  • (self)


# File 'lib/onstomp/components/threaded_processor.rb', line 24

def start
  @run_thread = Thread.new do
    begin
      while @client.connected?
        @client.connection.io_process
        Thread.stop if @closing
      end
    rescue OnStomp::StopReceiver
    rescue Exception
      raise
    end
  end
  self
end
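
To see the shape of this loop without a broker, the sketch below drives it with stand-in objects. SketchConnection, SketchClient, and run_io_loop are hypothetical names introduced for illustration; only the connected?, connection, and io_process messages come from the listing above, and the stand-in connection simply "disconnects" after a fixed number of passes.

```ruby
# Hypothetical stand-in: counts io_process calls up to a limit.
class SketchConnection
  attr_reader :ticks

  def initialize(limit)
    @limit = limit
    @ticks = 0
  end

  def io_process
    @ticks += 1
  end

  def done?
    @ticks >= @limit
  end
end

# Hypothetical stand-in: reports connected until the connection is done.
class SketchClient
  attr_reader :connection

  def initialize(connection)
    @connection = connection
  end

  def connected?
    !@connection.done?
  end
end

# Same loop shape as #start, minus the closing flag and rescue clauses.
def run_io_loop(client)
  Thread.new do
    client.connection.io_process while client.connected?
  end
end

client = SketchClient.new(SketchConnection.new(3))
run_io_loop(client).join
puts client.connection.ticks  # prints 3
```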

#stop ⇒ self

Forcefully stops the processor and joins its IO thread to the caller’s thread.

Returns:

  • (self)

Raises:

  • (IOError, SystemCallError)

    if either was raised in the IO thread and the client is still connected after the thread is joined.



# File 'lib/onstomp/components/threaded_processor.rb', line 65

def stop
  begin
    @run_thread.raise OnStomp::StopReceiver if @run_thread.alive?
    @run_thread.join
  rescue IOError, SystemCallError
    raise if @client.connected?
  end
  @run_thread = nil
  self
end
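
Two standard Ruby behaviors make this method work: Thread#raise delivers an exception into another thread, and Thread#join re-raises in the caller any exception that terminated the joined thread. The sketch below exercises both with plain threads. SketchStop is a hypothetical stand-in for OnStomp::StopReceiver (whose superclass is not shown in this page), and stop_worker, sleeping_worker, and failed_worker are illustrative helpers, with a boolean argument standing in for @client.connected?.

```ruby
# Hypothetical stand-in for OnStomp::StopReceiver.
class SketchStop < StandardError; end

# Same shape as #stop: interrupt the worker if alive, join it, and re-raise
# IO failures only when the caller says it is still connected.
def stop_worker(thread, still_connected)
  begin
    thread.raise SketchStop if thread.alive?
    thread.join
  rescue IOError, SystemCallError
    raise if still_connected
  end
  nil
end

# A worker that swallows the stop signal, like the rescue clause in #start.
def sleeping_worker
  worker = Thread.new do
    begin
      sleep
    rescue SketchStop
      :stopped
    end
  end
  Thread.pass until worker.stop?  # wait until it sleeps inside the begin block
  worker
end

# A worker that dies with an IOError, as a failed socket read might.
def failed_worker
  worker = Thread.new do
    Thread.current.report_on_exception = false  # keep stderr quiet
    raise IOError, "simulated socket failure"
  end
  Thread.pass while worker.alive?
  worker
end

idle = sleeping_worker
stop_worker(idle, true)
puts idle.alive?                   # prints false

stop_worker(failed_worker, false)  # IOError is swallowed: not connected
begin
  stop_worker(failed_worker, true)
rescue IOError => e
  puts e.message                   # prints simulated socket failure
end
```

Swallowing the error when the client is already disconnected matches the Raises note above: an IO failure during shutdown is expected and only matters if the connection is supposedly still open.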