Class: Thread::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/thread/channel.rb

Overview

A channel lets you send and receive various messages in a thread-safe way.

It also allows for guards upon sending and retrieval, to ensure the passed messages are safe to be consumed.

Instance Method Summary collapse

Constructor Details

#initialize(messages = [], &block) ⇒ Channel

Create a channel with optional initial messages and optional channel guard.



19
20
21
22
23
24
25
26
27
# File 'lib/thread/channel.rb', line 19

def initialize (messages = [], &block)
	@messages = []
	@mutex    = Mutex.new
	@check    = block

	messages.each {|o|
		send o
	}
end

Instance Method Details

#receive(&block) ⇒ Object

Receive a message, if there are none the call blocks until there’s one.

If a block is passed, it’s used as guard to match to a message.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/thread/channel.rb', line 50

def receive (&block)
	message = nil
	found   = false

	if block
		until found
			@mutex.synchronize {
				if index = @messages.find_index(&block)
					message = @messages.delete_at(index)
					found   = true
				else
					cond.wait @mutex
				end
			}
		end
	else
		until found
			@mutex.synchronize {
				if @messages.empty?
					cond.wait @mutex
				end

				unless @messages.empty?
					message = @messages.shift
					found   = true
				end
			}
		end
	end

	message
end

#receive!(&block) ⇒ Object

Receive a message, if there are none the call returns nil.

If a block is passed, it’s used as guard to match to a message.



86
87
88
89
90
91
92
# File 'lib/thread/channel.rb', line 86

def receive! (&block)
	if block
		@messages.delete_at(@messages.find_index(&block))
	else
		@messages.shift
	end
end

#send(what) ⇒ Object

Send a message to the channel.

If there’s a guard, the value is passed to it, if the guard returns a falsy value an ArgumentError exception is raised and the message is not sent.



33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/thread/channel.rb', line 33

def send (what)
	if @check && !@check.call(what)
		raise ArgumentError, 'guard mismatch'
	end

	@mutex.synchronize {
		@messages << what

		cond.broadcast if cond?
	}

	self
end