Module: Chasqui

Extended by:
Forwardable
Defined in:
lib/chasqui.rb,
lib/chasqui/config.rb,
lib/chasqui/version.rb,
lib/chasqui/subscriber.rb,
lib/chasqui/subscription.rb,
lib/chasqui/workers/worker.rb,
lib/chasqui/workers/resque_worker.rb,
lib/chasqui/workers/sidekiq_worker.rb

Defined Under Namespace

Classes: Broker, CLI, Config, ConfigurationError, MultiBroker, ResqueWorker, SidekiqWorker, Subscriber, Subscription, Worker

Constant Summary collapse

Defaults =
{
  inbox_queue: 'inbox',
  redis_namespace: 'chasqui',
  publish_channel: '__default',
  broker_poll_interval: 3
}.freeze
CONFIG_SETTINGS =
[
  :broker_poll_interval,
  :channel,
  :inbox_queue,
  :logger,
  :redis,
  :worker_backend
]
VERSION =
"0.9.3"
HandlerAlreadyRegistered =
Class.new StandardError

Class Method Summary collapse

Class Method Details

.configObject



27
28
29
# File 'lib/chasqui.rb', line 27

def config
  @config ||= Config.new
end

.configure {|@config| ... } ⇒ Object

Yields:



22
23
24
25
# File 'lib/chasqui.rb', line 22

def configure(&block)
  @config ||= Config.new
  yield @config
end

.publish(event, *args) ⇒ Object



31
32
33
# File 'lib/chasqui.rb', line 31

def publish(event, *args)
  redis.lpush inbox_queue, build_payload(event, *args).to_json
end

.subscribe(options = {}, &block) ⇒ Object



35
36
37
38
39
40
41
42
43
# File 'lib/chasqui.rb', line 35

def subscribe(options={}, &block)
  queue = options.fetch :queue
  channel = options.fetch :channel, config.channel

  create_subscription(queue, channel).tap do |subscription|
    subscription.subscriber.evaluate(&block) if block_given?
    redis.sadd subscription_key(channel), subscription.subscription_id
  end
end

.subscriber_class_name(queue) ⇒ Object



59
60
61
62
# File 'lib/chasqui.rb', line 59

def subscriber_class_name(queue)
  queue_name_constant = queue.split(':').last.gsub(/[^\w]/, '_')
  "Subscriber__#{queue_name_constant}".to_sym
end

.subscription(queue) ⇒ Object



55
56
57
# File 'lib/chasqui.rb', line 55

def subscription(queue)
  subscriptions[queue.to_s]
end

.subscription_key(channel) ⇒ Object



64
65
66
# File 'lib/chasqui.rb', line 64

def subscription_key(channel)
  "subscriptions:#{channel}"
end

.unsubscribe(channel, options = {}, &block) ⇒ Object



45
46
47
48
49
50
51
52
53
# File 'lib/chasqui.rb', line 45

def unsubscribe(channel, options={}, &block)
  queue = options.fetch :queue
  subscription = subscriptions[queue.to_s]

  if subscription
    redis.srem subscription_key(channel), subscription.subscription_id
    subscription.subscription_id
  end
end