Module: Minion

Extended by:
Minion
Included in:
Minion
Defined in:
lib/minion.rb,
lib/minion/handler.rb

Defined Under Namespace

Classes: Handler

Instance Method Summary

Instance Method Details

#amqp_url ⇒ Object



110
111
112
# File 'lib/minion.rb', line 110

# Returns the AMQP broker URL, caching it in @@amqp_url.
#
# When no URL has been stored yet, the AMQP_URL environment variable is
# used; if that is unset, the local guest broker URL is the fallback.
#
# @return [Object] the stored URL (a String unless #amqp_url= was given
#   something else)
def amqp_url
  @@amqp_url ||= ENV.fetch("AMQP_URL", "amqp://guest:guest@localhost/")
end

#amqp_url=(url) ⇒ Object



114
115
116
# File 'lib/minion.rb', line 114

# Stores +url+ in @@amqp_url, the class variable that #amqp_url returns.
#
# Assigning nil or false makes the next #amqp_url call fall back to the
# environment / default value again, because #amqp_url uses `||=`.
#
# @param url [String] AMQP connection URL
def amqp_url=(url)
  @@amqp_url = url
end

#check_all ⇒ Object



84
85
86
# File 'lib/minion.rb', line 84

# Calls #check on every handler registered in @@handlers.
#
# @return [Array] the @@handlers collection
def check_all
  @@handlers.each(&:check)
end

#decode_json(string) ⇒ Object



76
77
78
79
80
81
82
# File 'lib/minion.rb', line 76

# Decodes a JSON document.
#
# ActiveSupport::JSON is used when it is loaded; otherwise the JSON
# library's loader is used.
#
# NOTE(review): JSON.load is the permissive loader; in some json versions it
# can instantiate classes named in the payload. If messages can come from
# untrusted publishers, confirm whether JSON.parse should be used instead.
#
# @param string [String] JSON text
# @return [Object] the decoded value
def decode_json(string)
  return ActiveSupport::JSON.decode(string) if defined?(ActiveSupport::JSON)

  JSON.load(string)
end

#enqueue(jobs, data = {}) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/minion.rb', line 14

# Publishes +data+ as JSON onto a durable, non-auto-delete queue.
#
# +jobs+ is either a single queue name or a list of queue names (anything
# that responds to #shift is treated as a list). For a list, the message is
# published to the first entry and the remaining entries travel with the
# payload under the "next_job" key.
#
# Neither argument is modified: the list and the payload are copied before
# the first entry is removed and "next_job" is added. (Previously the
# caller's list was shifted in place and the caller's hash gained a
# "next_job" key.)
#
# @param jobs [String, Array<String>] queue name, or list of queue names
# @param data [Hash] message payload, serialized with #to_json
# @return [Object] whatever the queue's #publish returns
# @raise [RuntimeError] if +jobs+ is nil or empty
def enqueue(jobs, data = {})
  raise "cannot enqueue a nil job" if jobs.nil?
  raise "cannot enqueue an empty job" if jobs.empty?

  payload = data
  if jobs.respond_to? :shift
    # Work on a copy so the caller's list survives the shift.
    remaining = jobs.dup
    queue = remaining.shift
    unless remaining.empty?
      # Copy before adding the chain so the caller's hash is left alone.
      payload = data.dup
      payload["next_job"] = remaining
    end
  else
    queue = jobs
  end

  # Serialize once so the logged text and the published text are the same.
  message = payload.to_json
  log "send: #{queue}:#{message}"
  bunny.queue(queue, :durable => true, :auto_delete => false).publish(message)
end

#error(&blk) ⇒ Object



35
36
37
# File 'lib/minion.rb', line 35

# Stores the given block in @@error_handler.
#
# The subscribe callback built by #job invokes the error handler as
# call(exception, queue, message, header) when processing a message raises.
#
# @return [Proc, nil] the stored block (nil when no block was given)
def error(&handler)
  @@error_handler = handler
end

#job(queue, options = {}, &blk) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/minion.rb', line 43

