Module: QC
- Defined in:
- lib/queue_classic.rb,
lib/queue_classic/queue.rb,
lib/queue_classic/setup.rb,
lib/queue_classic/worker.rb,
lib/queue_classic/railtie.rb,
lib/queue_classic/conn_adapter.rb,
lib/generators/queue_classic/install_generator.rb
Defined Under Namespace
Modules: Setup
Classes: ConnAdapter, InstallGenerator, Queue, Railtie, Worker
Constant Summary
- APP_NAME =
You can use APP_NAME to query for Postgres-related process information in the pg_stat_activity view.
ENV["QC_APP_NAME"] || "queue_classic"
- WAIT_TIME =
Number of seconds to block on the listen channel for new jobs.
(ENV["QC_LISTEN_TIME"] || 5).to_i
- TABLE_NAME =
Why do you want to change the table name? Just deal with the default OK? If you do want to change this, you will need to update the PL/pgSQL lock_head() function. Come on. Don’t do it.… Just stick with the default.
"queue_classic_jobs"
- QUEUE =
Each row in the table will have a column that notes the queue. You can point your workers at different queues but only one at a time.
ENV["QUEUE"] || "default"
- QUEUES =
(ENV["QUEUES"] && ENV["QUEUES"].split(",")) || []
- TOP_BOUND =
Set this to 1 for strict FIFO. There is nothing special about 9.…
(ENV["QC_TOP_BOUND"] || 9).to_i
- FORK_WORKER =
Set this variable if you wish for the worker to fork a UNIX process for each locked job. Remember to re-establish any database connections. See the worker for more details.
!ENV["QC_FORK_WORKER"].nil?
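The constants above are derived from environment variables. As a rough sketch of that derivation, the hypothetical helper `qc_settings` below (not part of queue_classic) applies the same expressions to a plain hash instead of `ENV`, so it can be run without the gem or a database:

```ruby
# Hypothetical helper (not part of queue_classic): mirrors the constant
# definitions listed above, but reads from a hash instead of ENV.
def qc_settings(env)
  {
    app_name:    env["QC_APP_NAME"] || "queue_classic",
    wait_time:   (env["QC_LISTEN_TIME"] || 5).to_i,
    queue:       env["QUEUE"] || "default",
    queues:      (env["QUEUES"] && env["QUEUES"].split(",")) || [],
    top_bound:   (env["QC_TOP_BOUND"] || 9).to_i,
    fork_worker: !env["QC_FORK_WORKER"].nil?
  }
end

defaults = qc_settings({})
custom = qc_settings(
  "QUEUES"         => "mail,reports",
  "QC_TOP_BOUND"   => "1",     # strict FIFO, per the TOP_BOUND note
  "QC_FORK_WORKER" => "false"  # any value, even "false", enables forking
)

p defaults
p custom
```

Note that FORK_WORKER only checks whether the variable is set at all, so leaving it unset is the only way to disable forking.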
Class Method Summary
- .default_conn_adapter ⇒ Object
- .default_conn_adapter=(conn) ⇒ Object
- .default_queue ⇒ Object
- .default_queue=(queue) ⇒ Object
- .has_connection? ⇒ Boolean
- .log(data) ⇒ Object
- .log_yield(data) ⇒ Object
- .measure(data) ⇒ Object
- .method_missing(sym, *args, &block) ⇒ Object
Defer method calls on the QC module to the default queue.
- .respond_to_missing?(method_name, include_private = false) ⇒ Boolean
Ensure QC.respond_to?(:enqueue) equals true (ruby 1.9 only).
- .unlock_jobs_of_dead_workers ⇒ Object
Unlocks every job locked by a Postgres PID that no longer exists, so that jobs cannot stay locked forever.
Class Method Details
.default_conn_adapter ⇒ Object
# File 'lib/queue_classic.rb', line 59
def self.default_conn_adapter
  @conn_adapter ||= ConnAdapter.new
end
.default_conn_adapter=(conn) ⇒ Object
# File 'lib/queue_classic.rb', line 63
def self.default_conn_adapter=(conn)
  @conn_adapter = conn
end
.default_queue ⇒ Object
# File 'lib/queue_classic.rb', line 49
def self.default_queue
  @default_queue ||= begin
    Queue.new(QUEUE)
  end
end
.default_queue=(queue) ⇒ Object
# File 'lib/queue_classic.rb', line 45
def self.default_queue=(queue)
  @default_queue = queue
end
.has_connection? ⇒ Boolean
# File 'lib/queue_classic.rb', line 55
def self.has_connection?
  !@conn_adapter.nil?
end
.log(data) ⇒ Object
# File 'lib/queue_classic.rb', line 80
def self.log(data)
  result = nil
  data = {:lib => "queue-classic"}.merge(data)
  if block_given?
    start = Time.now
    result = yield
    # Record the block's run time in milliseconds.
    data = data.merge(:elapsed => Integer((Time.now - start)*1000))
  end
  # Render the hash as space-separated key=value pairs.
  data.reduce(out=String.new) do |s, tup|
    s << [tup.first, tup.last].join("=") << " "
  end
  puts(out) if ENV["DEBUG"]
  return result
end
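To illustrate the line format that `QC.log` builds, here is a minimal sketch of just the formatting step. `format_log_line` is a hypothetical stand-in written for this example, not a queue_classic method:

```ruby
# Hypothetical stand-in for the formatting step of QC.log: every pair
# becomes "key=value" followed by a single space.
def format_log_line(data)
  data = { :lib => "queue-classic" }.merge(data)
  data.reduce(String.new) do |line, (key, value)|
    line << [key, value].join("=") << " "
  end
end

line = format_log_line(:at => "enqueue", :elapsed => 12)
puts line
```

In the listing above the resulting line is only printed when the `DEBUG` environment variable is set.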
.log_yield(data) ⇒ Object
# File 'lib/queue_classic.rb', line 67
def self.log_yield(data)
  begin
    t0 = Time.now
    yield
  rescue => e
    log({:at => "error", :error => e.inspect}.merge(data))
    raise
  ensure
    t = Integer((Time.now - t0)*1000)
    log(data.merge(:elapsed => t)) unless e
  end
end
.measure(data) ⇒ Object
# File 'lib/queue_classic.rb', line 95
def self.measure(data)
  if ENV['QC_MEASURE']
    $stdout.puts("measure#qc.#{data}")
  end
end
.method_missing(sym, *args, &block) ⇒ Object
Defer method calls on the QC module to the default queue. This facilitates QC.enqueue()
# File 'lib/queue_classic.rb', line 36
def self.method_missing(sym, *args, &block)
  default_queue.send(sym, *args, &block)
end
.respond_to_missing?(method_name, include_private = false) ⇒ Boolean
Ensure QC.respond_to?(:enqueue) equals true (ruby 1.9 only)
# File 'lib/queue_classic.rb', line 41
def self.respond_to_missing?(method_name, include_private = false)
  default_queue.respond_to?(method_name)
end
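The delegation pattern used by `method_missing` and `respond_to_missing?` can be tried in isolation. The sketch below uses two hypothetical stand-ins, `FakeQueue` and `MiniQC`, in place of `QC::Queue` and `QC`, so it runs without a database:

```ruby
# FakeQueue and MiniQC are hypothetical stand-ins, not queue_classic classes.
class FakeQueue
  attr_reader :jobs

  def initialize(name)
    @name = name
    @jobs = []
  end

  # Record the call instead of writing a row to the jobs table.
  def enqueue(method, *args)
    @jobs << [method, args]
  end
end

module MiniQC
  # Memoized default queue, built on first use.
  def self.default_queue
    @default_queue ||= FakeQueue.new("default")
  end

  # Forward unknown module-level calls to the default queue.
  def self.method_missing(sym, *args, &block)
    default_queue.send(sym, *args, &block)
  end

  # Keep respond_to? consistent with what method_missing can handle.
  def self.respond_to_missing?(method_name, include_private = false)
    default_queue.respond_to?(method_name)
  end
end

MiniQC.enqueue("Kernel.puts", "hello")
p MiniQC.default_queue.jobs
p MiniQC.respond_to?(:enqueue)
```

Defining `respond_to_missing?` alongside `method_missing` keeps introspection honest: callers that check `respond_to?` before calling get the same answer the dispatch would give.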
.unlock_jobs_of_dead_workers ⇒ Object
Unlocks every job locked by a Postgres PID that no longer exists, so that jobs cannot stay locked forever.
# File 'lib/queue_classic.rb', line 103
def self.unlock_jobs_of_dead_workers
  @conn_adapter.execute("UPDATE #{QC::TABLE_NAME} SET locked_at = NULL, locked_by = NULL WHERE locked_by NOT IN (SELECT pid FROM pg_stat_activity);")
end
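The effect of the UPDATE statement can be modeled in memory. In this sketch the `Job` struct, the `release_dead_locks` helper, and the `live_pids` list are all hypothetical: the struct stands in for rows of the jobs table and the list for the pids reported by pg_stat_activity.

```ruby
# Hypothetical in-memory model of the UPDATE statement above.
Job = Struct.new(:id, :locked_at, :locked_by)

def release_dead_locks(jobs, live_pids)
  jobs.each do |job|
    # SQL's NOT IN never matches NULL, so unlocked rows are left alone.
    next if job.locked_by.nil?
    # The backend that holds the lock is still connected: keep the lock.
    next if live_pids.include?(job.locked_by)
    job.locked_at = nil
    job.locked_by = nil
  end
end

jobs = [
  Job.new(1, Time.now, 4242), # backend 4242 is still connected
  Job.new(2, Time.now, 9999), # backend 9999 has gone away
  Job.new(3, nil, nil)        # never locked
]
release_dead_locks(jobs, [4242])
jobs.each { |job| puts "job #{job.id} locked_by=#{job.locked_by.inspect}" }
```

Only job 2 is released here, because its lock holder is absent from the list of live pids.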