Class: Celluloid::Mailbox

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/celluloid/mailbox.rb,
lib/celluloid/mailbox/evented.rb

Overview

Actors communicate with asynchronous messages. Messages are buffered in Mailboxes until Actors can act upon them.

Direct Known Subclasses

Evented

Defined Under Namespace

Classes: Evented

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Mailbox


16
17
18
19
20
21
22
23
# File 'lib/celluloid/mailbox.rb', line 16

# Build an empty, live mailbox.
#
# The mailbox starts with no buffered messages, is not dead, and has no
# maximum size configured (+@max_size+ is nil).
def initialize
  # Identity: address obtained from Celluloid.uuid.
  @address   = Celluloid.uuid

  # Synchronization primitives guarding and signalling the message buffer.
  @mutex     = Mutex.new
  @condition = ConditionVariable.new

  # Buffer state.
  @messages  = []
  @dead      = false
  @max_size  = nil
end

Instance Attribute Details

#address ⇒ Object (readonly)

A unique address at which this mailbox can be found


13
14
15
# File 'lib/celluloid/mailbox.rb', line 13

# Reader for the mailbox's address.
#
# @return [Object] the value stored in +@address+ (assigned from
#   Celluloid.uuid when the mailbox is constructed)
def address
  @address
end

#max_size ⇒ Object

Returns the value of attribute max_size


14
15
16
# File 'lib/celluloid/mailbox.rb', line 14

# Reader for the +max_size+ attribute.
#
# @return [Object, nil] the value stored in +@max_size+; nil after
#   construction. NOTE(review): presumably the capacity consulted by
#   +mailbox_full+ -- confirm against its definition.
def max_size
  @max_size
end

Instance Method Details

#<<(message) ⇒ Object

Add a message to the Mailbox


26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/celluloid/mailbox.rb', line 26

# Add a message to the Mailbox.
#
# While holding the mailbox mutex the message is either handed to
# +dead_letter+ (when +mailbox_full+ is truthy or the mailbox is dead) or
# buffered, after which the condition variable is signalled.
#
# @param message [Object] the message to deliver
# @return [nil]
def <<(message)
  @mutex.lock
  begin
    if mailbox_full || @dead
      # Undeliverable: route to the dead-letter handler, nothing is queued.
      dead_letter(message)
    else
      if message.is_a?(SystemEvent)
        # SystemEvents are high priority, so they jump to the front of the
        # queue rather than waiting at the back.
        @messages.unshift(message)
      else
        @messages.push(message)
      end

      # Signal the condition variable now that a message is buffered.
      @condition.signal
    end

    nil
  ensure
    # Best-effort unlock: never let an unlock failure mask the real outcome.
    @mutex.unlock rescue nil
  end
end

#alive? ⇒ Boolean

Is the mailbox alive?


106
107
108
# File 'lib/celluloid/mailbox.rb', line 106

# Is the mailbox alive?
#
# Reads +@dead+ directly (without taking the mutex) and negates it.
#
# @return [Boolean] true unless +@dead+ is truthy
def alive?
  !@dead
end

#check(timeout = nil, &block) ⇒ Object

Receive a message from the Mailbox. May return nil and may return before the specified timeout.


50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/celluloid/mailbox.rb', line 50

# Receive a message from the Mailbox. May return nil and may return before
# the specified timeout.
#
# @param timeout [Numeric, nil] handed to Timers::Wait.for to bound the wait
# @yield forwarded to +next_message+ (presumably a filter selecting which
#   message to take -- confirm against its definition)
# @return [Object, nil] the message obtained from +next_message+, or nil
# @raise [MailboxDead] if the mailbox is already dead on entry
def check(timeout = nil, &block)
  found = nil

  @mutex.lock
  begin
    # NOTE(review): @dead is only tested here, before any waiting starts.
    fail MailboxDead, "attempted to receive from a dead mailbox" if @dead

    Timers::Wait.for(timeout) do |remaining|
      # Stop as soon as next_message hands something back.
      break if (found = next_message(&block))

      # Nothing yet: wait on the condition variable (which releases the
      # mutex while waiting) for at most the time that is left.
      @condition.wait(@mutex, remaining)
    end
  ensure
    # Best-effort unlock so an unlock failure never masks the real outcome.
    @mutex.unlock rescue nil
  end

  found
