Class: Magent::GenericChannel

Inherits:
Object
  • Object
show all
Includes:
Failure, Stats
Defined in:
lib/magent/generic_channel.rb

Direct Known Subclasses

AsyncChannel

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Stats

#on_job_failed, #on_job_processed, #on_quit, #on_start, #stats, #stats_collection

Methods included from Failure

#enqueue_error, #error_collection, #error_count, #errors, #failed, #remove_error, #retry_error

Constructor Details

#initialize(name) ⇒ GenericChannel

Returns a new instance of GenericChannel.



9
10
11
# File 'lib/magent/generic_channel.rb', line 9

# Creates a channel whose backing collection name is the given name prefixed
# with "magent.".
#
# @param name [#to_s] channel name; interpolated into the collection name
def initialize(name)
  @name = "magent.#{name}"
  # Start with no job in flight so #current_job and #retry_current_job read an
  # explicitly initialised nil rather than an unset instance variable.
  @current_job = nil
end

Instance Attribute Details

#current_job ⇒ Object (readonly)

Returns the value of attribute current_job.



7
8
9
# File 'lib/magent/generic_channel.rb', line 7

# Reader for the @current_job instance variable.
#
# Within this class, @current_job is assigned by #dequeue (from the result of
# #next_message) and its "retries" entry is updated by #retry_current_job.
#
# @return [Object, nil] the stored job, or nil if none has been assigned
def current_job
  @current_job
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



6
7
8
# File 'lib/magent/generic_channel.rb', line 6

# Reader for the @name instance variable.
#
# #initialize stores the constructor argument prefixed with "magent.", and
# #collection passes this value to Magent.database.collection.
#
# @return [String] the prefixed channel name
def name
  @name
end

Instance Method Details

#collection ⇒ Object



36
37
38
# File 'lib/magent/generic_channel.rb', line 36

# Returns the collection backing this channel, looking it up by @name on
# Magent.database the first time and reusing the cached object afterwards.
#
# @return [Object] whatever Magent.database.collection returns for @name
def collection
  return @collection if @collection

  @collection = Magent.database.collection(@name)
end

#dequeue ⇒ Object



25
26
27
28
29
# File 'lib/magent/generic_channel.rb', line 25

# Fetches the next job via #next_message, remembers it as the current job and
# returns its "message" entry.
#
# @return [Object, nil] the job's "message" value, or nil when #next_message
#   returned nothing (or a job without a "message" key)
def dequeue
  @current_job = next_message
  return nil unless @current_job

  @current_job["message"]
end

#enqueue(message, priority = 3) ⇒ Object



13
14
15
# File 'lib/magent/generic_channel.rb', line 13

# Saves a new job document for the given message into the channel's
# collection.
#
# The document carries a generated _id, the message, its priority, the
# current time as integer seconds, and a retry counter starting at 0.
#
# @param message [Object] payload stored under :message
# @param priority [Object] value stored under :priority (defaults to 3)
# @return [Object] whatever collection.save returns
def enqueue(message, priority = 3)
  queue = collection
  job = {
    :_id        => generate_uid,
    :message    => message,
    :priority   => priority,
    :created_at => Time.now.to_i,
    :retries    => 0
  }
  queue.save(job)
end

#message_count ⇒ Object



17
18
19
# File 'lib/magent/generic_channel.rb', line 17

# Returns the result of calling +count+ on the channel's collection.
#
# As written this is the same expression as #queue_count; the TODO below
# records the intent to report processed messages instead.
#
# @return [Object] whatever collection.count returns (presumably an Integer —
#   confirm against the driver)
def message_count
  collection.count # TODO: number of processed messages (create a collection for stats)
end

#next_message ⇒ Object



31
32
33
34
# File 'lib/magent/generic_channel.rb', line 31

# Removes and returns one job document from the collection, chosen by
# ascending :priority and then descending :created_at.
#
# NOTE(review): descending :created_at picks the newest job within a priority
# first — confirm that this ordering is intended.
#
# @return [Object] the result of find_and_modify, or an empty Hash if any
#   StandardError is raised while fetching
def next_message
  queue = collection
  ordering = [[:priority, Mongo::ASCENDING], [:created_at, Mongo::DESCENDING]]
  queue.find_and_modify(:sort => ordering, :remove => true)
rescue StandardError
  # Every StandardError is swallowed here; callers only ever see the empty
  # hash fallback.
  {}
end

#queue_count ⇒ Object



21
22
23
# File 'lib/magent/generic_channel.rb', line 21

# Returns the result of calling +count+ on the channel's collection, i.e. the
# number of job documents currently stored for this channel.
#
# @return [Object] whatever collection.count returns (presumably an Integer —
#   confirm against the driver)
def queue_count
  collection.count
end

#retry_current_job ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/magent/generic_channel.rb', line 40

# Saves the current job back into the collection for another attempt,
# counting the retry.
#
# The job's "retries" counter (treated as 0 when absent) is incremented and
# the job is saved, unless it has already been retried 20 times.
#
# @return [Boolean] true if the job was saved for another attempt; false when
#   there is no current job, the current job is empty, or the retry limit has
#   been reached
def retry_current_job
  # #next_message falls back to an empty hash, which is truthy; saving it
  # would insert a document holding nothing but a retry counter.
  return false if !@current_job || @current_job.empty?

  retries = @current_job['retries'] || 0
  return false if retries >= 20

  @current_job['retries'] = retries + 1
  collection.save(@current_job)
  true
end