Module: RCelery
- Defined in:
- lib/rcelery.rb,
lib/rcelery/pool.rb,
lib/rcelery/task.rb,
lib/rcelery/rails.rb,
lib/rcelery/daemon.rb,
lib/rcelery/events.rb,
lib/rcelery/worker.rb,
lib/rcelery/railtie.rb,
lib/rcelery/version.rb,
lib/rcelery/task/runner.rb,
lib/rcelery/async_result.rb,
lib/rcelery/eager_result.rb,
lib/rcelery/task/context.rb,
lib/rcelery/task_support.rb,
lib/rcelery/configuration.rb
Defined Under Namespace
Modules: Rails, TaskSupport
Classes: AsyncResult, Configuration, Daemon, EagerResult, Events, Pool, Railtie, Task, Worker
Constant Summary
collapse
- VERSION =
'1.0.1'
Class Method Summary
collapse
Class Method Details
.channel ⇒ Object
53
54
55
|
# File 'lib/rcelery.rb', line 53
# Shared AMQP channel, created on first use and cached in @channel.
#
# @return [AMQP::Channel] the cached channel; a new one is built only when
#   no channel is currently cached (for example after stop cleared it).
def self.channel
  return @channel if @channel

  @channel = AMQP::Channel.new
end
|
.eager_mode? ⇒ Boolean
77
78
79
|
# File 'lib/rcelery.rb', line 77
# Whether the stored configuration asks for eager mode.
#
# @return [Object, nil] whatever @config.eager_mode returns, or nil when no
#   configuration has been stored yet.
def self.eager_mode?
  return nil unless @config

  @config.eager_mode
end
|
.exchanges ⇒ Object
73
74
75
|
# File 'lib/rcelery.rb', line 73
# Reader for the exchanges declared by start.
#
# @return [Hash, nil] the hash start builds with the :request, :result and
#   :event exchanges; nil when start has not assigned it (start skips the
#   assignment in eager mode) or after stop has reset it.
def self.exchanges
@exchanges
end
|
.publish(exchange, message, options = {}) ⇒ Object
81
82
83
84
85
|
# File 'lib/rcelery.rb', line 81
# Publishes a message, serialized as JSON, to one of the declared exchanges.
#
# The options are copied before defaults are applied, so the caller's hash
# is never modified and a frozen hash is accepted.
#
# @param exchange [Object] key into the exchanges hash (start registers
#   :request, :result and :event)
# @param message [#to_json] payload; sent as message.to_json
# @param options [Hash] publish options handed to the exchange.
#   :routing_key defaults to queue_name when missing or falsy;
#   :content_type is always set to 'application/json'.
# @return [Object] whatever the exchange's publish call returns
def self.publish(exchange, message, options = {})
  publish_options = options.dup
  publish_options[:routing_key] ||= queue_name
  publish_options[:content_type] = 'application/json'
  exchanges[exchange].publish(message.to_json, publish_options)
end
|
.queue ⇒ Object
69
70
71
|
# File 'lib/rcelery.rb', line 69
# Reader for the queue that start declares and binds to the :request exchange.
#
# @return [Object, nil] the value start stored in @queue; nil when start has
#   not assigned it (start skips the assignment in eager mode) or after stop
#   has reset it.
def self.queue
@queue
end
|
.queue_name ⇒ Object
61
62
63
|
# File 'lib/rcelery.rb', line 61
# Name of this application's queue: the application name stored by start,
# prefixed with "rcelery.".
#
# @return [String] e.g. "rcelery.shop"; just "rcelery." when no application
#   name has been stored.
def self.queue_name
  'rcelery.' + @application.to_s
end
|
.running? ⇒ Boolean
65
66
67
|
# File 'lib/rcelery.rb', line 65
# Reader for the running flag maintained by start and stop.
#
# @return [Boolean, nil] true once start has completed, false after stop,
#   and nil if neither has assigned the flag yet.
def self.running?
@running
end
|
.start(config = {}) ⇒ Object
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
# File 'lib/rcelery.rb', line 17
# Stores the configuration and, unless eager mode is requested, declares the
# AMQP exchanges and the application queue.
#
# @param config [Hash, Object] a Hash is wrapped in Configuration.new; any
#   other object is used as-is and must respond to #application (and, when
#   a connection has to be opened, #to_hash).
# @return [self] the receiver, with the running flag set to true
def self.start(config = {})
  settings = config.is_a?(Hash) ? Configuration.new(config) : config
  @config = settings
  @application = settings.application

  unless eager_mode?
    # Only spawn the AMQP thread when there is no live connection to reuse.
    already_connected = !AMQP.connection.nil? && AMQP.connection.connected?
    @thread = Thread.new { AMQP.start(settings.to_hash) } unless already_connected

    # NOTE(review): the channel is requested right after the thread is
    # spawned; confirm the AMQP client tolerates a connection that may
    # still be opening at this point.
    broker = RCelery.channel
    request_exchange = broker.direct('celery', :durable => true)
    result_exchange = broker.direct('celeryresults', :durable => true, :auto_delete => true)
    event_exchange = broker.topic('celeryev', :durable => true)
    @exchanges = {
      :request => request_exchange,
      :result => result_exchange,
      :event => event_exchange
    }

    # The queue is named after the application and bound to the request
    # exchange with that same name as routing key.
    declared_queue = broker.queue(RCelery.queue_name, :durable => true)
    @queue = declared_queue.bind(exchanges[:request], :routing_key => RCelery.queue_name)
  end

  @running = true
  self
end
|
.stop ⇒ Object
43
44
45
46
47
48
49
50
51
|
# File 'lib/rcelery.rb', line 43
# Shuts the AMQP side down (unless in eager mode) and resets the state that
# start populated.
#
# start only spawns a thread when it has to open the connection itself, so
# @thread can legitimately be nil here (an existing connection was reused, or
# stop is called without a prior start). The kill is therefore guarded
# instead of assuming a thread exists.
#
# @return [nil]
def self.stop
  AMQP.stop { EM.stop } unless eager_mode?
  @channel = nil
  @running = false
  @queue = nil
  @exchanges = nil
  @thread.kill if @thread && !eager_mode?
  @thread = nil
end
|
.thread ⇒ Object
57
58
59
|
# File 'lib/rcelery.rb', line 57
# Reader for the thread in which start runs AMQP.start.
#
# @return [Thread, nil] the thread start spawned; nil when start did not
#   spawn one (eager mode, or an AMQP connection was already established)
#   or after stop has reset it.
def self.thread
@thread
end
|