Module: Qs::Client::InstanceMethods

Defined in:
lib/qs/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#redis ⇒ Object (readonly)

Returns the value of attribute redis.



28
29
30
# File 'lib/qs/client.rb', line 28

# Reader for the +@redis+ instance variable.
#
# Other methods in this module call +connection+ with a block on the
# returned object.
#
# NOTE(review): the #initialize shown for this module only assigns
# +@redis_connect_hash+, so +@redis+ is presumably assigned by the including
# class -- confirm. Until something assigns it, this returns nil.
def redis
  @redis
end

#redis_connect_hash ⇒ Object (readonly)

Returns the value of attribute redis_connect_hash.



28
29
30
# File 'lib/qs/client.rb', line 28

# Reader for the +@redis_connect_hash+ instance variable, i.e. the object
# that was handed to #initialize. The same object is returned; no copy is
# made here.
def redis_connect_hash
  @redis_connect_hash
end

Instance Method Details

#append(queue_redis_key, encoded_payload) ⇒ Object



59
60
61
# File 'lib/qs/client.rb', line 59

# Issues +lpush+ for +encoded_payload+ on the list stored at
# +queue_redis_key+, using a connection yielded by +redis.connection+.
#
# #block_dequeue in this module pops with +brpop+, i.e. from the opposite
# end of the list.
#
# Returns whatever +redis.connection+ returns (presumably the block's
# value, i.e. the +lpush+ reply -- confirm).
def append(queue_redis_key, encoded_payload)
  redis.connection do |conn|
    conn.lpush(queue_redis_key, encoded_payload)
  end
end

#block_dequeue(*args) ⇒ Object



55
56
57
# File 'lib/qs/client.rb', line 55

# Forwards every argument, unchanged, to +brpop+ on a connection yielded by
# +redis.connection+.
#
# Returns whatever +redis.connection+ returns (presumably the +brpop+
# reply -- confirm).
def block_dequeue(*args)
  redis.connection do |conn|
    conn.brpop(*args)
  end
end

#clear(redis_key) ⇒ Object



67
68
69
# File 'lib/qs/client.rb', line 67

# Issues +del+ for +redis_key+ on a connection yielded by
# +redis.connection+.
#
# Returns whatever +redis.connection+ returns (presumably the +del+
# reply -- confirm).
def clear(redis_key)
  redis.connection do |conn|
    conn.del(redis_key)
  end
end

#clear_subscriptions(queue) ⇒ Object



88
89
90
91
92
93
94
95
# File 'lib/qs/client.rb', line 88

# Removes +queue.name+ from every event-subscribers set.
#
# The set keys are found by building a key from '*' with
# Qs::Event::SubscribersRedisKey and passing it to +keys+ as a pattern.
# One +srem+ per matching key is then issued inside +redis_transaction+.
#
# Returns whatever +redis_transaction+ returns.
def clear_subscriptions(queue)
  wildcard = Qs::Event::SubscribersRedisKey.new('*')
  subscriber_keys = redis.connection { |conn| conn.keys(wildcard) }

  redis_transaction do |txn|
    subscriber_keys.each do |subscriber_key|
      txn.srem(subscriber_key, queue.name)
    end
  end
end

#enqueue(queue, job_name, job_params = nil) ⇒ Object



34
35
36
37
38
# File 'lib/qs/client.rb', line 34

# Builds a Qs::Job named +job_name+ with +job_params+ passed as its
# +:params+ option, hands it to +enqueue!+ together with +queue+, and
# returns the job (the +enqueue!+ result is discarded).
def enqueue(queue, job_name, job_params = nil)
  Qs::Job.new(job_name, :params => job_params).tap do |new_job|
    enqueue!(queue, new_job)
  end
end

#event_subscribers(event) ⇒ Object



97
98
99
# File 'lib/qs/client.rb', line 97

# Issues +smembers+ for +event.subscribers_redis_key+ on a connection
# yielded by +redis.connection+.
#
# Returns whatever +redis.connection+ returns (presumably the +smembers+
# reply -- confirm).
def event_subscribers(event)
  redis.connection do |conn|
    conn.smembers(event.subscribers_redis_key)
  end
