Class: QC::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/queue_classic/queue.rb

Overview

The queue class maps a queue abstraction onto a database table.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, top_bound = nil) ⇒ Queue

Returns a new instance of Queue.



10
11
12
13
# File 'lib/queue_classic/queue.rb', line 10

def initialize(name, top_bound=nil)
  @name = name
  @top_bound = top_bound || QC::TOP_BOUND
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



9
10
11
# File 'lib/queue_classic/queue.rb', line 9

def name
  @name
end

#top_boundObject (readonly)

Returns the value of attribute top_bound.



9
10
11
# File 'lib/queue_classic/queue.rb', line 9

def top_bound
  @top_bound
end

Instance Method Details

#conn_adapterObject



19
20
21
# File 'lib/queue_classic/queue.rb', line 19

def conn_adapter
  @adapter ||= QC.default_conn_adapter
end

#conn_adapter=(a) ⇒ Object



15
16
17
# File 'lib/queue_classic/queue.rb', line 15

def conn_adapter=(a)
  @adapter = a
end

#countObject



83
84
85
86
87
88
89
# File 'lib/queue_classic/queue.rb', line 83

def count
  QC.log_yield(:measure => 'queue.count') do
    s = "SELECT COUNT(*) FROM #{TABLE_NAME} WHERE q_name = $1"
    r = conn_adapter.execute(s, name)
    r["count"].to_i
  end
end

#delete(id) ⇒ Object



70
71
72
73
74
# File 'lib/queue_classic/queue.rb', line 70

def delete(id)
  QC.log_yield(:measure => 'queue.delete') do
    conn_adapter.execute("DELETE FROM #{TABLE_NAME} where id = $1", id)
  end
end

#delete_allObject



76
77
78
79
80
81
# File 'lib/queue_classic/queue.rb', line 76

def delete_all
  QC.log_yield(:measure => 'queue.delete_all') do
    s = "DELETE FROM #{TABLE_NAME} WHERE q_name = $1"
    conn_adapter.execute(s, name)
  end
end

#enqueue(method, *args) ⇒ Object

enqueue(m,a) inserts a row into the jobs table and trigger a notification. The job’s queue is represented by a name column in the row. There is a trigger on the table which will send a NOTIFY event on a channel which corresponds to the name of the queue. The method argument is a string encoded ruby expression. The expression will be separated by a ‘.` character and then `eval`d. Examples of the method argument include: `puts`, `Kernel.puts`, `MyObject.new.puts`. The args argument will be encoded as JSON and stored as a JSON datatype in the row. (If the version of PG does not support JSON, then the args will be stored as text. The args are stored as a collection and then splatted inside the worker. Examples of args include: `’hello world’‘, `[’hello world’]‘, `’hello’, ‘world’‘.



37
38
39
40
41
42
# File 'lib/queue_classic/queue.rb', line 37

def enqueue(method, *args)
  QC.log_yield(:measure => 'queue.enqueue') do
    s="INSERT INTO #{TABLE_NAME} (q_name, method, args) VALUES ($1, $2, $3)"
    res = conn_adapter.execute(s, name, method, JSON.dump(args))
  end
end

#lockObject



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/queue_classic/queue.rb', line 44

def lock
  QC.log_yield(:measure => 'queue.lock') do
    s = "SELECT * FROM lock_head($1, $2)"
    if r = conn_adapter.execute(s, name, top_bound)
      {}.tap do |job|
        job[:id] = r["id"]
        job[:q_name] = r["q_name"]
        job[:method] = r["method"]
        job[:args] = JSON.parse(r["args"])
        if r["created_at"]
          job[:created_at] = Time.parse(r["created_at"])
          ttl = Integer((Time.now - job[:created_at]) * 1000)
          QC.measure("time-to-lock=#{ttl}ms source=#{name}")
        end
      end
    end
  end
end

#unlock(id) ⇒ Object



63
64
65
66
67
68
# File 'lib/queue_classic/queue.rb', line 63

def unlock(id)
  QC.log_yield(:measure => 'queue.unlock') do
    s = "UPDATE #{TABLE_NAME} set locked_at = null where id = $1"
    conn_adapter.execute(s, id)
  end
end