end

#each(&block) ⇒ Object

Iterate through the mailbox


116
117
118
# File 'lib/celluloid/mailbox.rb', line 116

# Iterate through the mailbox.
#
# Iteration runs over the array returned by +to_a+, not over the live
# buffer.
#
# @yield [message] each message in the snapshot
# @return [Object] whatever +Array#each+ returns for the snapshot
def each(&block)
  snapshot = to_a
  snapshot.each(&block)
end

#inspect ⇒ Object

Inspect the contents of the Mailbox


121
122
123
# File 'lib/celluloid/mailbox.rb', line 121

# Inspect the contents of the Mailbox.
#
# @return [String] the class name, the hexadecimal object id and the
#   +inspect+ output of every message, comma separated
def inspect
  hex_id   = object_id.to_s(16)
  contents = map(&:inspect).join(', ')
  "#<#{self.class}:#{hex_id} @messages=[#{contents}]>"
end

#receive(timeout = nil, &block) ⇒ Object

Receive a letter from the mailbox. Guaranteed to return a message. If timeout is exceeded, raise a TaskTimeout.


74
75
76
77
78
79
80
81
82
# File 'lib/celluloid/mailbox.rb', line 74

# Receive a letter from the mailbox. Guaranteed to return a message; if the
# timeout is exceeded a TaskTimeout is raised instead.
#
# @param timeout [Numeric, nil] overall time budget handed to
#   Timers::Wait.for
# @yield forwarded to #check
# @return [Object] the first truthy value returned by #check
# @raise [TaskTimeout] if the wait loop ends without a message
def receive(timeout = nil, &block)
  message = nil
  Timers::Wait.for(timeout) do |remaining|
    # #check may return early without a message, so each retry must wait
    # only for what is left of the budget. Passing the full timeout again
    # would let the total wait overshoot the requested timeout.
    message = check(remaining, &block)
    break if message
  end
  return message if message
  fail TaskTimeout, "receive timeout exceeded"
end

#shutdown ⇒ Object

Shut down this mailbox and clean up its contents


85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/celluloid/mailbox.rb', line 85

# Shut down this mailbox and clean up its contents.
#
# Under the mailbox mutex: runs the optional block, detaches the buffered
# messages and marks the mailbox dead. After the mutex is released every
# detached message is passed to +dead_letter+ and, when it responds to
# +cleanup+, cleaned up.
#
# @yield optional hook executed while the mutex is held, before the mailbox
#   is marked dead
# @return [true]
# @raise [MailboxDead] if the mailbox has already been shut down
def shutdown
  @mutex.lock
  begin
    # Test @dead while holding the mutex (as #check does) so that two
    # concurrent shutdowns cannot both pass the guard and run the block twice.
    fail MailboxDead, "mailbox already shutdown" if @dead

    yield if block_given?
    messages = @messages
    @messages = []
    @dead = true
  ensure
    # Best-effort unlock so an unlock failure never masks the real outcome.
    @mutex.unlock rescue nil
  end

  # Dead-lettering happens outside the lock, on the detached array.
  messages.each do |msg|
    dead_letter msg
    msg.cleanup if msg.respond_to? :cleanup
  end
  true
end

#size ⇒ Object

Number of messages in the Mailbox


126
127
128
# File 'lib/celluloid/mailbox.rb', line 126

# Number of messages in the Mailbox.
#
# The count is read while holding the mailbox mutex.
#
# @return [Integer] the size of the +@messages+ buffer
def size
  @mutex.synchronize { @messages.size }
end

#to_a ⇒ Object

Cast to an array


111
112
113
# File 'lib/celluloid/mailbox.rb', line 111

# Cast to an array.
#
# Returns a +dup+ of the message buffer taken while holding the mailbox
# mutex, so callers get their own array rather than the live buffer (the
# message objects themselves are not copied).
#
# @return [Array] a shallow copy of +@messages+
def to_a
  @mutex.synchronize { @messages.dup }
end