Module: Goat::NotificationCenter
- Defined in:
- lib/goat/notifications.rb
Defined Under Namespace
Classes: Receiver
Class Method Summary collapse
- .configure(opts = {}) ⇒ Object
- .load_notif(notif) ⇒ Object
- .notif_to_json(notif) ⇒ Object
- .notify(notif) ⇒ Object
- .process_notification(notif) ⇒ Object
- .receive(line) ⇒ Object
- .received_notifications ⇒ Object
- .schedule ⇒ Object
- .set_defaults ⇒ Object
- .start_receiver ⇒ Object
- .subscribe(obj, meth, sig) ⇒ Object
- .subscribers ⇒ Object
- .wants_notification(sub, notif) ⇒ Object
Class Method Details
.configure(opts = {}) ⇒ Object
31 32 33 34 35 |
# File 'lib/goat/notifications.rb', line 31 def self.configure(opts={}) @host = opts[:host] @recv_port = opts[:recv_port] @send_port = opts[:send_port] end |
.load_notif(notif) ⇒ Object
57 58 59 |
# File 'lib/goat/notifications.rb', line 57 def self.load_notif(notif) JSON.load(notif) end |
.notif_to_json(notif) ⇒ Object
61 62 63 |
# File 'lib/goat/notifications.rb', line 61 def self.notif_to_json(notif) notif.to_json end |
.notify(notif) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/goat/notifications.rb', line 98 def self.notify(notif) set_defaults # maybe #schedule or #configure never got called if Dynamic.variable?(:txn) notif['txn'] = Dynamic[:txn] notif['txn_pgid'] = Dynamic[:txn_pgid] end nid = String.random notif['_nid'] = nid process_notification(notif) # ensure local delivery happens first received_notifications << nid if EM.reactor_running? EM.connect(@host, @send_port) do |c| # TODO: alert if this fails c.send_data(notif_to_json(notif) + "\n") c.close_connection(true) end else s = TCPSocket.open(@host, @send_port) s.write(notif_to_json(notif) + "\n") s.close end rescue SocketError => e $stderr.puts "Couldn't notify host #{@host.inspect}:#{@send_port.inspect}" raise e end |
.process_notification(notif) ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/goat/notifications.rb', line 80 def self.process_notification(notif) if(notif['txn']) bind = {:txn => notif['txn'], :txn_pgid => notif['txn_pgid']} else bind = {} end Dynamic.let(bind) do subscribers.each do |sub| if wants_notification(sub, notif) $stderr.puts "Dispatching to #{sub[:delegate]}" meth = sub[:meth] sub[:delegate].send(meth, notif) end end end end |
.receive(line) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/goat/notifications.rb', line 67 def self.receive(line) notif = load_notif(line) nid = notif['_nid'] Goat.logd "received notif #{notif.inspect}" if $verbose raise "No _nid" unless nid unless received_notifications.include?(nid) received_notifications << nid process_notification(notif) end end |
.received_notifications ⇒ Object
65 |
# File 'lib/goat/notifications.rb', line 65 def self.received_notifications; @recv_notif ||= Set.new; end |
.schedule ⇒ Object
43 44 45 |
# File 'lib/goat/notifications.rb', line 43 def self.schedule EM.next_tick { set_defaults; self.start_receiver } end |
.set_defaults ⇒ Object
37 38 39 40 41 |
# File 'lib/goat/notifications.rb', line 37 def self.set_defaults @host ||= '127.0.0.1' @recv_port ||= 8000 @send_port ||= 8001 end |
.start_receiver ⇒ Object
27 28 29 |
# File 'lib/goat/notifications.rb', line 27 def self.start_receiver Receiver.start(@host, @recv_port) end |
.subscribe(obj, meth, sig) ⇒ Object
128 129 130 |
# File 'lib/goat/notifications.rb', line 128 def self.subscribe(obj, meth, sig) subscribers << {:sig => sig, :meth => meth, :delegate => obj} end |
.subscribers ⇒ Object
47 |
# File 'lib/goat/notifications.rb', line 47 def self.subscribers; @subscribers ||= Set.new; end |
.wants_notification(sub, notif) ⇒ Object
49 50 51 52 53 54 55 |
# File 'lib/goat/notifications.rb', line 49 def self.wants_notification(sub, notif) wants = true sub[:sig].each do |k, v| wants = false unless notif[k] == v end wants end |