Module: Quandl::Command::Task::Threading
- Extended by:
- ActiveSupport::Concern
- Included in:
- Quandl::Command::Task
- Defined in:
- lib/quandl/command/task/threading.rb
Defined Under Namespace
Modules: ClassMethods
Instance Method Summary collapse
- #await_thread_pool_lock!(key) ⇒ Object
- #background_job(*args, &block) ⇒ Object
- #exiting? ⇒ Boolean
- #force_load_json_support ⇒ Object
- #mutex ⇒ Object
- #obtain_thread_pool_lock(key) ⇒ Object
- #release_thread_pool_lock(key) ⇒ Object
- #shutdown_thread_pool_on_sigint ⇒ Object
- #thread_pool ⇒ Object
- #thread_pool_locked?(key) ⇒ Boolean
- #thread_pool_locks ⇒ Object
- #threads ⇒ Object
- #threads? ⇒ Boolean
Instance Method Details
#await_thread_pool_lock!(key) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/quandl/command/task/threading.rb', line 60 def await_thread_pool_lock!(key) got_lock = false # try to get a lock mutex.synchronize do unless thread_pool_locked?(key) obtain_thread_pool_lock(key) got_lock = true end end # try again unless lock was obtained if !got_lock sleep(0.05) await_thread_pool_lock!(key) end end |
#background_job(*args, &block) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/quandl/command/task/threading.rb', line 35 def background_job(*args, &block) # dont execute jobs when shutdown signaled return false if exiting? # options opts = args..symbolize_keys! key = opts[:lock] # without threads process in foreground return block.call unless threads? # process with pool job = thread_pool.process do # wait for lock await_thread_pool_lock!(key) if key.present? # if this key is locked block.call # unlock release_thread_pool_lock(key) if key.present? end # onwards job end |
#exiting? ⇒ Boolean
104 105 106 |
# File 'lib/quandl/command/task/threading.rb', line 104 def exiting? threads? && thread_pool.shutdown? end |
#force_load_json_support ⇒ Object
108 109 110 |
# File 'lib/quandl/command/task/threading.rb', line 108 def force_load_json_support { force_load: true }.to_json rescue nil end |
#mutex ⇒ Object
56 57 58 |
# File 'lib/quandl/command/task/threading.rb', line 56 def mutex @mutex ||= Mutex.new end |
#obtain_thread_pool_lock(key) ⇒ Object
80 81 82 |
# File 'lib/quandl/command/task/threading.rb', line 80 def obtain_thread_pool_lock(key) thread_pool_locks[key] = true end |
#release_thread_pool_lock(key) ⇒ Object
84 85 86 |
# File 'lib/quandl/command/task/threading.rb', line 84 def release_thread_pool_lock(key) thread_pool_locks.delete(key) end |
#shutdown_thread_pool_on_sigint ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/quandl/command/task/threading.rb', line 21 def shutdown_thread_pool_on_sigint debug("starting up with #{threads} threads") return unless threads? trap('SIGINT') do debug "exit signal received" unless thread_pool.shutdown? debug "waiting for executing jobs to finish" thread_pool.shutdown end debug "exiting now" exit end end |
#thread_pool ⇒ Object
76 77 78 |
# File 'lib/quandl/command/task/threading.rb', line 76 def thread_pool @thread_pool ||= Thread.pool( threads ) end |
#thread_pool_locked?(key) ⇒ Boolean
88 89 90 |
# File 'lib/quandl/command/task/threading.rb', line 88 def thread_pool_locked?(key) thread_pool_locks[key] == true end |
#thread_pool_locks ⇒ Object
92 93 94 |
# File 'lib/quandl/command/task/threading.rb', line 92 def thread_pool_locks @thread_pool_locks ||= {} end |
#threads ⇒ Object
100 101 102 |
# File 'lib/quandl/command/task/threading.rb', line 100 def threads @threads ||= .threads.present? ? .threads.to_i : 10 end |
#threads? ⇒ Boolean
96 97 98 |
# File 'lib/quandl/command/task/threading.rb', line 96 def threads? threads > 1 end |