Class: Exekutor::Internal::Provider

Inherits:
Object
Includes:
Callbacks, Executable, Logger
Defined in:
lib/exekutor/internal/provider.rb

Overview

Reserves jobs and provides them to an executor

Constant Summary

Constants included from Executable

Executable::STATES

Instance Method Summary

Methods included from Executable

#consecutive_errors, #restart_delay, #running?, #state

Methods included from Callbacks

#add_callback

Constructor Details

#initialize(reserver:, executor:, pool:, polling_interval: 60.seconds, interval_jitter: polling_interval.to_i > 1 ? polling_interval * 0.1 : 0) ⇒ Provider

Creates a new provider

Parameters:

  • reserver (Reserver)

    the job reserver

  • executor (Executor)

    the job executor

  • pool (ThreadPoolExecutor)

    the thread pool to use

  • polling_interval (ActiveSupport::Duration) (defaults to: 60.seconds)

    the polling interval

  • interval_jitter (Float) (defaults to: polling_interval.to_i > 1 ? polling_interval * 0.1 : 0)

    the polling interval jitter



# File 'lib/exekutor/internal/provider.rb', line 30

def initialize(reserver:, executor:, pool:, polling_interval: 60.seconds,
               interval_jitter: polling_interval.to_i > 1 ? polling_interval * 0.1 : 0)
  super()
  @reserver = reserver
  @executor = executor
  @pool = pool

  @polling_interval = polling_interval.to_i.freeze
  @interval_jitter = interval_jitter.to_f.freeze

  @event = Concurrent::Event.new
  @thread_running = Concurrent::AtomicBoolean.new false

  @next_job_scheduled_at = Concurrent::AtomicReference.new UNKNOWN
  @next_poll_at = Concurrent::AtomicReference.new nil
end
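
The default for interval_jitter is easier to see with concrete numbers. The following is a minimal sketch that mirrors the default-argument expression from the signature above, assuming plain numbers of seconds instead of ActiveSupport::Duration; the helper name default_interval_jitter is hypothetical and not part of Exekutor.

```ruby
# Hypothetical helper (not part of Exekutor): reproduces the default-argument
# expression for interval_jitter using plain numbers of seconds.
def default_interval_jitter(polling_interval)
  # Intervals longer than one second get 10% jitter, shorter ones get none.
  polling_interval.to_i > 1 ? polling_interval * 0.1 : 0
end

long_interval_jitter  = default_interval_jitter(60) # 10% of 60 seconds
short_interval_jitter = default_interval_jitter(1)  # no jitter at all
```

Note that the constructor stores the polling interval via to_i, so a fractional interval is truncated to whole seconds, while the jitter is kept as a Float.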

Instance Method Details

#poll ⇒ Object

Makes the provider poll for jobs

Raises:

  • (Exekutor::Error)

    if the provider is not running


# File 'lib/exekutor/internal/provider.rb', line 65

def poll
  raise Exekutor::Error, "Provider is not running" unless running?

  @next_poll_at.set Time.now.to_f
  @event.set
end

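#start ⇒ Object

Starts the provider. Returns false without doing anything unless the provider is in the pending state; otherwise schedules the first poll, starts the thread and returns true.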



# File 'lib/exekutor/internal/provider.rb', line 48

def start
  return false unless compare_and_set_state :pending, :started

  # Always poll at startup to fill up the threads. Use a small jitter so that workers started at the same time
  # don't hit the DB at the same time
  @next_poll_at.set (1 + (2 * Kernel.rand)).seconds.from_now.to_f
  start_thread
  true
end
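
The startup jitter in the listing above places the first poll between 1 and 3 seconds after the call to #start. A minimal sketch of that arithmetic, assuming plain Floats instead of ActiveSupport durations; the helper name startup_delay is hypothetical.

```ruby
# Hypothetical helper (not part of Exekutor): the delay, in seconds, before
# the first poll. Kernel.rand returns a Float in [0, 1), so the result is
# always in [1, 3).
def startup_delay(random_value = Kernel.rand)
  1 + (2 * random_value)
end

# Absolute timestamp of the first poll, comparable to what #start stores
# in @next_poll_at.
first_poll_at = Time.now.to_f + startup_delay
```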

#stop ⇒ Object

Stops the provider



# File 'lib/exekutor/internal/provider.rb', line 59

def stop
  self.state = :stopped
  @event.set
end

#update_earliest_scheduled_at(scheduled_at = UNKNOWN) ⇒ Float?

Updates the timestamp for when the next job is scheduled. If no argument is given, gets the earliest scheduled_at from the DB. If a timestamp is given and it is before the known timestamp, updates the timestamp for the earliest job. Does nothing if a timestamp is given and the earliest job timestamp is not known.

Parameters:

  • scheduled_at (Time, Numeric) (defaults to: UNKNOWN)

    the time a job is scheduled at

Returns:

  • (Float, nil)

    the timestamp for the next job, or nil if the timestamp is unknown or no jobs are pending



# File 'lib/exekutor/internal/provider.rb', line 77

def update_earliest_scheduled_at(scheduled_at = UNKNOWN)
  scheduled_at = scheduled_at.to_f if scheduled_at.is_a? Time
  unless scheduled_at == UNKNOWN || scheduled_at.is_a?(Numeric)
    raise ArgumentError, "scheduled_at must be a Time or Numeric"
  end

  changed = false
  earliest_scheduled_at = @next_job_scheduled_at.update do |current|
    earliest = ScheduledAtComparer.determine_earliest(current, scheduled_at, @reserver)
    changed = earliest != current
    earliest
  end

  if earliest_scheduled_at == UNKNOWN
    nil
  else
    @event.set if changed && earliest_scheduled_at.present?
    earliest_scheduled_at
  end
end
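
The three cases described for #update_earliest_scheduled_at can be sketched as a small decision function. This is an illustration only: the source of ScheduledAtComparer.determine_earliest is not shown here, so the function below, the stand-in UNKNOWN sentinel, and the fetch_from_db callable are all assumptions, as is the handling of a nil current value (taken here to mean "no jobs pending").

```ruby
# Stand-in sentinel; the value of the real UNKNOWN constant is not shown.
UNKNOWN = Object.new.freeze

# Hypothetical sketch of the decision logic, NOT the library's implementation.
# fetch_from_db is any callable returning the earliest timestamp or nil.
def determine_earliest(current, scheduled_at, fetch_from_db)
  if scheduled_at.equal?(UNKNOWN)
    fetch_from_db.call # no argument given: ask the DB
  elsif current.equal?(UNKNOWN)
    current            # earliest timestamp not known: do nothing
  elsif current.nil? || scheduled_at < current
    scheduled_at       # assumption: nil means no pending jobs, so adopt the new one
  else
    current            # the known timestamp is already earlier
  end
end

db = -> { 5.0 }
from_db   = determine_earliest(UNKNOWN, UNKNOWN, db) # fetched from the DB
earlier   = determine_earliest(10.0, 7.5, db)        # given timestamp wins
unchanged = determine_earliest(10.0, 20.0, db)       # known timestamp kept
```

In the listing above, the provider only wakes its thread (@event.set) when the stored value actually changed and the new value is present.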