Class: Concurrent::Channel

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Enumerable
Defined in:
lib/concurrent-ruby-edge/concurrent/channel.rb,
lib/concurrent-ruby-edge/concurrent/channel/tick.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/timer.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/ticker.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/sliding.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/dropping.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/put_clause.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/take_clause.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/after_clause.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/error_clause.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/default_clause.rb

Overview

Defined Under Namespace

Modules: Buffer Classes: Tick, ValidationError

Constant Summary collapse

Error =
Class.new(StandardError)

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Channel

Returns a new instance of Channel.



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 47

# Builds a channel and picks its buffer from +opts+.
#
# @param opts [Hash, Buffer::Base] an options hash. A Buffer::Base instance is
#   also accepted; it is installed as the buffer as-is and the method returns
#   before a validator is assigned.
# @option opts [Integer] :capacity requested capacity (+:size+ is read when
#   +:capacity+ is absent)
# @option opts [Symbol] :buffer key looked up in BUFFER_TYPES; +:buffered+ is
#   used when a capacity is given without a buffer type
# @option opts [Object] :validator stored as the validator; DEFAULT_VALIDATOR
#   when the key is missing
# @raise [ArgumentError] when a capacity is combined with +:unbuffered+, or
#   when a buffer type that needs a capacity gets none or one below 1
def initialize(opts = {})
  # undocumented -- for internal use only
  if opts.is_a?(Buffer::Base)
    self.buffer = opts
    return
  end

  size = opts[:capacity] || opts[:size]
  kind = opts[:buffer]

  if size && kind == :unbuffered
    raise ArgumentError, 'unbuffered channels cannot have a capacity'
  end

  # Three situations all resolve to an unbuffered channel: nothing requested,
  # a zero-capacity :buffered request, or an explicit :unbuffered request.
  wants_unbuffered =
    (size.nil? && kind.nil?) ||
    (size == 0 && kind == :buffered) ||
    kind == :unbuffered

  if wants_unbuffered
    self.buffer = BUFFER_TYPES[:unbuffered].new
  elsif size.nil? || size < 1
    raise ArgumentError, 'capacity must be at least 1 for this buffer type'
  else
    self.buffer = BUFFER_TYPES[kind || :buffered].new(size)
  end

  self.validator = opts.fetch(:validator, DEFAULT_VALIDATOR)
end

Class Method Details

.go(*args, &block) ⇒ Object



224
225
226
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 224

# Runs +block+ through +go_via+ using the GOROUTINES executor.
#
# @param args [Array] arguments handed on to +go_via+
# @yield the block handed on to +go_via+
# @return [Object] whatever +go_via+ returns
def go(*args, &block)
  executor = GOROUTINES
  go_via(executor, *args, &block)
end

.go_loop(*args, &block) ⇒ Object



233
234
235
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 233

# Runs +block+ through +go_loop_via+ using the GOROUTINES executor.
#
# @param args [Array] arguments handed on to +go_loop_via+
# @yield the block handed on to +go_loop_via+
# @return [Object] whatever +go_loop_via+ returns
def go_loop(*args, &block)
  executor = GOROUTINES
  go_loop_via(executor, *args, &block)
end

.go_loop_via(executor, *args, &block) ⇒ Object

Raises:

  • (ArgumentError)


237
238
239
240
241
242
243
244
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 237

# Posts a task to +executor+ that calls +block+ with +args+ over and over,
# stopping the first time the block returns a falsy value.
#
# @param executor [#post] receives the task; it is posted with +block+ and
#   +args+ as its arguments
# @param args [Array] arguments passed to +block+ on every iteration
# @yield [*args] the loop body; a falsy return value ends the loop
# @return [Object] whatever +executor.post+ returns
# @raise [ArgumentError] if no block is given
def go_loop_via(executor, *args, &block)
  raise ArgumentError, 'no block given' if block.nil?

  executor.post(block, *args) do
    # Kernel#loop is kept on purpose: it also ends quietly on StopIteration.
    loop { break unless block.call(*args) }
  end
end

.go_via(executor, *args, &block) ⇒ Object

Raises:

  • (ArgumentError)


228
229
230
231
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 228

# Posts +block+ to +executor+ together with +args+.
#
# @param executor [#post] receives +args+ and the block
# @param args [Array] arguments forwarded to +executor.post+
# @yield the block forwarded to +executor.post+
# @return [Object] whatever +executor.post+ returns
# @raise [ArgumentError] if no block is given
def go_via(executor, *args, &block)
  raise ArgumentError, 'no block given' if block.nil?

  executor.post(*args, &block)
end

