Class: Concurrent::MVar

Inherits:
Object
  • Object
Includes:
Dereferenceable
Defined in:
lib/concurrent/mvar.rb

Overview

An `MVar` is a single-element container that blocks on `get` if it is empty, and blocks on `put` if it is full. It is safe to use an `MVar` from multiple threads. `MVar` can be seen as a single-element blocking queue, or a rendezvous variable.

An `MVar` is typically used to transfer objects between threads, where the sending thread will block if the previous message hasn’t been taken yet by the receiving thread. It can also be used to control access to some global shared state, where threads `take` the value, perform some operation, and then `put` it back.
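
The hand-off described above can be sketched with nothing but Ruby's core `Mutex` and `ConditionVariable`. The class `SlotSketch` below is a hypothetical stand-in written for illustration; it is not the `Concurrent::MVar` implementation and it omits timeouts and dereference options.

```ruby
# Hypothetical single-slot container for illustration only;
# this is NOT the concurrent-ruby implementation.
class SlotSketch
  EMPTY = Object.new

  def initialize
    @value = EMPTY
    @mutex = Mutex.new
    @changed = ConditionVariable.new
  end

  # Blocks while the slot is full, then stores the value.
  def put(value)
    @mutex.synchronize do
      @changed.wait(@mutex) until @value.equal?(EMPTY)
      @value = value
      @changed.broadcast
    end
    value
  end

  # Blocks while the slot is empty, then removes and returns the value.
  def take
    @mutex.synchronize do
      @changed.wait(@mutex) while @value.equal?(EMPTY)
      value = @value
      @value = EMPTY
      @changed.broadcast
      value
    end
  end
end

slot = SlotSketch.new
received = []

# The producer cannot run ahead: each put waits until the previous
# message has been taken by the consumer.
consumer = Thread.new { 3.times { received << slot.take } }
producer = Thread.new { [1, 2, 3].each { |n| slot.put(n) } }

[producer, consumer].each(&:join)
```

With one producer and one consumer, the messages arrive in the order they were sent, because the slot never holds more than one of them at a time.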

Constant Summary

EMPTY = Object.new

Unique value that represents that an `MVar` was empty.

TIMEOUT = Object.new

Unique value that represents that an `MVar` timed out before it was able to produce a value.

Instance Method Summary

Methods included from Dereferenceable

#value

Constructor Details

#initialize(value = EMPTY, opts = {}) ⇒ MVar

Create a new `MVar`, either empty or with an initial value.

Options Hash (opts):

  • :operation (Boolean) — default: false

    when `true` will execute the future on the global operation pool (for long-running operations), when `false` will execute the future on the global task pool (for short-running tasks)

  • :executor (object)

    when provided will run all operations on this executor rather than the global thread pool (overrides :operation)

  • :dup_on_deref (Boolean) — default: false

    call `#dup` before returning the data

  • :freeze_on_deref (Boolean) — default: false

    call `#freeze` before returning the data

  • :copy_on_deref (Proc) — default: nil

    call the given `Proc` passing the internal value and returning the value returned from the proc



# File 'lib/concurrent/mvar.rb', line 40

def initialize(value = EMPTY, opts = {})
  @value = value
  @mutex = Mutex.new
  @empty_condition = Condition.new
  @full_condition = Condition.new
  set_deref_options(opts)
end

Instance Method Details

#empty? ⇒ Boolean

Returns whether the `MVar` is currently empty.



# File 'lib/concurrent/mvar.rb', line 164

def empty?
  @mutex.synchronize { @value == EMPTY }
end

#full? ⇒ Boolean

Returns whether the `MVar` currently contains a value.



# File 'lib/concurrent/mvar.rb', line 169

def full?
  not empty?
end

#modify(timeout = nil) ⇒ Object

Atomically `take`, yield the value to a block for transformation, and then `put` the transformed value. Returns the value held before the transformation, with any dereference options applied. A timeout can be set to limit the time spent blocked, in which case it returns `TIMEOUT` if the time is exceeded.

Raises:

  • (ArgumentError) if no block is given


# File 'lib/concurrent/mvar.rb', line 92

def modify(timeout = nil)
  raise ArgumentError.new('no block given') unless block_given?

  @mutex.synchronize do
    wait_for_full(timeout)

    # If we timed out we'll still be empty
    if unlocked_full?
      value = @value
      @value = yield value
      @full_condition.signal
      apply_deref_options(value)
    else
      TIMEOUT
    end
  end
end
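
To illustrate the "guard shared state" pattern and the return value visible in the listing above, here is a minimal sketch using only core Ruby. `ModifySketch` is a hypothetical stand-in, not the library class: it assumes the container is always full, so it has no waiting, no timeout, and no dereference options.

```ruby
# Hypothetical stand-in for illustration only; assumes the slot is
# always full, so it never blocks waiting for a value.
class ModifySketch
  def initialize(value)
    @value = value
    @mutex = Mutex.new
  end

  # Yields the current value, stores the block's result, and returns
  # the value that was held before the block ran.
  def modify
    raise ArgumentError, 'no block given' unless block_given?

    @mutex.synchronize do
      old = @value
      @value = yield old
      old
    end
  end

  # Reads the current value under the same lock.
  def value
    @mutex.synchronize { @value }
  end
end

counter = ModifySketch.new(0)

# Four threads each apply 100 increments; the lock makes every
# read-transform-write step atomic, so no update is lost.
threads = 4.times.map do
  Thread.new { 100.times { counter.modify { |n| n + 1 } } }
end
threads.each(&:join)

previous = counter.modify { |n| n * 2 }
# previous is the pre-transformation value (400); counter.value is now 800
```

Because the block runs while the lock is held, it should stay short and must not call back into the same container.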

#modify! ⇒ Object

Non-blocking version of `modify` that will yield with `EMPTY` if there is no value yet.

Raises:

  • (ArgumentError) if no block is given


# File 'lib/concurrent/mvar.rb', line 148

def modify!
  raise ArgumentError.new('no block given') unless block_given?

  @mutex.synchronize do
    value = @value
    @value = yield value
    if unlocked_empty?
      @empty_condition.signal
    else
      @full_condition.signal
    end
    apply_deref_options(value)
  end
end

#put(value, timeout = nil) ⇒ Object

Put a value into an `MVar`, blocking if there is already a value until it is empty. A timeout can be set to limit the time spent blocked, in which case it returns `TIMEOUT` if the time is exceeded.



# File 'lib/concurrent/mvar.rb', line 72

def put(value, timeout = nil)
  @mutex.synchronize do
    wait_for_empty(timeout)

    # If we timed out we won't be empty
    if unlocked_empty?
      @value = value
      @full_condition.signal
      apply_deref_options(value)
    else
      TIMEOUT
    end
  end
end

#set!(value) ⇒ Object

Non-blocking version of `put` that will overwrite an existing value. Returns the previous contents.



# File 'lib/concurrent/mvar.rb', line 138

def set!(value)
  @mutex.synchronize do
    old_value = @value
    @value = value
    @full_condition.signal
    apply_deref_options(old_value)
  end
end

#take(timeout = nil) ⇒ Object

Remove the value from an `MVar`, leaving it empty, and blocking if there isn’t a value. A timeout can be set to limit the time spent blocked, in which case it returns `TIMEOUT` if the time is exceeded.



# File 'lib/concurrent/mvar.rb', line 52

def take(timeout = nil)
  @mutex.synchronize do
    wait_for_full(timeout)

    # If we timed out we'll still be empty
    if unlocked_full?
      value = @value
      @value = EMPTY
      @empty_condition.signal
      apply_deref_options(value)
    else
      TIMEOUT
    end
  end
end
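
The timeout behaviour can be sketched with `ConditionVariable#wait`, which accepts an optional timeout in seconds. The listing above relies on helpers (`wait_for_full`, `unlocked_full?`) whose bodies are not shown here, so the class below is a hypothetical reconstruction of the idea rather than the library's code. Its `put` is deliberately simplified and never blocks.

```ruby
# Hypothetical stand-in for illustration only; the deadline handling
# is an assumption, not the concurrent-ruby implementation.
class TimedSlotSketch
  EMPTY = Object.new
  TIMEOUT = Object.new

  def initialize
    @value = EMPTY
    @mutex = Mutex.new
    @full = ConditionVariable.new
  end

  # Simplified: stores the value without waiting for the slot to empty.
  def put(value)
    @mutex.synchronize do
      @value = value
      @full.signal
    end
    value
  end

  # Waits for a value; returns TIMEOUT if none arrives in time.
  # A nil timeout waits indefinitely.
  def take(timeout = nil)
    @mutex.synchronize do
      deadline = timeout && (now + timeout)
      while @value.equal?(EMPTY)
        remaining = deadline && (deadline - now)
        return TIMEOUT if remaining && remaining <= 0
        # Re-check the condition after every wake-up, since a wait
        # can end before a value is actually available.
        @full.wait(@mutex, remaining)
      end
      value = @value
      @value = EMPTY
      value
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

slot = TimedSlotSketch.new

result = slot.take(0.05)
timed_out = result.equal?(TimedSlotSketch::TIMEOUT)

slot.put(:hello)
got = slot.take(0.05)
```

Since `TIMEOUT` is a unique sentinel object, callers should compare against it by identity (`equal?`) before treating the result as a real value.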

#try_put!(value) ⇒ Object

Non-blocking version of `put` that returns whether or not it was successful.



# File 'lib/concurrent/mvar.rb', line 125

def try_put!(value)
  @mutex.synchronize do
    if unlocked_empty?
      @value = value
      @full_condition.signal
      true
    else
      false
    end
  end
end

#try_take! ⇒ Object

Non-blocking version of `take` that returns `EMPTY` instead of blocking.



# File 'lib/concurrent/mvar.rb', line 111

def try_take!
  @mutex.synchronize do
    if unlocked_full?
      value = @value
      @value = EMPTY
      @empty_condition.signal
      apply_deref_options(value)
    else
      EMPTY
    end
  end
end
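
As a rough illustration of how the two non-blocking calls interact, the sketch below mirrors their documented return values using core Ruby only. `TrySlotSketch` is a hypothetical stand-in, not the library class, and it leaves out condition signalling and dereference options.

```ruby
# Hypothetical stand-in for illustration only.
class TrySlotSketch
  EMPTY = Object.new

  def initialize
    @value = EMPTY
    @mutex = Mutex.new
  end

  # Stores the value only if the slot is empty; reports success.
  def try_put!(value)
    @mutex.synchronize do
      if @value.equal?(EMPTY)
        @value = value
        true
      else
        false
      end
    end
  end

  # Returns the stored value and empties the slot, or returns EMPTY
  # right away when there is nothing to take.
  def try_take!
    @mutex.synchronize do
      value = @value
      @value = EMPTY
      value
    end
  end
end

slot = TrySlotSketch.new

first  = slot.try_put!(:a)   # true: the slot was empty
second = slot.try_put!(:b)   # false: :a is still in the slot
taken  = slot.try_take!      # :a
again  = slot.try_take!      # the EMPTY sentinel
```

A `nil` or `false` payload is a legitimate value, so an empty result should be detected by comparing with `EMPTY` by identity rather than by truthiness.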