Class: Concurrent::TimerSet

Inherits:
Object
  • Object
show all
Includes:
RubyExecutor
Defined in:
lib/concurrent/executor/timer_set.rb

Overview

Executes a collection of tasks at the specified times. A master thread monitors the set and schedules each task for execution at the appropriate time. Tasks are run on the global task pool or on the supplied executor.

Class Method Summary collapse

Instance Method Summary collapse

Methods included from RubyExecutor

#<<, #running?, #shutdown, #shutdown?, #shuttingdown?, #wait_for_termination

Methods included from Logging

#log

Methods included from Executor

#can_overflow?

Constructor Details

#initialize(opts = {}) ⇒ TimerSet

Create a new set of timed tasks.

Parameters:

  • opts (Hash) (defaults to: {})

    the options controlling how the future will be processed

Options Hash (opts):

  • :operation (Boolean) — default: false

    when ‘true` will execute the future on the global operation pool (for long-running operations), when `false` will execute the future on the global task pool (for short-running tasks)

  • :executor (object)

    when provided will run all operations on this executor rather than the global thread pool (overrides :operation)



24
25
26
27
28
29
30
# File 'lib/concurrent/executor/timer_set.rb', line 24

def initialize(opts = {})
  @queue = PriorityQueue.new(order: :min)
  @task_executor = OptionsParser::get_executor_from(opts)
  @timer_executor = SingleThreadExecutor.new
  @condition = Condition.new
  init_executor
end

Class Method Details

.calculate_schedule_time(intended_time, now = Time.now) ⇒ Fixnum

Calculate an Epoch time with milliseconds at which to execute a task. If the given time is a ‘Time` object it will be converted accordingly. If the time is an integer value greater than zero it will be understood as a number of seconds in the future and will be added to the current time to calculate Epoch.

Parameters:

  • intended_time (Object)

    the time (as a ‘Time` object or an integer) to schedule the task for execution

  • now (Time) (defaults to: Time.now)

    (Time.now) the time from which to calculate an interval

Returns:

  • (Fixnum)

    the intended time as seconds/millis from Epoch

Raises:

  • (ArgumentError)

    if the intended execution time is not in the future



79
80
81
82
83
84
85
86
87
# File 'lib/concurrent/executor/timer_set.rb', line 79

def self.calculate_schedule_time(intended_time, now = Time.now)
  if intended_time.is_a?(Time)
    raise ArgumentError.new('schedule time must be in the future') if intended_time <= now
    intended_time
  else
    raise ArgumentError.new('seconds must be greater than zero') if intended_time.to_f < 0.0
    now + intended_time
  end
end

Instance Method Details

#post(intended_time, *args) { ... } ⇒ Boolean

Post a task to be execute at the specified time. The given time may be either a ‘Time` object or the number of seconds to wait. If the intended execution time is within 1/100th of a second of the current time the task will be immediately post to the executor.

Parameters:

  • intended_time (Object)

    the time to schedule the task for execution

Yields:

  • the task to be performed

Returns:

  • (Boolean)

    true if the message is post, false after shutdown

Raises:

  • (ArgumentError)

    if the intended execution time is not in the future

  • (ArgumentError)

    if no block is given



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/concurrent/executor/timer_set.rb', line 45

def post(intended_time, *args, &task)
  time = TimerSet.calculate_schedule_time(intended_time).to_f
  raise ArgumentError.new('no block given') unless block_given?

  mutex.synchronize do
    return false unless running?

    if (time - Time.now.to_f) <= 0.01
      @task_executor.post(*args, &task)
    else
      @queue.push(Task.new(time, args, task))
      @timer_executor.post(&method(:process_tasks))
    end

    true
  end

end