end

#initialize(redis_connect_hash) ⇒ Object



30
31
32
# File 'lib/qs/client.rb', line 30

# Stores the given connection settings on the instance.
#
# redis_connect_hash - kept as-is (no copy is made) in +@redis_connect_hash+
#                      and exposed through the #redis_connect_hash reader.
#                      NOTE(review): presumably the options used to build the
#                      redis connection -- confirm against the including class.
#
# Nothing here assigns +@redis+.
def initialize(redis_connect_hash)
  @redis_connect_hash = redis_connect_hash
end

#ping ⇒ Object



71
72
73
# File 'lib/qs/client.rb', line 71

# Issues +ping+ on a connection yielded by +redis.connection+.
#
# Returns whatever +redis.connection+ returns (presumably the +ping+
# reply -- confirm).
def ping
  redis.connection do |conn|
    conn.ping
  end
end

#prepend(queue_redis_key, encoded_payload) ⇒ Object



63
64
65
# File 'lib/qs/client.rb', line 63

# Issues +rpush+ for +encoded_payload+ on the list stored at
# +queue_redis_key+, using a connection yielded by +redis.connection+.
#
# #block_dequeue in this module pops with +brpop+, i.e. from the same end
# of the list that +rpush+ adds to.
#
# Returns whatever +redis.connection+ returns (presumably the +rpush+
# reply -- confirm).
def prepend(queue_redis_key, encoded_payload)
  redis.connection do |conn|
    conn.rpush(queue_redis_key, encoded_payload)
  end
end

#publish(channel, name, params = nil) ⇒ Object



40
41
42
# File 'lib/qs/client.rb', line 40

# Delegates to +publish!+ with +channel+, +name+ and an options hash whose
# only entry is +:event_params+ => +params+ (nil when omitted).
#
# Returns whatever +publish!+ returns.
def publish(channel, name, params = nil)
  publish!(
    channel,
    name,
    :event_params => params
  )
end

#publish_as(publisher, channel, name, params = nil) ⇒ Object



44
45
46
47
48
49
# File 'lib/qs/client.rb', line 44

# Delegates to +publish!+ with +channel+, +name+ and an options hash
# carrying +:event_params+ => +params+ (nil when omitted) and
# +:event_publisher+ => +publisher+.
#
# Returns whatever +publish!+ returns.
def publish_as(publisher, channel, name, params = nil)
  event_options = {
    :event_params    => params,
    :event_publisher => publisher
  }
  publish!(channel, name, event_options)
end

#push(queue_name, payload_hash) ⇒ Object

Raises:

  • (NotImplementedError)


51
52
53
# File 'lib/qs/client.rb', line 51

# Placeholder that always raises; neither argument is used here.
# NOTE(review): presumably meant to be overridden by whatever includes this
# module -- confirm.
#
# Raises NotImplementedError unconditionally. In Ruby that class descends
# from ScriptError rather than StandardError, so a bare +rescue+ will not
# catch it.
def push(queue_name, payload_hash)
  raise NotImplementedError
end

#sync_subscriptions(queue) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/qs/client.rb', line 75

# Rewrites the subscriptions held for +queue.name+.
#
# Existing event-subscribers keys are found by building a key from '*' with
# Qs::Event::SubscribersRedisKey and passing it to +keys+ as a pattern.
# Wanted keys are built from each entry of +queue.event_route_names+.
# Inside +redis_transaction+, +queue.name+ is first removed (+srem+) from
# every existing key and then added (+sadd+) to every wanted key.
#
# Returns whatever +redis_transaction+ returns.
def sync_subscriptions(queue)
  wildcard = Qs::Event::SubscribersRedisKey.new('*')
  existing_keys = redis.connection { |conn| conn.keys(wildcard) }

  wanted_keys = queue.event_route_names.map { |route_name|
    Qs::Event::SubscribersRedisKey.new(route_name)
  }

  redis_transaction do |txn|
    existing_keys.each do |existing_key|
      txn.srem(existing_key, queue.name)
    end
    wanted_keys.each do |wanted_key|
      txn.sadd(wanted_key, queue.name)
    end
  end
end