Class: ExcADG::Broker
- Inherits:
-
Object
- Object
- ExcADG::Broker
- Includes:
- Singleton
- Defined in:
- lib/excadg/broker.rb
Overview
handle requests sending/receiving though Ractor’s interface
Defined Under Namespace
Classes: CantSendRequest, RequestProcessingFailed, UnknownRequestType
Instance Attribute Summary collapse
-
#data_store ⇒ Object
readonly
Returns the value of attribute data_store.
-
#vtracker ⇒ Object
readonly
Returns the value of attribute vtracker.
Class Method Summary collapse
-
.ask(request) ⇒ Object
is used from vertices to send requests to the broker.
Instance Method Summary collapse
-
#start(track: true) ⇒ Object
start processing vertices asks.
-
#teardown ⇒ Object
stop processing vertices asks usually follows #wait_all.
-
#wait_all(timeout: 60, period: 1) ⇒ Thread
makes a thread to wait for all known vertices to reach a final state; it expects some vertices to be started in the outer scope, so it waits even if there are no vertices at all yet.
Instance Attribute Details
#data_store ⇒ Object (readonly)
Returns the value of attribute data_store.
16 17 18 |
# File 'lib/excadg/broker.rb', line 16 def data_store @data_store end |
#vtracker ⇒ Object (readonly)
Returns the value of attribute vtracker.
16 17 18 |
# File 'lib/excadg/broker.rb', line 16 def vtracker @vtracker end |
Class Method Details
.ask(request) ⇒ Object
is used from vertices to send requests to the broker
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/excadg/broker.rb', line 24 def self.ask request raise UnknownRequestType, request unless request.is_a? Request begin Ractor.main.send request rescue StandardError => e raise CantSendRequest, cause: e end Log.info 'getting response' resp = Ractor.receive Log.debug "got response #{resp}" raise resp if resp.is_a? StandardError Log.debug 'returning response' resp end |
Instance Method Details
#start(track: true) ⇒ Object
start processing vertices asks
45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/excadg/broker.rb', line 45 def start track: true @vtracker = VTracker.new if track Thread.report_on_exception = false @messenger = Thread.new { loop { process_request } } at_exit { Log.info 'shutting down messenger' @messenger.kill Log.info 'messenger is stut down' } Log.info 'broker is started' @messenger end |
#teardown ⇒ Object
stop processing vertices asks usually follows #wait_all
62 63 64 65 66 |
# File 'lib/excadg/broker.rb', line 62 def teardown return if @messenger.nil? @messenger.kill while @messenger.alive? end |
#wait_all(timeout: 60, period: 1) ⇒ Thread
makes a thread to wait for all known vertices to reach a final state; it expects some vertices to be started in the outer scope, so it waits even if there are no vertices at all yet
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/excadg/broker.rb', line 74 def wait_all timeout: 60, period: 1 Thread.report_on_exception = false Thread.new { Log.info "timeout is #{timeout || '∞'} seconds" Timeout.timeout(timeout) { loop { sleep period if @data_store.empty? Log.info 'no vertices in data store, keep waiting' next end states = @data_store.to_a.group_by(&:state).keys Log.info "vertices in #{states} states exist" # that's the only final states for vertices break if (states - %i[done failed]).empty? } } } end |