Module: RCelery

Defined in:
lib/rcelery.rb,
lib/rcelery/pool.rb,
lib/rcelery/task.rb,
lib/rcelery/rails.rb,
lib/rcelery/daemon.rb,
lib/rcelery/events.rb,
lib/rcelery/worker.rb,
lib/rcelery/railtie.rb,
lib/rcelery/version.rb,
lib/rcelery/task/runner.rb,
lib/rcelery/async_result.rb,
lib/rcelery/eager_result.rb,
lib/rcelery/task/context.rb,
lib/rcelery/task_support.rb,
lib/rcelery/configuration.rb

Defined Under Namespace

Modules: Rails, TaskSupport

Classes: AsyncResult, Configuration, Daemon, EagerResult, Events, Pool, Railtie, Task, Worker

Constant Summary collapse

VERSION =
'1.0.1'

Class Method Summary collapse

Class Method Details

.channel ⇒ Object



53
54
55
# File 'lib/rcelery.rb', line 53

# Returns the AMQP channel shared through this module, creating it on first
# use.
#
# The channel is cached in @channel, so repeated calls hand back the same
# object until @channel is cleared.
#
# @return [AMQP::Channel] the memoized channel
def self.channel
  @channel = AMQP::Channel.new unless @channel
  @channel
end

.eager_mode? ⇒ Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/rcelery.rb', line 77

# Reports whether the stored configuration enables eager mode.
#
# Always answers with true or false. When no configuration has been stored
# in @config yet the answer is false (rather than nil), so the result matches
# the Boolean return type documented for this predicate.
#
# @return [Boolean] true only if a configuration is present and its
#   eager_mode value is truthy
def self.eager_mode?
  # Double negation coerces nil / arbitrary truthy values to a strict Boolean.
  !!(@config && @config.eager_mode)
end

.exchanges ⇒ Object



73
74
75
# File 'lib/rcelery.rb', line 73

class << self
  # The exchanges currently held in @exchanges, or nil when none are set.
  #
  # @return [Object, nil] the value stored in @exchanges
  attr_reader :exchanges
end

.publish(exchange, message, options = {}) ⇒ Object



81
82
83
84
85
# File 'lib/rcelery.rb', line 81

# Publishes a message, serialized as JSON, to one of the known exchanges.
#
# The caller's options hash is left untouched: the routing key default and
# the content type are applied to a copy, so frozen hashes and hashes that
# are reused across calls are safe to pass in.
#
# @param exchange [Object] key used to look up the target in +exchanges+
# @param message [#to_json] payload; its +to_json+ output is the body sent
# @param options [Hash] publish options forwarded to the exchange;
#   :routing_key falls back to +queue_name+ when missing or nil, and
#   :content_type is always set to 'application/json'
# @return [Object] whatever the exchange's +publish+ returns
def self.publish(exchange, message, options = {})
  # merge returns a new hash, so the caller's hash is never mutated.
  publish_options = options.merge(:content_type => 'application/json')
  publish_options[:routing_key] ||= queue_name
  exchanges[exchange].publish(message.to_json, publish_options)
end

.queue ⇒ Object



69
70
71
# File 'lib/rcelery.rb', line 69

class << self
  # The queue currently held in @queue, or nil when none is set.
  #
  # @return [Object, nil] the value stored in @queue
  attr_reader :queue
end

.queue_name ⇒ Object



61
62
63
# File 'lib/rcelery.rb', line 61

# Builds the queue name for the current application.
#
# The name is the fixed prefix "rcelery" followed by a dot and the string
# form of @application (empty when @application is nil).
#
# @return [String] e.g. "rcelery.myapp"
def self.queue_name
  prefix = 'rcelery'
  "#{prefix}.#{@application}"
end

.running? ⇒ Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/rcelery.rb', line 65

# Reports whether the module is currently marked as running.
#
# Always answers with true or false. Before @running has ever been assigned
# the answer is false (rather than nil), so the result is the same kind of
# value in every state and matches the documented Boolean return type.
#
# @return [Boolean] true if @running is truthy, false otherwise
def self.running?
  # Double negation turns an unset (nil) flag into a strict false.
  !!@running
end

.start(config = {}) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/rcelery.rb', line 17

# Stores the configuration and, unless eager mode is on, declares the
# exchanges and the bound queue on the shared channel.
#
# Outside eager mode an AMQP connection is started on a background thread
# (kept in @thread) when AMQP reports no connection or a disconnected one.
# Three durable exchanges are then declared: the direct exchanges 'celery'
# and 'celeryresults' (the latter also auto-delete) and the topic exchange
# 'celeryev'. A durable queue named after RCelery.queue_name is bound to the
# request exchange using that same name as routing key.
#
# @param config [Hash, Object] a Hash is wrapped in Configuration.new; any
#   other object is used as-is and must respond to +application+ and
#   +to_hash+
# @return [self] the receiver, so calls can be chained
def self.start(config = {})
  settings = config.is_a?(Hash) ? Configuration.new(config) : config
  @config = settings
  @application = settings.application

  unless eager_mode?
    disconnected = AMQP.connection.nil? || !AMQP.connection.connected?
    # NOTE(review): the channel below is created right after this thread is
    # spawned; confirm the AMQP client tolerates a connection that may still
    # be starting up.
    @thread = Thread.new { AMQP.start(settings.to_hash) } if disconnected

    amqp_channel = RCelery.channel
    request_exchange = amqp_channel.direct('celery', :durable => true)
    result_exchange = amqp_channel.direct('celeryresults', :durable => true, :auto_delete => true)
    event_exchange = amqp_channel.topic('celeryev', :durable => true)
    @exchanges = {
      :request => request_exchange,
      :result => result_exchange,
      :event => event_exchange
    }

    durable_queue = amqp_channel.queue(RCelery.queue_name, :durable => true)
    @queue = durable_queue.bind(request_exchange, :routing_key => RCelery.queue_name)
  end

  @running = true
  self
end

.stop ⇒ Object



43
44
45
46
47
48
49
50
51
# File 'lib/rcelery.rb', line 43

# Shuts the module down and clears its cached state.
#
# Outside eager mode this asks AMQP to stop (stopping the EventMachine
# reactor from the callback) and kills the background thread if one exists.
# The cached channel, queue, exchanges and thread are always reset to nil
# and the running flag is set to false.
#
# @return [nil]
def self.stop
  eager = eager_mode?
  AMQP.stop { EM.stop } unless eager
  @channel = nil
  @running = false
  @queue = nil
  @exchanges = nil
  # start only spawns a thread when it had to open the connection itself, so
  # @thread may be nil even outside eager mode; guard to avoid NoMethodError.
  @thread.kill if @thread && !eager
  @thread = nil
end

.thread ⇒ Object



57
58
59
# File 'lib/rcelery.rb', line 57

class << self
  # The background thread currently held in @thread, or nil when none is set.
  #
  # @return [Thread, nil] the value stored in @thread
  attr_reader :thread
end