Class: Agent::Selector

Inherits:
Object
  • Object
show all
Defined in:
lib/agent/selector.rb

Defined Under Namespace

Classes: Case

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeSelector

Returns a new instance of Selector.



21
22
23
24
25
26
27
28
29
# File 'lib/agent/selector.rb', line 21

def initialize
  @ordered_cases = []
  @cases         = {}
  @operations    = {}
  @blocking_once = BlockingOnce.new
  @notifier      = Notifier.new
  @default_case  = nil
  @selected      = false
end

Instance Attribute Details

#casesObject (readonly)

Returns the value of attribute cases.



17
18
19
# File 'lib/agent/selector.rb', line 17

def cases
  @cases
end

Instance Method Details

#case(chan, direction, value = nil, &blk) ⇒ Object



45
46
47
48
49
# File 'lib/agent/selector.rb', line 45

def case(chan, direction, value=nil, &blk)
  raise "invalid case, must be a channel" unless chan.is_a?(Channel)
  raise Errors::InvalidDirection if direction != :send && direction != :receive
  add_case(chan, direction, value, &blk)
end

#default(&blk) ⇒ Object



31
32
33
34
35
36
37
# File 'lib/agent/selector.rb', line 31

def default(&blk)
  if @default_case
    raise Errors::DefaultCaseAlreadyDefinedError
  else
    @default_case = self.case(channel!(TrueClass, 1), :receive, &blk)
  end
end

#selectObject



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/agent/selector.rb', line 51

def select
  raise Errors::AlreadySelectedError if @selected

  if !@ordered_cases.empty?
    @ordered_cases.each do |cse|
      if cse.direction == :send
        @operations[cse.channel] << cse.channel.send(cse.value, :uuid => cse.uuid,
                                                                :blocking_once => @blocking_once,
                                                                :notifier => @notifier,
                                                                :deferred => true)
      else # :receive || :timeout
        @operations[cse.channel] << cse.channel.receive(:uuid => cse.uuid,
                                                        :blocking_once => @blocking_once,
                                                        :notifier => @notifier,
                                                        :deferred => true)
      end
    end

    if @default_case
      @default_case.channel.send(true, :uuid => @default_case.uuid, :blocking_once => @blocking_once, :notifier => @notifier, :deferred => true)
    end

    @notifier.wait

    execute_case(@notifier.payload)
  end
ensure
  @selected = true
  close_default_channel
  dequeue_operations
end

#timeout(t, &blk) ⇒ Object



39
40
41
42
43
# File 'lib/agent/selector.rb', line 39

def timeout(t, &blk)
  s = channel!(TrueClass, 1)
  go!{ sleep t; s.send(true); s.close }
  add_case(s, :timeout, &blk)
end