.select(*args) {|selector, args| ... } ⇒ Object Also known as: alt

Yields:

  • (selector, args)

Raises:

  • (ArgumentError)


216
217
218
219
220
221
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 216

# Creates a Selector, lets the caller's block configure it, then executes it.
#
# @param args [Array] extra values yielded to the block after the selector
# @yield [selector, *args] receives the new Selector followed by +args+
# @return [Object] the result of +Selector#execute+
# @raise [ArgumentError] if no block is given
def select(*args)
  raise ArgumentError, 'no block given' unless block_given?

  Selector.new.tap { |selector| yield(selector, *args) }.execute
end

.ticker(interval) ⇒ Object Also known as: tick



211
212
213
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 211

# Wraps a Buffer::Ticker built from +interval+ in a new Channel.
#
# @param interval [Object] passed unchanged to +Buffer::Ticker.new+
# @return [Channel] a channel backed by the ticker buffer
def ticker(interval)
  tick_buffer = Buffer::Ticker.new(interval)
  Channel.new(tick_buffer)
end

.timer(seconds) ⇒ Object Also known as: after



206
207
208
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 206

# Wraps a Buffer::Timer built from +seconds+ in a new Channel.
#
# @param seconds [Object] passed unchanged to +Buffer::Timer.new+
# @return [Channel] a channel backed by the timer buffer
def timer(seconds)
  timer_buffer = Buffer::Timer.new(seconds)
  Channel.new(timer_buffer)
end

Instance Method Details

#each ⇒ Object

Raises:

  • (ArgumentError)


193
194
195
196
197
198
199
200
201
202
203
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 193

# Yields items obtained from +do_next+ until it reports that nothing more
# will arrive.
#
# Each +do_next+ call produces an item and a +more+ flag. Real items are
# yielded. A Concurrent::NULL item is skipped, and it ends the iteration when
# the flag is falsy.
#
# @yield [item] every item that is not Concurrent::NULL
# @return [nil] once the loop is broken
# @raise [ArgumentError] if no block is given
def each
  raise ArgumentError, 'no block given' unless block_given?

  loop do
    value, pending = do_next
    if value != Concurrent::NULL
      yield(value)
      next
    end
    break unless pending
  end
end

#next ⇒ Object

Examples:


# Channel the consumer below reads from.
jobs = Channel.new

# Run the consumer via Channel.go.
Channel.go do
  loop do
    # `next` hands back the received item together with a `more` flag.
    j, more = jobs.next
    if more
      print "received job #{j}\n"
    else
      # A falsy `more` flag ends the consumer loop.
      print "received all jobs\n"
      break
    end
  end
end


159
160
161
162
163
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 159

# Fetches the next item along with the +more+ flag reported by +do_next+.
#
# @return [Array(Object, Object)] the item (+nil+ in place of
#   Concurrent::NULL) followed by the +more+ flag
def next
  value, more = do_next
  [value == Concurrent::NULL ? nil : value, more]
end

#next? ⇒ Boolean

Returns:

  • (Boolean)


165
166
167
168
169
170
171
172
173
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 165

# Like +next+, but wraps the item in a Concurrent::Maybe.
#
# @return [Array(Concurrent::Maybe, Object)] +Maybe.nothing+ when +do_next+
#   produced Concurrent::NULL, otherwise +Maybe.just(item)+; followed by the
#   +more+ flag
def next?
  value, more = do_next
  wrapped = value == Concurrent::NULL ? Concurrent::Maybe.nothing : Concurrent::Maybe.just(value)
  [wrapped, more]
end

#offer(item) ⇒ Object



99
100
101
102
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 99

# Validates +item+ and, if it passes, hands it to +do_offer+.
#
# @param item [Object] the value to offer
# @return [Object] +false+ when validation fails, otherwise the result of
#   +do_offer+
def offer(item)
  if validate(item, false, false)
    do_offer(item)
  else
    false
  end
end

#offer!(item) ⇒ Object

Raises:

  • (Error)



104
105
106
107
108
109
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 104

# Offers +item+, raising instead of returning a falsy result.
#
# +validate+ is called with its raise flag set; any exception it raises
# propagates to the caller.
#
# @param item [Object] the value to offer
# @return [Object] the truthy result of +do_offer+
# @raise [Error] when +do_offer+ returns a falsy value
def offer!(item)
  validate(item, false, true)
  accepted = do_offer(item)
  return accepted if accepted

  raise Error
end

#offer?(item) ⇒ Boolean

Returns:

  • (Boolean)


111
112
113
114
115
116
117
118
119
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 111

