Method: NATS#on_msg
- Defined in:
- lib/nats/client.rb
#on_msg(subject, sid, reply, msg) ⇒ Object
:nodoc:
833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 |
# File 'lib/nats/client.rb', line 833 def on_msg(subject, sid, reply, msg) #:nodoc: # Accounting - We should account for inbound even if they are not processed. @msgs_received += 1 @bytes_received += msg.bytesize if msg return unless sub = @subs[sid] # Check for auto_unsubscribe sub[:received] += 1 if sub[:max] # Client side support in case server did not receive unsubscribe return unsubscribe(sid) if (sub[:received] > sub[:max]) # cleanup here if we have hit the max.. @subs.delete(sid) if (sub[:received] == sub[:max]) end if cb = sub[:callback] case cb.arity when 0 then cb.call when 1 then cb.call(msg) when 2 then cb.call(msg, reply) else cb.call(msg, reply, subject) end end # Check for a timeout, and cancel if received >= expected if (sub[:timeout] && sub[:received] >= sub[:expected]) EM.cancel_timer(sub[:timeout]) sub[:timeout] = nil end end |