Class: Concurrent::Edge::Event

Inherits:
Synchronization::LockableObject
  • Object
show all
Includes:
Concern::Deprecation, Concern::Logging
Defined in:
lib/concurrent/edge/future.rb

Overview

Represents an event which will happen in future (will be completed). It has to always happen.

Direct Known Subclasses

CompletableEvent, Future

Instance Method Summary collapse

Constructor Details

#initialize(promise, default_executor) ⇒ Event

Returns a new instance of Event.



189
190
191
192
193
194
195
196
197
198
199
# File 'lib/concurrent/edge/future.rb', line 189

def initialize(promise, default_executor)
  super()
  @Promise            = promise
  @DefaultExecutor    = default_executor
  @Touched            = AtomicBoolean.new(false)
  @Callbacks          = LockFreeStack.new
  # TODO (pitr 12-Sep-2015): replace with AtomicFixnum, avoid aba problem
  # TODO (pitr 12-Sep-2015): look at java.util.concurrent solution
  @Waiters            = LockFreeStack.new
  self.internal_state = PENDING
end

Instance Method Details

#chain(executor = nil) {|success, value, reason| ... } ⇒ Object Also known as: then

Yields:

  • (success, value, reason)

    of the parent



252
253
254
# File 'lib/concurrent/edge/future.rb', line 252

def chain(executor = nil, &callback)
  ChainPromise.new(self, @DefaultExecutor, executor || @DefaultExecutor, &callback).future
end

#chain_completable(completable_event) ⇒ Object Also known as: tangle



258
259
260
# File 'lib/concurrent/edge/future.rb', line 258

def chain_completable(completable_event)
  on_completion! { completable_event.complete_with COMPLETED }
end

#completed?(state = internal_state) ⇒ Boolean Also known as: complete?

Has the Event been completed?

Returns:

  • (Boolean)


220
221
222
# File 'lib/concurrent/edge/future.rb', line 220

def completed?(state = internal_state)
  state.completed?
end

#default_executorExecutor

Returns current default executor.

Returns:

  • (Executor)

    current default executor

See Also:



247
248
249
# File 'lib/concurrent/edge/future.rb', line 247

def default_executor
  @DefaultExecutor
end

#delayEvent

Inserts delay into the chain of Futures making rest of it lazy evaluated.

Returns:



278
279
280
# File 'lib/concurrent/edge/future.rb', line 278

def delay
  ZipEventEventPromise.new(self, Delay.new(@DefaultExecutor).event, @DefaultExecutor).event
end

#inspectObject



320
321
322
# File 'lib/concurrent/edge/future.rb', line 320

def inspect
  "#{to_s[0..-2]} blocks:[#{blocks.map(&:to_s).join(', ')}]>"
end

#on_completion(executor = nil) {|success, value, reason| ... } ⇒ Object

Returns self.

Yields:

  • (success, value, reason)

    executed async on ‘executor` when completed

Returns:

  • self



300
301
302
# File 'lib/concurrent/edge/future.rb', line 300

def on_completion(executor = nil, &callback)
  add_callback :pr_async_callback_on_completion, executor || @DefaultExecutor, callback
end

#on_completion! {|success, value, reason| ... } ⇒ Object

Returns self.

Yields:

  • (success, value, reason)

    executed sync when completed

Returns:

  • self



306
307
308
# File 'lib/concurrent/edge/future.rb', line 306

def on_completion!(&callback)
  add_callback :pr_callback_on_completion, callback
end

#pending?(state = internal_state) ⇒ Boolean Also known as: incomplete?

Is Event/Future pending?

Returns:

  • (Boolean)


208
209
210
# File 'lib/concurrent/edge/future.rb', line 208

def pending?(state = internal_state)
  !state.completed?
end

#set(*args, &block) ⇒ Object



324
325
326
327
# File 'lib/concurrent/edge/future.rb', line 324

def set(*args, &block)
  raise 'Use CompletableEvent#complete or CompletableFuture#complete instead, ' +
            'constructed by Concurrent.event or Concurrent.future respectively.'
end

#state:pending, :completed

Returns:

  • (:pending, :completed)


202
203
204
# File 'lib/concurrent/edge/future.rb', line 202

def state
  internal_state.to_sym
end

#then_select(*channels) ⇒ Future

Zips with selected value form the suplied channels

Returns:



294
295
296
# File 'lib/concurrent/edge/future.rb', line 294

def then_select(*channels)
  ZipFutureEventPromise(Concurrent.select(*channels), self, @DefaultExecutor).future
end

#to_sObject



316
317
318
# File 'lib/concurrent/edge/future.rb', line 316

def to_s
  "<##{self.class}:0x#{'%x' % (object_id << 1)} #{state.to_sym}>"
end

#unscheduled?Boolean

Returns:

  • (Boolean)


212
213
214
# File 'lib/concurrent/edge/future.rb', line 212

def unscheduled?
  raise 'unsupported'
end

#wait(timeout = nil) ⇒ Event, ...

Note:

a thread should wait only once! For repeated checking use faster ‘completed?` check. If thread waits periodically it will dangerously grow the waiters stack.

Wait until Event is #complete?

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

Returns:

  • (Event, true, false)

    self or true/false if timeout is used



232
233
234
235
236
# File 'lib/concurrent/edge/future.rb', line 232

def wait(timeout = nil)
  touch
  result = wait_until_complete(timeout)
  timeout ? result : self
end

#with_default_executor(executor) ⇒ Event

Changes default executor for rest of the chain

Returns:



312
313
314
# File 'lib/concurrent/edge/future.rb', line 312

def with_default_executor(executor)
  EventWrapperPromise.new(self, executor).future
end

#zip(other) ⇒ Event Also known as: &

Zip with future producing new Future

Returns:



266
267
268
269
270
271
272
# File 'lib/concurrent/edge/future.rb', line 266

def zip(other)
  if other.is?(Future)
    ZipFutureEventPromise.new(other, self, @DefaultExecutor).future
  else
    ZipEventEventPromise.new(self, other, @DefaultExecutor).future
  end
end