Module: Quandl::Command::Task::Threading

Extended by:
ActiveSupport::Concern
Included in:
Quandl::Command::Task
Defined in:
lib/quandl/command/task/threading.rb

Defined Under Namespace

Modules: ClassMethods

Instance Method Summary

Instance Method Details

#await_thread_pool_lock!(key) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/quandl/command/task/threading.rb', line 60

# Blocks the calling thread until the pool lock for +key+ has been obtained.
#
# The "is it free?" check and the acquisition both happen inside
# mutex.synchronize, so two callers cannot both see the key as free and
# claim it. While the key is held elsewhere the caller sleeps for 50ms and
# then tries again.
#
# Retries run in a loop instead of by self-recursion, so the call stack stays
# flat no matter how long the wait lasts.
#
# @param key [Object] identifier of the lock to acquire
# @return [nil]
def await_thread_pool_lock!(key)
  loop do
    acquired = mutex.synchronize do
      # claim the key only when nobody else holds it
      next false if thread_pool_locked?(key)

      obtain_thread_pool_lock(key)
      true
    end
    break if acquired

    # lock is held elsewhere; back off briefly before the next attempt
    sleep(0.05)
  end
  nil
end

#background_job(*args, &block) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/quandl/command/task/threading.rb', line 35

# Runs +block+ as a job: on the thread pool when threading is enabled,
# otherwise immediately in the calling thread.
#
# A trailing options hash may carry a :lock key. When present, the pooled job
# first waits for that lock via await_thread_pool_lock! and releases it when
# the block finishes, so pooled jobs using the same key do not overlap.
# The lock is released in an +ensure+ clause: a block that raises must not
# leave the key locked, or later jobs with that key would wait forever.
#
# @param args [Array] optional trailing options hash (:lock)
# @return [false] when shutdown has been signalled; the block is not run
# @return [Object] the block's result when running without threads, otherwise
#   whatever thread_pool.process returns
def background_job(*args, &block)
  # dont execute jobs when shutdown signaled
  return false if exiting?

  # options
  opts = args.extract_options!.symbolize_keys!
  key = opts[:lock]
  # without threads process in foreground (no locking is applied here)
  return block.call unless threads?

  # process with pool
  thread_pool.process do
    if key.present?
      # wait until no other job holds this key
      await_thread_pool_lock!(key)
      begin
        block.call
      ensure
        # unlock even when the job raised
        release_thread_pool_lock(key)
      end
    else
      block.call
    end
  end
end

#exiting? ⇒ Boolean

Returns:

  • (Boolean)


104
105
106
# File 'lib/quandl/command/task/threading.rb', line 104

# Reports whether a shutdown of the thread pool has been signalled.
#
# Without threading there is no pool to consult, so the answer is false and
# thread_pool is never touched.
#
# @return [Boolean]
def exiting?
  return false unless threads?

  thread_pool.shutdown?
end

#force_load_json_support ⇒ Object



108
109
110
# File 'lib/quandl/command/task/threading.rb', line 108

# Serializes a small throwaway hash with #to_json, swallowing any
# StandardError and returning nil in that case.
#
# NOTE(review): presumably called once up front so JSON serialization is
# loaded before pooled jobs need it — confirm against the callers.
#
# @return [String, nil] the serialized hash, or nil when serialization raised
def force_load_json_support
  probe = { force_load: true }
  probe.to_json
rescue StandardError
  nil
end

#mutex ⇒ Object



56
57
58
# File 'lib/quandl/command/task/threading.rb', line 56

# Returns the Mutex stored in @mutex, creating it on first use.
#
# NOTE(review): creation is lazy and unguarded; if the first calls can come
# from several threads at once they might not share one Mutex — confirm that
# the first call happens before jobs are dispatched.
#
# @return [Mutex]
def mutex
  return @mutex if @mutex

  @mutex = Mutex.new
end

#obtain_thread_pool_lock(key) ⇒ Object



80
81
82
# File 'lib/quandl/command/task/threading.rb', line 80

# Marks +key+ as locked by storing true under it in thread_pool_locks.
# No synchronization is done here.
#
# @param key [Object] identifier of the lock
# @return [true]
def obtain_thread_pool_lock(key)
  thread_pool_locks.store(key, true)
end

#release_thread_pool_lock(key) ⇒ Object



84
85
86
# File 'lib/quandl/command/task/threading.rb', line 84

# Frees +key+ by deleting its entry from thread_pool_locks.
#
# @param key [Object] identifier of the lock
# @return [Object, nil] the removed entry's value, or nil when the key was
#   not present
def release_thread_pool_lock(key)
  held_locks = thread_pool_locks
  held_locks.delete(key)
end

#shutdown_thread_pool_on_sigint ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/quandl/command/task/threading.rb', line 21

# Logs the configured thread count and, when threading is enabled, installs a
# SIGINT handler that shuts the thread pool down (unless it already is) and
# then exits the process.
#
# NOTE(review): the handler calls debug and thread_pool.shutdown from trap
# context; if either of them takes a Mutex, Ruby raises ThreadError there —
# confirm against their implementations.
#
# @return [Object, nil] nil without threading, otherwise whatever trap returns
def shutdown_thread_pool_on_sigint
  debug("starting up with #{threads} threads")
  if threads?
    trap('SIGINT') do
      debug "exit signal received"
      already_down = thread_pool.shutdown?
      unless already_down
        debug "waiting for executing jobs to finish"
        thread_pool.shutdown
      end
      debug "exiting now"
      exit
    end
  end
end

#thread_pool ⇒ Object



76
77
78
# File 'lib/quandl/command/task/threading.rb', line 76

# Returns the pool stored in @thread_pool, building it on first use with
# Thread.pool sized by #threads.
#
# @return [Object] whatever Thread.pool returned on the first call
def thread_pool
  return @thread_pool if @thread_pool

  @thread_pool = Thread.pool(threads)
end

#thread_pool_locked?(key) ⇒ Boolean

Returns:

  • (Boolean)


88
89
90
# File 'lib/quandl/command/task/threading.rb', line 88

# Tells whether +key+ is currently marked as locked, i.e. whether its entry
# in thread_pool_locks equals true.
#
# @param key [Object] identifier of the lock
# @return [Boolean]
def thread_pool_locked?(key)
  entry = thread_pool_locks[key]
  entry == true
end

#thread_pool_locks ⇒ Object



92
93
94
# File 'lib/quandl/command/task/threading.rb', line 92

# Returns the Hash stored in @thread_pool_locks, creating an empty one on
# first use.
#
# @return [Hash]
def thread_pool_locks
  return @thread_pool_locks if @thread_pool_locks

  @thread_pool_locks = {}
end

#threads ⇒ Object



100
101
102
# File 'lib/quandl/command/task/threading.rb', line 100

# Number of threads to use, memoized in @threads.
#
# Taken from options.threads (converted with #to_i) when that value is
# present, otherwise 10.
#
# @return [Integer]
def threads
  return @threads if @threads

  @threads =
    if options.threads.present?
      options.threads.to_i
    else
      10
    end
end

#threads? ⇒ Boolean

Returns:

  • (Boolean)


96
97
98
# File 'lib/quandl/command/task/threading.rb', line 96

# Whether jobs should run on the thread pool: true when the thread count
# from #threads is two or more.
#
# @return [Boolean]
def threads?
  threads >= 2
end