Class: Backports::Ractor
- Inherits:
-
Object
show all
- Defined in:
- lib/backports/ractor/cloner.rb,
lib/backports/ractor/errors.rb,
lib/backports/ractor/queues.rb,
lib/backports/ractor/ractor.rb,
lib/backports/ractor/sharing.rb
Defined Under Namespace
Classes: ClosedError, Error, RemoteError
Class Attribute Summary collapse
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(*args, &block) ⇒ Ractor
Implementation notes
Uses one ‘Thread` for each `Ractor`, as well as queues for communication
The incoming queue is strict: contrary to standard queue, you can’t pop from an empty closed queue. Since standard queues return ‘nil` is those conditions, we wrap/unwrap `nil` values and consider all `nil` values to be results of closed queues. `ClosedQueueError` are re-raised as `Ractor::ClosedError`
The outgoing queue is strict and blocking. Same wrapping / raising as incoming, with an extra queue to acknowledge when a value has been read (or if the port is closed while waiting).
The last result is a bit tricky as it needs to be pushed on the outgoing queue but can not be blocking. For this, we “soft close” the outgoing port.
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
# File 'lib/backports/ractor/ractor.rb', line 30
def initialize(*args, &block)
@ractor_incoming_queue = IncomingQueue.new
@ractor_outgoing_queue = OutgoingQueue.new
raise ::ArgumentError, 'must be called with a block' unless block
kw = args.last
if kw.is_a?(::Hash) && kw.size == 1 && kw.key?(:name)
args.pop
name = kw[:name]
end
@ractor_name = name && Backports.coerce_to_str(name)
@id = Ractor.ractor_next_id
if Ractor.main == nil @ractor_thread = ::Thread.current
@ractor_origin = nil
@ractor_thread.thread_variable_set(:backports_ractor, self)
else
@ractor_origin = caller(1, 1).first.split(':in `').first
args.map! { |a| Ractor.ractor_isolate(a, false) }
ractor_thread_start(args, block)
end
end
|
Class Attribute Details
Returns the value of attribute main.
248
249
250
|
# File 'lib/backports/ractor/ractor.rb', line 248
def main
@main
end
|
Instance Attribute Details
#ractor_incoming_queue ⇒ Object
This method is part of a private API.
You should avoid using this method if possible, as it may be removed or be changed in the future.
268
269
270
|
# File 'lib/backports/ractor/ractor.rb', line 268
def ractor_incoming_queue
@ractor_incoming_queue
end
|
#ractor_outgoing_queue ⇒ Object
This method is part of a private API.
You should avoid using this method if possible, as it may be removed or be changed in the future.
268
269
270
|
# File 'lib/backports/ractor/ractor.rb', line 268
def ractor_outgoing_queue
@ractor_outgoing_queue
end
|
#ractor_thread ⇒ Object
This method is part of a private API.
You should avoid using this method if possible, as it may be removed or be changed in the future.
268
269
270
|
# File 'lib/backports/ractor/ractor.rb', line 268
def ractor_thread
@ractor_thread
end
|
Class Method Details
226
227
228
|
# File 'lib/backports/ractor/ractor.rb', line 226
def count
::ObjectSpace.each_object(Ractor).count(&:ractor_live?)
end
|
221
222
223
224
|
# File 'lib/backports/ractor/ractor.rb', line 221
def current
::Thread.current.thread_variable_get(:backports_ractor) ||
::Thread.current.thread_variable_set(:backports_ractor, ractor_find_current)
end
|
.make_shareable(obj) ⇒ Object
211
212
213
214
215
|
# File 'lib/backports/ractor/ractor.rb', line 211
def make_shareable(obj)
return obj if ractor_check_shareability?(obj, true)
raise Ractor::Error, '#freeze does not freeze object correctly'
end
|
.ractor_isolate(val, move = false) ⇒ Object
This method is part of a private API.
You should avoid using this method if possible, as it may be removed or be changed in the future.
7
8
9
10
11
|
# File 'lib/backports/ractor/sharing.rb', line 7
def ractor_isolate(val, move = false)
return val if move
Cloner.deep_clone(val)
end
|
.ractor_mark_set_shareable(visited) ⇒ Object
49
50
51
52
53
|
# File 'lib/backports/ractor/sharing.rb', line 49
def ractor_mark_set_shareable(visited)
visited.each do |key|
@ractor_shareable[key] = Ractor
end
end
|
.ractor_next_id ⇒ Object
This method is part of a private API.
You should avoid using this method if possible, as it may be removed or be changed in the future.
243
244
245
246
|
# File 'lib/backports/ractor/ractor.rb', line 243
def ractor_next_id
@id ||= 0
@id += 1
end
|
.ractor_reset ⇒ Object
This method is part of a private API.
You should avoid using this method if possible, as it may be removed or be changed in the future.
231
232
233
234
235
236
237
238
239
240
|
# File 'lib/backports/ractor/ractor.rb', line 231
def ractor_reset
::ObjectSpace.each_object(Ractor).each do |r|
next if r == Ractor.current
next unless (th = r.ractor_thread)
th.kill
th.join
end
Ractor.current.ractor_incoming_queue.clear
end
|
.ractor_shareable_self?(obj, freeze_all) ⇒ Boolean
yield if shareability can’t be determined without looking at its parts
26
27
28
29
30
31
32
33
34
|
# File 'lib/backports/ractor/sharing.rb', line 26
def ractor_shareable_self?(obj, freeze_all)
return true if @ractor_shareable.key?(obj)
return true if ractor_shareable_by_nature?(obj, freeze_all)
if obj.frozen? || (freeze_all && obj.freeze)
yield
else
false
end
end
|
.receive ⇒ Object
Also known as:
recv
171
172
173
|
# File 'lib/backports/ractor/ractor.rb', line 171
def receive
current.__send__(:receive)
end
|
.receive_if(&block) ⇒ Object
176
177
178
|
# File 'lib/backports/ractor/ractor.rb', line 176
def receive_if(&block)
current.__send__(:receive_if, &block)
end
|
.select(*ractors, yield_value: not_given = true, move: false) ⇒ Object
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
|
# File 'lib/backports/ractor/ractor.rb', line 180
def select(*ractors, yield_value: not_given = true, move: false)
cur = Ractor.current
queues = ractors.map do |r|
r == cur ? r.ractor_incoming_queue : r.ractor_outgoing_queue
end
if !not_given
out = current.ractor_outgoing_queue
yield_value = ractor_isolate(yield_value, move)
elsif ractors.empty?
raise ::ArgumentError, 'specify at least one ractor or `yield_value`'
end
while true queues.each_with_index do |q, i|
q.pop_non_blocking do |val|
r = ractors[i]
return [r == cur ? :receive : r, val]
end
end
if out && out.num_waiting > 0
out.push(yield_value, ack: true)
return [:yield, nil]
end
sleep(0.001)
end
end
|
.shareable?(obj) ⇒ Boolean
217
218
219
|
# File 'lib/backports/ractor/ractor.rb', line 217
def shareable?(obj)
ractor_check_shareability?(obj, false)
end
|
.yield(value, move: false) ⇒ Object
164
165
166
167
168
169
|
# File 'lib/backports/ractor/ractor.rb', line 164
def yield(value, move: false)
value = ractor_isolate(value, move)
current.ractor_outgoing_queue.push(value, ack: true)
rescue ::ClosedQueueError
raise ClosedError, 'The outgoing-port is already closed'
end
|
Instance Method Details
150
151
152
|
# File 'lib/backports/ractor/ractor.rb', line 150
def [](key)
Ractor.current.ractor_locals[key]
end
|
#[]=(key, value) ⇒ Object
154
155
156
|
# File 'lib/backports/ractor/ractor.rb', line 154
def []=(key, value)
Ractor.current.ractor_locals[key] = value
end
|
#close_incoming ⇒ Object
129
130
131
132
133
|
# File 'lib/backports/ractor/ractor.rb', line 129
def close_incoming
r = ractor_incoming_queue.closed?
ractor_incoming_queue.close
r
end
|
#close_outgoing ⇒ Object
135
136
137
138
139
|
# File 'lib/backports/ractor/ractor.rb', line 135
def close_outgoing
r = ractor_outgoing_queue.closed?
ractor_outgoing_queue.close
r
end
|
117
118
119
120
121
122
123
124
125
126
127
|
# File 'lib/backports/ractor/ractor.rb', line 117
def inspect
state = RACTOR_STATE[@ractor_thread ? @ractor_thread.status : 'run']
info = [
"Ractor:##{@id}",
name,
@ractor_origin,
state,
].compact.join(' ')
"#<#{info}>"
end
|
104
105
106
|
# File 'lib/backports/ractor/ractor.rb', line 104
def name
@ractor_name
end
|
#ractor_live? ⇒ Boolean
This method is part of a private API.
You should avoid using this method if possible, as it may be removed or be changed in the future.
262
263
264
265
|
# File 'lib/backports/ractor/ractor.rb', line 262
def ractor_live?
!defined?(@ractor_thread) || @ractor_thread.status
end
|
#ractor_locals ⇒ Object
This method is part of a private API.
You should avoid using this method if possible, as it may be removed or be changed in the future.
159
160
161
|
# File 'lib/backports/ractor/ractor.rb', line 159
def ractor_locals
@ractor_locals ||= {}.compare_by_identity
end
|
#send(obj, move: false) ⇒ Object
Also known as:
<<
92
93
94
95
96
97
|
# File 'lib/backports/ractor/ractor.rb', line 92
def send(obj, move: false)
ractor_incoming_queue << Ractor.ractor_isolate(obj, move)
self
rescue ::ClosedQueueError
raise ClosedError, 'The incoming-port is already closed'
end
|
100
101
102
|
# File 'lib/backports/ractor/ractor.rb', line 100
def take
ractor_outgoing_queue.pop(ack: true)
end
|