# Offers +item+ and reports the outcome as a Concurrent::Maybe.
#
# @param item [Object] the value to offer
# @return [Concurrent::Maybe] +Maybe.nothing('invalid value')+ when
#   validation fails, +Maybe.just(true)+ when +do_offer+ returns a truthy
#   value, and +Maybe.nothing+ otherwise
def offer?(item)
  return Concurrent::Maybe.nothing('invalid value') unless validate(item, true, false)

  do_offer(item) ? Concurrent::Maybe.just(true) : Concurrent::Maybe.nothing
end

#poll ⇒ Object



175
176
177
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 175

# Returns the result of +do_poll+, translating the Concurrent::NULL marker
# into +nil+.
#
# @return [Object, nil] the polled item, or +nil+ when +do_poll+ produced
#   Concurrent::NULL
def poll
  polled = do_poll
  return nil if polled == Concurrent::NULL

  polled
end

#poll! ⇒ Object

Raises:

  • (Error)



179
180
181
182
183
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 179

# Returns the result of +do_poll+, raising when it is the Concurrent::NULL
# marker.
#
# @return [Object] the polled item
# @raise [Error] when +do_poll+ produced Concurrent::NULL
def poll!
  do_poll.tap { |polled| raise Error if polled == Concurrent::NULL }
end

#poll? ⇒ Boolean

Returns:

  • (Boolean)


185
186
187
188
189
190
191
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 185

# Returns the result of +do_poll+ wrapped in a Concurrent::Maybe.
#
# @return [Concurrent::Maybe] +Maybe.nothing+ when +do_poll+ produced
#   Concurrent::NULL, otherwise +Maybe.just(item)+
def poll?
  polled = do_poll
  return Concurrent::Maybe.nothing if polled == Concurrent::NULL

  Concurrent::Maybe.just(polled)
end

#put(item) ⇒ Object Also known as: send, <<



75
76
77
78
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 75

# Validates +item+ and, if it passes, hands it to +do_put+.
#
# @param item [Object] the value to put
# @return [Object] +false+ when validation fails, otherwise the result of
#   +do_put+
def put(item)
  validate(item, false, false) ? do_put(item) : false
end

#put!(item) ⇒ Object

Raises:

  • (Error)



82
83
84
85
86
87
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 82

# Puts +item+, raising instead of returning a falsy result.
#
# +validate+ is called with its raise flag set; any exception it raises
# propagates to the caller.
#
# @param item [Object] the value to put
# @return [Object] the truthy result of +do_put+
# @raise [Error] when +do_put+ returns a falsy value
def put!(item)
  validate(item, false, true)
  do_put(item) || raise(Error)
end

#put?(item) ⇒ Boolean

Returns:

  • (Boolean)


89
90
91
92
93
94
95
96
97
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 89

# Puts +item+ and reports the outcome as a Concurrent::Maybe.
#
# @param item [Object] the value to put
# @return [Concurrent::Maybe] +Maybe.nothing('invalid value')+ when
#   validation fails, +Maybe.just(true)+ when +do_put+ returns a truthy value,
#   and +Maybe.nothing+ otherwise
def put?(item)
  return Concurrent::Maybe.nothing('invalid value') unless validate(item, true, false)
  return Concurrent::Maybe.just(true) if do_put(item)

  Concurrent::Maybe.nothing
end

#take ⇒ Object Also known as: receive, ~



121
122
123
124
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 121

# Returns the result of +do_take+, translating the Concurrent::NULL marker
# into +nil+.
#
# @return [Object, nil] the taken item, or +nil+ when +do_take+ produced
#   Concurrent::NULL
def take
  taken = do_take
  return nil if taken == Concurrent::NULL

  taken
end

#take! ⇒ Object

Raises:

  • (Error)



128
129
130
131
132
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 128

# Returns the result of +do_take+, raising when it is the Concurrent::NULL
# marker.
#
# @return [Object] the taken item
# @raise [Error] when +do_take+ produced Concurrent::NULL
def take!
  taken = do_take
  return taken unless taken == Concurrent::NULL

  raise Error
end

#take? ⇒ Boolean

Returns:

  • (Boolean)


134
135
136
137
138
139
140
141
142
# File 'lib/concurrent-ruby-edge/concurrent/channel.rb', line 134

# Returns the result of +do_take+ wrapped in a Concurrent::Maybe.
#
# @return [Concurrent::Maybe] +Maybe.nothing+ when +do_take+ produced
#   Concurrent::NULL, otherwise +Maybe.just(item)+
def take?
  taken = do_take
  taken == Concurrent::NULL ? Concurrent::Maybe.nothing : Concurrent::Maybe.just(taken)
end