Module: Scheduler::Deferrable

Included in:
Defer
Defined in:
lib/scheduler/defer.rb

Instance Method Summary collapse

Instance Method Details

#async=(val) ⇒ Object

for test and sidekiq



47
48
49
# File 'lib/scheduler/defer.rb', line 47

def async=(val)
  @async = val
end

#do_all_workObject



77
78
79
# File 'lib/scheduler/defer.rb', line 77

def do_all_work
  do_work(non_block = true) while !@queue.empty?
end

#initializeObject



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/scheduler/defer.rb', line 9

def initialize
  @async = !Rails.env.test?
  @queue =
    WorkQueue::ThreadSafeWrapper.new(
      WorkQueue::FairQueue.new(500) { WorkQueue::BoundedQueue.new(10) },
    )

  @mutex = Mutex.new
  @stats_mutex = Mutex.new
  @paused = false
  @thread = nil
  @reactor = nil
  @timeout = DEFAULT_TIMEOUT
  @stats = LruRedux::ThreadSafeCache.new(STATS_CACHE_SIZE)
end

#later(desc = nil, db = RailsMultisite::ConnectionManagement.current_db, force: true, &blk) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/scheduler/defer.rb', line 51

def later(desc = nil, db = RailsMultisite::ConnectionManagement.current_db, force: true, &blk)
  @stats_mutex.synchronize do
    stats = (@stats[desc] ||= { queued: 0, finished: 0, duration: 0, errors: 0 })
    stats[:queued] += 1
  end

  if @async
    start_thread if !@thread&.alive? && !@paused
    @queue.push({ key: db, task: [db, blk, desc] }, force: force)
  else
    blk.call
  end
end

#lengthObject



29
30
31
# File 'lib/scheduler/defer.rb', line 29

def length
  @queue.size
end

#pauseObject



37
38
39
40
# File 'lib/scheduler/defer.rb', line 37

def pause
  stop!
  @paused = true
end

#resumeObject



42
43
44
# File 'lib/scheduler/defer.rb', line 42

def resume
  @paused = false
end

#statsObject



33
34
35
# File 'lib/scheduler/defer.rb', line 33

def stats
  @stats_mutex.synchronize { @stats.to_a }
end

#stop!Object



65
66
67
68
69
70
# File 'lib/scheduler/defer.rb', line 65

def stop!
  @thread.kill if @thread&.alive?
  @thread = nil
  @reactor&.stop
  @reactor = nil
end

#stopped?Boolean

test only

Returns:

  • (Boolean)


73
74
75
# File 'lib/scheduler/defer.rb', line 73

def stopped?
  !@thread&.alive?
end

#timeout=(t) ⇒ Object



25
26
27
# File 'lib/scheduler/defer.rb', line 25

def timeout=(t)
  @mutex.synchronize { @timeout = t }
end