# Registers a handler for +queue+ whose messages are passed to the block.
#
# A Minion::Handler is built with two lambdas: +unsub+ unsubscribes from the
# queue and +sub+ subscribes with acknowledgements enabled. For every
# delivered message the subscribe callback decodes the JSON body, yields the
# decoded value to the block given to #job, hands the result to next_job,
# acknowledges the message when should_ack? is true, and finally runs
# check_all.
#
# Registering the first handler also installs an at_exit hook that calls
# Minion.run.
#
# @param queue [String] name of the queue to consume
# @param options [Hash] only :when is read; when present it is assigned to
#   the handler's +when+ attribute
# @yield [args] the decoded message
# @return [Array] the @@handlers list, including the new handler
def job(queue, options = {}, &blk)
  handler = Minion::Handler.new queue
  handler.when = options[:when] if options[:when]
  handler.unsub = lambda do
    log "unsubscribing to #{queue}"
    MQ.queue(queue).unsubscribe
  end
  handler.sub = lambda do
    # NOTE(review): @@ack is set when subscribing, not per message, so after
    # a job calls not_ack it stays false for later messages — confirm intent.
    @@ack = true
    log "subscribing to #{queue}"
    # NOTE(review): enqueue declares the queue with :durable => true and
    # :auto_delete => false while this declares it with defaults — confirm
    # the broker accepts the differing declarations.
    MQ.queue(queue).subscribe(:ack => true) do |h,m|
      # Use `next` to leave only this callback. A `return` here targets the
      # enclosing lambda, which has normally already finished when a message
      # is delivered, and would raise LocalJumpError instead of skipping.
      next if AMQP.closing?
      begin
        log "recv: #{queue}:#{m}"

        args = decode_json(m)

        result = yield(args)

        next_job(args, result)
      rescue Object => e
        # NOTE(review): `rescue Object` catches every exception class, not
        # just StandardError — kept as-is so existing error handlers still
        # receive the same exceptions.
        raise unless error_handler
        error_handler.call(e,queue,m,h)
      end
      h.ack if should_ack?
      check_all
    end
  end
  @@handlers ||= []
  # Only the first registration schedules the worker loop at exit.
  at_exit { Minion.run } if @@handlers.empty?
  @@handlers << handler
end

#log(msg) ⇒ Object



30
31
32
33
# File 'lib/minion.rb', line 30

# Sends +msg+ to the logger stored in @@logger.
#
# When no logger has been stored, a default one is created that prints the
# current time, the ":minion:" tag and the message with puts.
#
# @param msg [String] text to log
# @return [Object] whatever the logger block returns
def log(msg)
  @@logger ||= proc do |line|
    puts "#{Time.now} :minion: #{line}"
  end
  @@logger.call msg
end

#logger(&blk) ⇒ Object



39
40
41
# File 'lib/minion.rb', line 39

# Stores the given block in @@logger; #log calls it with each message.
#
# Calling this without a block stores nil, which makes #log recreate its
# default logger on the next call.
#
# @return [Proc, nil] the stored block
def logger(&sink)
  @@logger = sink
end

#not_ack ⇒ Object



92
93
94
# File 'lib/minion.rb', line 92

# Sets @@ack to false, so #should_ack? returns false and the subscribe
# callback built by #job skips acknowledging the message.
#
# @return [false]
def not_ack
  @@ack = false
end

#run ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/minion.rb', line 96

# Starts the worker: logs a start-up line, installs INT and TERM traps that
# stop AMQP and then EventMachine, and runs the EventMachine reactor.
#
# Inside the reactor an AMQP connection is started with amqp_config; once it
# is up, the prefetch count is set to 1 and check_all is run.
def run
  log "Starting minion"

  # Both signals shut down the same way: stop AMQP, then the reactor.
  %w[INT TERM].each do |signal|
    Signal.trap(signal) { AMQP.stop { EM.stop } }
  end

  EM.run do
    AMQP.start(amqp_config) do
      MQ.prefetch(1)
      check_all
    end
  end
end

#should_ack? ⇒ Boolean

Returns:

  • (Boolean)


88
89
90
# File 'lib/minion.rb', line 88

# Reports whether the current message should be acknowledged.
#
# Returns the value of @@ack, which the subscribe lambda built by #job sets
# to true and #not_ack sets to false.
#
# @return [Boolean]
def should_ack?
  @@ack
end

#url=(url) ⇒ Object



10
11
12
# File 'lib/minion.rb', line 10

# Sets the AMQP connection URL.
#
# The value is stored in @@config_url as before and now also in @@amqp_url,
# the variable #amqp_url returns. Previously only @@config_url was written,
# so this setter had no effect on #amqp_url.
#
# NOTE(review): no reader of @@config_url is visible here; confirm whether
# any other code still depends on it.
#
# @param url [String] AMQP connection URL
def url=(url)
  @@config_url = url
  @@amqp_url = url
end