Class: Airflow::Reactor

Inherits:
Object
  • Object
show all
Includes:
Utils::Logger, Singleton
Defined in:
lib/async_flow/reactor.rb

Defined Under Namespace

Classes: AsyncWrapper

Instance Method Summary collapse

Methods included from Utils::Logger

#logger

Constructor Details

#initializeReactor

Returns a new instance of Reactor.



32
33
34
35
36
37
38
# File 'lib/async_flow/reactor.rb', line 32

def initialize
  @commands = Queue.new
  @mutex = Mutex.new
  @thread = nil
  @state = State.bg_state
  start
end

Instance Method Details

#schedule(&block) ⇒ Object

Raises:



51
52
53
54
55
# File 'lib/async_flow/reactor.rb', line 51

def schedule(&block)
  raise Error, "reactor is in #{state.state} state" unless state.running?

  commands << block
end

#startObject



57
58
59
60
61
62
63
64
65
66
# File 'lib/async_flow/reactor.rb', line 57

def start
  logger.info "starting reactor"
  mutex.synchronize do
    return if state.running?

    @thread = spawn_thread { run_reactor }
    state.running!
    logger.info "reactor started"
  end
end

#stopObject



40
41
42
43
44
45
46
47
48
49
# File 'lib/async_flow/reactor.rb', line 40

def stop
  mutex.synchronize do
    return if state.stopped?

    logger.info "stopping reactor"
    schedule { state.stop! }
    thread.join
    logger.info "reactor thread stopped"
  end
end