Class: SimpleWorker::TaskQueue
- Inherits:
-
Object
- Object
- SimpleWorker::TaskQueue
- Includes:
- RedisSupport
- Defined in:
- lib/simpleworker/task_queue.rb
Overview
TaskQueue.new(redis, hostname, jobid, opts)
where hostname is the machine's hostname or a unique identifier and ‘opts’ is a Hash of options:
:namespace => String prefix to keys in redis used by SimpleWorker (default: simpleworker)
:task_timeout => Fixnum time after which a task expires; this should be > the timeout set in Runner (default: 10 seconds)
Constant Summary collapse
- DEFAULT_OPTIONS =
{ :namespace => 'simpleworker' }
Instance Attribute Summary
Attributes included from RedisSupport
Instance Method Summary collapse
- #each_task ⇒ Object
- #expire_current_task ⇒ Object
- #fire_log_message(msg) ⇒ Object
- #fire_start ⇒ Object
- #fire_stop ⇒ Object
- #fire_task_start(task) ⇒ Object
- #fire_task_stop(task) ⇒ Object
-
#initialize(redis, hostname, jobid, opts = {}) ⇒ TaskQueue
constructor
A new instance of TaskQueue.
- #pop ⇒ Object
Constructor Details
#initialize(redis, hostname, jobid, opts = {}) ⇒ TaskQueue
Returns a new instance of TaskQueue.
18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/simpleworker/task_queue.rb', line 18
#
# Builds a task queue bound to one job and one worker node.
#
# @param redis [Object] Redis client; +get+ is called on it here
# @param hostname [String] the machine's hostname or another unique identifier
# @param jobid [Object] identifier of the job (stored as-is in @jobid)
# @param opts [Hash] options merged over DEFAULT_OPTIONS
# @option opts [String] :namespace prefix stored in @namespace
# @option opts [Integer] :task_timeout fallback task timeout, used only when
#   the configuration stored in Redis does not provide one
def initialize(redis, hostname, jobid, opts = {})
  # Hash#merge is non-destructive, so DEFAULT_OPTIONS is never mutated.
  opts = DEFAULT_OPTIONS.merge(opts)

  @redis = redis
  @namespace = opts[:namespace]
  @jobid = jobid
  @hostname = hostname
  @active_key_prefix = "#{active_tasks_key}:#{@hostname}"

  # The 'task_timeout' stored under config_key stays authoritative, exactly
  # as before. The :task_timeout option is consulted only when that value is
  # missing, instead of being discarded (and instead of JSON.parse raising
  # TypeError on a nil payload).
  raw_config = @redis.get(config_key)
  stored_timeout = raw_config && JSON.parse(raw_config)['task_timeout']
  @task_timeout = stored_timeout.nil? ? opts[:task_timeout] : stored_timeout

  load_lua_scripts
end
Instance Method Details
#each_task ⇒ Object
55 56 57 58 59 60 61 62 |
# File 'lib/simpleworker/task_queue.rb', line 55
#
# Pops tasks one at a time and yields each to the caller's block, wrapping
# every yield in fire_task_start / fire_task_stop. Iteration ends as soon as
# #pop returns nil.
#
# @yieldparam task the task left in @current_task by the latest #pop
def each_task
  until pop.nil?
    # Capture the task once so start, yield and stop all see the same value
    # even if @current_task is changed while the block runs.
    task = @current_task
    fire_task_start(task)
    yield(task)
    fire_task_stop(task)
  end
end
#expire_current_task ⇒ Object
46 47 48 49 |
# File 'lib/simpleworker/task_queue.rb', line 46
#
# Deletes the Redis key "<active key prefix>:<current task>" when a task is
# currently held, then forgets the task by resetting @current_task to nil.
#
# @return [nil]
def expire_current_task
  if @current_task
    task_key = "#{@active_key_prefix}:#{@current_task}"
    @redis.del(task_key)
  end

  @current_task = nil
end
#fire_log_message(msg) ⇒ Object
51 52 53 |
# File 'lib/simpleworker/task_queue.rb', line 51
#
# Pushes an 'on_log' entry carrying this node's hostname and the given
# message through push_to_log (defined outside this listing).
#
# @param msg the message to log
# @return whatever push_to_log returns
def fire_log_message(msg)
  push_to_log('on_log', @hostname, msg)
end
#fire_start ⇒ Object
30 31 32 |
# File 'lib/simpleworker/task_queue.rb', line 30
#
# Pushes an 'on_node_start' entry tagged with this node's hostname through
# push_to_log (defined outside this listing).
#
# @return whatever push_to_log returns
def fire_start
  event = 'on_node_start'
  push_to_log(event, @hostname)
end
#fire_stop ⇒ Object
34 35 36 |
# File 'lib/simpleworker/task_queue.rb', line 34
#
# Pushes an 'on_node_stop' entry tagged with this node's hostname through
# push_to_log (defined outside this listing).
#
# @return whatever push_to_log returns
def fire_stop
  event = 'on_node_stop'
  push_to_log(event, @hostname)
end
#fire_task_start(task) ⇒ Object
38 39 40 |
# File 'lib/simpleworker/task_queue.rb', line 38
#
# Pushes an 'on_task_start' entry carrying this node's hostname and the task
# through push_to_log (defined outside this listing).
#
# @param task the task that is about to be processed
# @return whatever push_to_log returns
def fire_task_start(task)
  event = 'on_task_start'
  push_to_log(event, @hostname, task)
end
#fire_task_stop(task) ⇒ Object
42 43 44 |
# File 'lib/simpleworker/task_queue.rb', line 42
#
# Pushes an 'on_task_stop' entry carrying this node's hostname and the task
# through push_to_log (defined outside this listing).
#
# @param task the task that has just been processed
# @return whatever push_to_log returns
def fire_task_stop(task)
  event = 'on_task_stop'
  push_to_log(event, @hostname, task)
end
#pop ⇒ Object
64 65 66 67 68 69 70 |
# File 'lib/simpleworker/task_queue.rb', line 64
#
# Fetches the next task. If a task is currently held, its
# "<active key prefix>:<task>" member is first removed from the set at
# active_tasks_key. The script identified by @reliable_queue_sha is then run
# via EVALSHA and its result becomes the new @current_task.
#
# @return the value returned by the script; #each_task treats nil as
#   "no more tasks"
def pop
  if @current_task
    finished_member = "#{@active_key_prefix}:#{@current_task}"
    @redis.srem(active_tasks_key, finished_member)
  end

  script_keys = [tasks_key, active_tasks_key]
  script_args = [namespace, jobid, @hostname, @task_timeout]
  @current_task = @redis.evalsha(@reliable_queue_sha, :keys => script_keys, :argv => script_args)
end