Module: PubSub
- Defined in:
- lib/pubsub/pubsub.rb
Class Method Summary collapse
Instance Method Summary collapse
- #publish(channel, *args) ⇒ Object
- #subscribe(channel, procs = nil, &block) ⇒ Object
- #unsubscribe(channel = nil, proc = nil) ⇒ Object
Class Method Details
.run ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/pubsub/pubsub.rb', line 45 def PubSub.run @@stopped = false PubSub.thread while true if @@stopped PubSub.queue.clear break end if PubSub.queue.empty? PubSub.waiting.push.each {|thread| thread.wakeup } sleep end unless PubSub.queue.empty? publish_info = PubSub.queue.shift sender = publish_info[:sender] channel = publish_info[:channel] params = publish_info[:params] logger.info "Publish ObjectID:#{sender.__id__} of class #{sender.class.name} to #{channel} with #{params.to_s}" channel.sub_before "/" do |sub_channel| PubSub.events.fire sub_channel, *params end PubSub.events.fire "*", *params end end end |
Instance Method Details
#publish(channel, *args) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/pubsub/pubsub.rb', line 31 def publish(channel, *args) mutex.synchronize do logger.info "Add to publish queue #{self.class.name} to #{channel} with #{args.to_s}" PubSub.queue.push({:sender => self, :channel => channel, :params => args}) PubSub.thread.wakeup if PubSub.thread.status == "sleep" if PubSub.queue.count > PubSub.queue_size #puts "Mutex sleep, queue length #{PubSub.queue.count}" PubSub.waiting.push Thread.current mutex.sleep end end end |
#subscribe(channel, procs = nil, &block) ⇒ Object
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/pubsub/pubsub.rb', line 6 def subscribe(channel, procs = nil, &block) mutex.synchronize do logger.info "Subscribe #{self.class.name} to #{channel}" PubSub.events.listen channel, procs, &block procs_collected = [] if procs.respond_to?(:each) && procs.respond_to?(:to_a) procs_collected += procs.to_a elsif procs procs_collected << procs end procs_collected << block if block channel_handlers[channel] ||= [] channel_handlers[channel] += procs_collected end end |
#unsubscribe(channel = nil, proc = nil) ⇒ Object
25 26 27 28 29 |
# File 'lib/pubsub/pubsub.rb', line 25 def unsubscribe(channel=nil, proc = nil) mutex.synchronize do unsubscribe_safe(channel, proc ) end end |