Class: ExcADG::Broker

Inherits:
Object
Includes:
Singleton
Defined in:
lib/excadg/broker.rb

Overview

Handles sending and receiving of requests through Ractor’s interface.

Defined Under Namespace

Classes: CantSendRequest, RequestProcessingFailed, UnknownRequestType

Instance Attribute Details

#data_store ⇒ Object (readonly)

Returns the value of attribute data_store.



# File 'lib/excadg/broker.rb', line 16

def data_store
  @data_store
end

#vtracker ⇒ Object (readonly)

Returns the value of attribute vtracker.



# File 'lib/excadg/broker.rb', line 16

def vtracker
  @vtracker
end

Class Method Details

.ask(request) ⇒ Object

Used by vertices to send a request to the broker; blocks until the main ractor responds.

Parameters:

  • request (Request)

    the request to send to the main ractor

Returns:

  • data received in response from the main ractor

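Raises:

  • (UnknownRequestType)

    if request is not a Request

  • (CantSendRequest)

    if the request could not be sent to the main ractor

  • (StandardError)

    any error received as the response is re-raised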



# File 'lib/excadg/broker.rb', line 24

def self.ask request
  raise UnknownRequestType, request unless request.is_a? Request

  begin
    Ractor.main.send request
  rescue StandardError => e
    raise CantSendRequest, cause: e
  end

  Log.info 'getting response'
  resp = Ractor.receive
  Log.debug "got response #{resp}"
  raise resp if resp.is_a? StandardError

  Log.debug 'returning response'
  resp
end
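
The request/response round trip that .ask relies on can be sketched with plain Ractors. This is only an illustration, not ExcADG's implementation: the hash-shaped request and its :from and :payload keys are hypothetical stand-ins for the gem's Request objects, and the main ractor replies by hand here instead of through the broker.

```ruby
# A vertex-like ractor asks the main ractor for data and waits for the answer.
worker = Ractor.new do
  # hypothetical request: a reply handle plus a payload
  Ractor.main.send({ from: Ractor.current, payload: 21 })
  answer = Ractor.receive            # blocks until the main ractor replies
  Ractor.main.send [:finished, answer]
end

# Main ractor side: take the request from the inbox and reply to its sender.
request = Ractor.receive
request[:from].send request[:payload] * 2

status, result = Ractor.receive      # final report from the worker
puts "#{status}: #{result}"
```

Because the asking side blocks in Ractor.receive, something on the main ractor has to be reading the inbox and replying, which is the role of the thread started by #start.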

Instance Method Details

#start(track: true) ⇒ Object

Starts processing requests (asks) from vertices.

Parameters:

  • track (defaults to: true)

    whether to track vertices using VTracker

Returns:

  • the message-processing Thread



# File 'lib/excadg/broker.rb', line 45

def start track: true
  @vtracker = VTracker.new if track
  Thread.report_on_exception = false
  @messenger = Thread.new { loop { process_request } }

  at_exit {
    Log.info 'shutting down messenger'
    @messenger.kill
    Log.info 'messenger is shut down'
  }

  Log.info 'broker is started'
  @messenger
end

#teardown ⇒ Object

Stops processing requests from vertices; usually called after #wait_all.



# File 'lib/excadg/broker.rb', line 62

def teardown
  return if @messenger.nil?

  @messenger.kill while @messenger.alive?
end
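
The start/teardown life cycle amounts to a background loop thread that is later killed. The sketch below shows that pattern in isolation; the Queue named inbox is a hypothetical stand-in for the main ractor's inbox, and the names handled and messenger are chosen for illustration only.

```ruby
inbox = Queue.new     # hypothetical stand-in for the main ractor's inbox
handled = []

# like #start: a background thread that processes messages forever
messenger = Thread.new { loop { handled << inbox.pop } }

inbox << :first
inbox << :second
sleep 0.01 until handled.size == 2   # let the loop drain the queue

# like #teardown: kill the thread, then wait until it has really stopped
messenger.kill
messenger.join
puts handled.inspect
puts messenger.alive?
```

The sketch waits with join after kill, whereas #teardown repeats kill while the thread is alive; both aim to leave no live messenger behind.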

#wait_all(timeout: 60, period: 1) ⇒ Thread

Spawns a thread that waits for all known vertices to reach a final state (done or failed). It expects vertices to be started in the outer scope, so it keeps waiting even if no vertices exist yet.

Parameters:

  • timeout (defaults to: 60)

    total waiting timeout in seconds; nil means wait forever

  • period (defaults to: 1)

    time in seconds between vertex state checks

Returns:

  • (Thread)

    the thread that waits for all vertices; typical usage is `Broker.instance.wait_all.join`



# File 'lib/excadg/broker.rb', line 74

def wait_all timeout: 60, period: 1
  Thread.report_on_exception = false
  Thread.new {
    Log.info "timeout is #{timeout || ''} seconds"
    Timeout.timeout(timeout) {
      loop {
        sleep period
        if @data_store.empty?
          Log.info 'no vertices in data store, keep waiting'
          next
        end
        states = @data_store.to_a.group_by(&:state).keys
        Log.info "vertices in #{states} states exist"
        # done and failed are the only final states for vertices
        break if (states - %i[done failed]).empty?
      }
    }
  }
end
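
The polling-with-deadline pattern used by #wait_all can be tried outside the broker. The following is a sketch under stated assumptions: the vertex struct, the store array and the make_waiter lambda are hypothetical stand-ins for the broker's data store and method. It also shows what a caller of join should expect when the deadline passes, since Thread#join re-raises the exception that ended the thread.

```ruby
require 'timeout'

# Hypothetical stand-in for the records kept in the broker's data store.
vertex = Struct.new(:state)
store = [vertex.new(:running), vertex.new(:done)]

# Builds a waiter thread in the spirit of #wait_all.
make_waiter = lambda do |timeout, period|
  Thread.new do
    Thread.current.report_on_exception = false
    Timeout.timeout(timeout) do
      loop do
        sleep period
        next if store.empty?
        states = store.map(&:state).uniq
        break if (states - %i[done failed]).empty?
      end
    end
  end
end

# Case 1: the deadline passes while a vertex is still running.
timed_out = begin
  make_waiter.call(0.2, 0.05).join
  false
rescue Timeout::Error
  true
end

# Case 2: every vertex reaches a final state before the deadline.
store[0].state = :failed
finished = make_waiter.call(5, 0.05).join.status == false
puts "timed out: #{timed_out}, finished: #{finished}"
```

A caller that uses a finite timeout may therefore want to rescue Timeout::Error around the join.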