Class: ThreadJob::Memory::Store

Inherits:
JobStore
  • Object
show all
Defined in:
lib/thread_job/backends/memory/store.rb

Instance Method Summary collapse

Constructor Details

#initialize(max_retries = 10, logger = Logger.new(STDOUT)) ⇒ Store

Returns a new instance of Store.



15
16
17
18
19
20
21
# File 'lib/thread_job/backends/memory/store.rb', line 15

# Sets up an empty in-memory store.
#
# @jobs and @failed_jobs start as empty hashes; the other methods of this
# class index them by queue name. @mutex is the lock those methods take
# around queue access.
#
# @param max_retries [Integer] stored as @max_retries (compared against a
#   job's attempt count elsewhere in this class)
# @param logger [Logger] object that receives the store's log messages;
#   defaults to a Logger writing to STDOUT
def initialize(max_retries = 10, logger = Logger.new(STDOUT))
  @logger = logger
  @mutex = Mutex.new
  @failed_jobs = {}
  @jobs = {}
  @max_retries = max_retries
end

Instance Method Details

#complete_job(queue_name, job_id) ⇒ Object



75
76
77
78
79
80
81
82
83
# File 'lib/thread_job/backends/memory/store.rb', line 75

# Removes a finished job from its queue and logs the removal.
#
# The lookup and the removal both happen while holding @mutex. When
# get_job finds no matching job nothing is removed and nil is returned
# (get_job itself logs a warning in that case).
#
# @param queue_name [Object] key of the queue in @jobs
# @param job_id [Object] id of the job to remove
# @return [Object, nil] the result of the logger call, or nil when the job
#   was not found
def complete_job(queue_name, job_id)
  @mutex.synchronize do
    finished = get_job(queue_name, job_id)
    next unless finished

    @jobs[queue_name].delete(finished)
    @logger.info("[MemoryStore] job: '#{finished.job_name}' has been completed and removed from the queue")
  end
end

#fail_job(queue_name, job_id) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/thread_job/backends/memory/store.rb', line 85

# Records a failed attempt for a job and retires it once its retries are
# used up.
#
# The job is marked FAILED and its attempt counter is incremented. While the
# counter is below @max_retries the job stays in its queue (poll_for_job
# hands out FAILED jobs whose attempts are below @max_retries). Once the
# counter reaches or exceeds @max_retries the job is moved from @jobs to
# @failed_jobs.
#
# Everything runs while holding @mutex. When get_job finds no matching job
# nothing is changed and nil is returned.
#
# @param queue_name [Object] key of the queue in @jobs / @failed_jobs
# @param job_id [Object] id of the job that failed
# @return [Object, nil] the result of the logger call, or nil when the job
#   was not found
def fail_job(queue_name, job_id)
  @mutex.synchronize do
    job = get_job(queue_name, job_id)
    next unless job

    job.status = FAILED
    job.attempts += 1

    # `>=` rather than `==`: with a limit of 0, or a counter that is already
    # past the limit, an equality test never fires and the job would sit in
    # the queue forever without being polled again.
    if job.attempts >= @max_retries
      # Create the failed list on demand so a queue without one cannot raise.
      (@failed_jobs[queue_name] ||= []).push(job)
      @jobs[queue_name].delete(job)
      @logger.warn("[MemoryStore] job: '#{job.job_name}' has failed and reached the maximum amount of retries (#{@max_retries}) and is being removed from the queue.")
    else
      @logger.info("[MemoryStore] failed job: '#{job.job_name}' has been requeued and attempted #{job.attempts} times")
    end
  end
end

#get_job(queue_name, job_id) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/thread_job/backends/memory/store.rb', line 59

# Looks up a job by id in the given queue.
#
# This method does not take @mutex itself; complete_job and fail_job call it
# while already holding the lock (Ruby's Mutex is not reentrant, so locking
# here would raise in those callers).
#
# @param queue_name [Object] key of the queue in @jobs
# @param job_id [Object] id compared with each queued job's `id`
# @return [Object, nil] the first job whose id matches, or nil when the
#   queue does not exist or holds no such job (a warning is logged then)
def get_job(queue_name, job_id)
  queue = @jobs[queue_name]
  job = queue && queue.find { |candidate| candidate.id == job_id }
  return job if job

  @logger.warn("[MemoryStore] unable to get job: #{job_id} from queue: #{queue_name}")
  nil
end

#poll_for_job(queue_name) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/thread_job/backends/memory/store.rb', line 42

# Claims the next runnable job from a queue.
#
# A job is runnable when its status is AVAILABLE, or when it is FAILED and
# has been attempted fewer than @max_retries times. The first such job (in
# queue order) is marked WORKING and described in the returned hash.
#
# The queue is created on first use. That initialisation, the length read
# for the debug message and the scan all run while holding @mutex: save_job
# performs the same `||=` under the lock, and doing it unsynchronised here
# could overwrite a queue that save_job had just created and filled.
#
# @param queue_name [Object] key of the queue in @jobs
# @return [Hash, nil] `{id:, job:, job_name:}` for the claimed job, or nil
#   when no job in the queue is runnable
def poll_for_job(queue_name)
  @mutex.synchronize do
    queue = (@jobs[queue_name] ||= [])
    @logger.debug("[MemoryStore] Polling for jobs, #{queue.length} in the queue")

    record = queue.find do |candidate|
      candidate.status == AVAILABLE ||
        (candidate.status == FAILED && candidate.attempts < @max_retries)
    end

    if record
      record.status = WORKING
      @logger.debug("[MemoryStore] Sending job '#{record.job_name}' to the thread pool for work")
      { id: record.id, job: record.job, job_name: record.job_name }
    end
  end
end

#save_job(queue_name, job_name, job) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/thread_job/backends/memory/store.rb', line 23

# Adds a new job to a queue, creating the queue (and its failed-job list) on
# first use.
#
# The record is built with zero attempts and AVAILABLE status and appended
# to @jobs[queue_name] while holding @mutex.
#
# Ids come from a per-queue counter that only ever increases. Deriving the
# id from the current queue length would hand out an id that is still in
# use once an earlier job has been removed (queue [1, 2] -> remove 1 ->
# next id would be 2 again), and get_job would then match the wrong record.
#
# @param queue_name [Object] key of the queue in @jobs / @failed_jobs
# @param job_name [Object] name stored on the record and used in log output
# @param job [Object] payload stored on the record
# @return [Object] the result of the logger call
def save_job(queue_name, job_name, job)
  @mutex.synchronize do
    queued_jobs = (@jobs[queue_name] ||= [])
    @failed_jobs[queue_name] ||= []

    # Lazily created so this method does not depend on the constructor.
    # A queue seen for the first time continues from its current length.
    @last_job_ids ||= {}
    next_id = (@last_job_ids[queue_name] || queued_jobs.count) + 1
    @last_job_ids[queue_name] = next_id

    rec = Memory::Record.new
    rec.attempts = 0
    rec.id = next_id
    rec.job_name = job_name
    rec.job = job
    rec.status = AVAILABLE
    rec.queue_name = queue_name

    queued_jobs.push(rec)
  end

  @logger.info("[MemoryStore] Saved #{job_name}")
end