Class: SimpleWorker::TaskQueue

Inherits:
Object
  • Object
show all
Includes:
RedisSupport
Defined in:
lib/simpleworker/task_queue.rb

Overview

TaskQueue.new(redis, hostname, jobid, opts)

where hostname is the machines hostname or a unique identifier and ‘opts’ is a Hash of options:

:namespace    => String prefix to keys in redis used by SimpleWorker (default: simpleworker)
:task_timeout => Fixnum time after which a task expires, this should be > timout set in Runner (default: 10 seconds)

Constant Summary collapse

DEFAULT_OPTIONS =
{
  :namespace    => 'simpleworker'
}

Instance Attribute Summary

Attributes included from RedisSupport

#jobid, #namespace

Instance Method Summary collapse

Constructor Details

#initialize(redis, hostname, jobid, opts = {}) ⇒ TaskQueue

Returns a new instance of TaskQueue.



18
19
20
21
22
23
24
25
26
27
28
# File 'lib/simpleworker/task_queue.rb', line 18

def initialize(redis, hostname, jobid, opts = {})
  opts               = DEFAULT_OPTIONS.dup.merge(opts)
  @redis             = redis
  @namespace         = opts[:namespace]
  @task_timeout      = opts[:task_timeout]
  @jobid             = jobid
  @hostname          = hostname
  @active_key_prefix = "#{active_tasks_key}:#{@hostname}"
  @task_timeout      = JSON.parse(@redis.get(config_key))['task_timeout']
  load_lua_scripts
end

Instance Method Details

#each_taskObject



55
56
57
58
59
60
61
62
# File 'lib/simpleworker/task_queue.rb', line 55

def each_task
  until pop.nil?
    local_task = @current_task
    fire_task_start local_task
    yield local_task
    fire_task_stop local_task
  end
end

#expire_current_taskObject



46
47
48
49
# File 'lib/simpleworker/task_queue.rb', line 46

def expire_current_task
  @redis.del "#{@active_key_prefix}:#{@current_task}" if @current_task
  @current_task = nil
end

#fire_log_message(msg) ⇒ Object



51
52
53
# File 'lib/simpleworker/task_queue.rb', line 51

def fire_log_message(msg)
  push_to_log('on_log', @hostname, msg)
end

#fire_startObject



30
31
32
# File 'lib/simpleworker/task_queue.rb', line 30

def fire_start
  push_to_log('on_node_start', @hostname)
end

#fire_stopObject



34
35
36
# File 'lib/simpleworker/task_queue.rb', line 34

def fire_stop
  push_to_log('on_node_stop', @hostname)
end

#fire_task_start(task) ⇒ Object



38
39
40
# File 'lib/simpleworker/task_queue.rb', line 38

def fire_task_start(task)
  push_to_log('on_task_start', @hostname, task)
end

#fire_task_stop(task) ⇒ Object



42
43
44
# File 'lib/simpleworker/task_queue.rb', line 42

def fire_task_stop(task)
  push_to_log('on_task_stop', @hostname, task)
end

#popObject



64
65
66
67
68
69
70
# File 'lib/simpleworker/task_queue.rb', line 64

def pop
  @redis.srem(active_tasks_key, "#{@active_key_prefix}:#{@current_task}") if @current_task
  @current_task = @redis.evalsha(@reliable_queue_sha,
                                 :keys => [tasks_key, active_tasks_key],
                                 :argv => [namespace, jobid, @hostname, @task_timeout])
  @current_task
end