Class: Actor::Mailbox

Inherits:
Object
  • Object
show all
Defined in:
lib/revactor/mailbox.rb

Overview

Actor mailbox. For purposes of efficiency the mailbox also handles suspending and resuming an actor when no messages match its filter set.

Defined Under Namespace

Classes: Filter, Timer

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeMailbox

Returns a new instance of Mailbox.



15
16
17
18
# File 'lib/revactor/mailbox.rb', line 15

def initialize
  @timer = nil
  @queue = []
end

Instance Attribute Details

#timed_outObject

Returns the value of attribute timed_out.



12
13
14
# File 'lib/revactor/mailbox.rb', line 12

def timed_out
  @timed_out
end

#timeout_actionObject

Returns the value of attribute timeout_action.



13
14
15
# File 'lib/revactor/mailbox.rb', line 13

def timeout_action
  @timeout_action
end

#timerObject

Returns the value of attribute timer.



11
12
13
# File 'lib/revactor/mailbox.rb', line 11

def timer
  @timer
end

Instance Method Details

#<<(message) ⇒ Object

Add a message to the mailbox queue



21
22
23
# File 'lib/revactor/mailbox.rb', line 21

def <<(message)
  @queue << message
end

#clearObject

Clear the mailbox



88
89
90
# File 'lib/revactor/mailbox.rb', line 88

def clear
  @queue.clear
end

#empty?Boolean

Is the mailbox empty?

Returns:

  • (Boolean)


83
84
85
# File 'lib/revactor/mailbox.rb', line 83

def empty?
  @queue.empty?
end

#receive {|filter| ... } ⇒ Object

Attempt to receive a message

Yields:

  • (filter)

Raises:

  • (ArgumentError)


26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/revactor/mailbox.rb', line 26

def receive
  raise ArgumentError, "no filter block given" unless block_given?

  # Clear mailbox processing variables
  action = matched_index = nil
  at = 0

  # Clear timeout variables
  @timed_out = false
  @timeout_action = nil

  # Build the filter
  filter = Filter.new(self)
  yield filter
  raise ArgumentError, "empty filter" if filter.empty?

  # Process incoming messages
  while action.nil?
    at.upto(@queue.size - 1) do |i|
      next unless action = filter.match(@queue[i])
      matched_index = i
      break
    end
    
    at = @queue.size

    # Ignore timeouts if we've matched a message
    if action
      @timed_out = false
      break
    end
    
    # Otherwise run the timeout action
    action = @timeout_action if @timed_out

    # If no matching action is found, reschedule until we get another message
    Actor.reschedule unless action
  end

  @timeout_action = nil
  
  if @timer
    @timer.detach if @timer.attached?
    @timer = nil
  end

  # If we encountered a timeout, call the action directly
  if @timed_out
    @timed_out = false
    return action.call
  end

  # Otherwise we matched a message, so process it with the action      
  action.call @queue.delete_at(matched_index)
end