Class: QueueManager::Task

Inherits:
Object
  • Object
show all
Includes:
GlobalID::Identification, Util
Defined in:
lib/queue_manager/task.rb

Constant Summary collapse

MARKER =
'*'
MARKED_REGEXP =

/^*/

Regexp.new("^#{92.chr}#{MARKER}")

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Util

included

Constructor Details

#initialize(id, score) ⇒ Task

Returns a new instance of Task.

Parameters:

  • id (String)

    The unique identifier of the task

  • score (Fixnum)

    Timestamp of the task



83
84
85
# File 'lib/queue_manager/task.rb', line 83

def initialize(id, score)
  @id, @score = id, score
end

Instance Attribute Details

#idObject (readonly)

Instance of QueueManager::Task provides detailed information about the task and allows you to manage it. You can change a job of the task, pass additional parameters or delete the task.



77
78
79
# File 'lib/queue_manager/task.rb', line 77

def id
  @id
end

#scoreObject (readonly)

Instance of QueueManager::Task provides detailed information about the task and allows you to manage it. You can change a job of the task, pass additional parameters or delete the task.



77
78
79
# File 'lib/queue_manager/task.rb', line 77

def score
  @score
end

Class Method Details

.add(id, job:, **options) ⇒ QueueManager::Task

Add a new task to the queue

Parameters:

  • id (String)

    The unique identifier of the task

  • job (Symbol)

    Job class name

  • options (Hash)

    Hash of additional options

Returns:

Raises:

  • (ArgumentError)


23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/queue_manager/task.rb', line 23

def self.add(id, job:, **options)
  raise ArgumentError, 'Job should be present' unless job

  transaction do
    time = redis.zscore(config.queue, "#{MARKER}#{id}") || timestamp
    score = time + config.delay

    task = new(id, score)
    task.job = job
    task.options = options.to_json

    redis.multi do
      redis.zadd(config.queue, score, id)
    end

    logger.info "Add new task \"#{id}\" with job: \"#{job}\""
    return task
  end
end

.handling_queueObject

Check the queue and run tasks



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/queue_manager/task.rb', line 46

def self.handling_queue
  # Return the first element from range
  id, score = redis.zrange(config.queue, 0, 0, with_scores: true).flatten

  return false if id.blank? && score.blank?
  return false if score > timestamp

  new_score = timestamp + config.timeout

  if MARKED_REGEXP =~ id
    redis.zadd(config.queue, new_score, id)
    logger.info "Time is over for the task \"#{id}\". Updated time"
  else
    redis.zrem(config.queue, id)
    redis.zadd(config.queue, new_score, "#{MARKER}#{id}")
    logger.info "Task \"#{id}\" is taken into work"
  end

  original_id = id.tr('*', '')
  task = new(original_id, score.to_i)
  task.update_score(new_score)
  options = JSON.load(task.options).symbolize_keys

  task.job.constantize.public_send(:perform_later, task, original_id, **options)
  logger.info "Launched job: #{task.job}.perform_later(task, \"#{original_id}\", #{options})"
end

Instance Method Details

#removeBoolean Also known as: done, delete

Remove task from the queue by score

Returns:

  • (Boolean)

    True or false



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/queue_manager/task.rb', line 115

def remove
  transaction do
    marked_id = "#{MARKER}#{id}"
    redis_score = redis.zscore(config.queue, marked_id)

    return false unless score.to_i == redis_score.to_i

    redis.multi do
      clear_task
      redis.zrem(config.queue, marked_id)
    end

    logger.info "The task \"#{id}\" is removed from the queue"
  end
  true
end

#to_global_idGlobalID

Convert task to global id

Returns:

  • (GlobalID)

    Instance of GlobalID



139
140
141
# File 'lib/queue_manager/task.rb', line 139

def to_global_id
  GlobalID.create(self, app: config.identifier, score: score)
end

#update_score(value) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/queue_manager/task.rb', line 87

def update_score(value)
  transaction do
    _job, _options = job, options

    redis.multi do
      clear_task
      @score = value
      self.job = _job
      self.options = _options
    end
  end
end