Class: EventShipper::UDP
- Inherits: Object
  - Object
  - EventShipper::UDP
- Defined in: lib/event_shipper/udp.rb
Instance Attribute Summary
- #stats ⇒ Object (readonly): Returns the value of attribute stats.
Instance Method Summary
- #close ⇒ Object
- #decode(obj) ⇒ Object
- #dispatch ⇒ Object: Enters a loop that receives messages and yields each one to the block.
- #encode(obj) ⇒ Object
- #handle_message(string) ⇒ Object: Handles a single message, decoding it and passing it to the caller.
- #initialize(host, port) ⇒ UDP (constructor): Returns a new instance of UDP.
- #send(hash, queue = 'queue') ⇒ Object: Sends messages via UDP to a central proxy.
- #wrap(filter) ⇒ Object
Constructor Details
#initialize(host, port) ⇒ UDP
Returns a new instance of UDP.
    # File 'lib/event_shipper/udp.rb', line 10

    def initialize host, port
      @host, @port = host, port
      @socket = UDPSocket.new

      # The Bson-Filter acts as a terminator for hash based messages and
      # turns things into BSON for the wire.
      @filters = []
      wrap Filter::Transmission.new

      @stats = Stats.new
    end
Instance Attribute Details
#stats ⇒ Object (readonly)
Returns the value of attribute stats.
    # File 'lib/event_shipper/udp.rb', line 8

    def stats
      @stats
    end
Instance Method Details
#close ⇒ Object
    # File 'lib/event_shipper/udp.rb', line 22

    def close
      @socket.close
    end
#decode(obj) ⇒ Object
    # File 'lib/event_shipper/udp.rb', line 33

    def decode obj
      @filters.reverse.inject([obj, {}]) { |o, f| f.de(*o) }
    end
#dispatch ⇒ Object
Enters a loop that receives messages and yields each one to the block.

    # File 'lib/event_shipper/udp.rb', line 69

    def dispatch
      @socket.bind @host, @port

      loop do
        datagram, source_info = @socket.recvfrom(10 * 1024)
        handle_message(datagram, &Proc.new)
      end
    end
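The receive loop can be tried in isolation with Ruby's standard `UDPSocket`. The following is a minimal sketch, not the library's code: `receive_datagrams` is a hypothetical helper, the loop is bounded so that it terminates, and it uses `yield` instead of `&Proc.new`, because calling `Proc.new` without a block (as the listing does to forward the caller's block) is no longer supported as of Ruby 3.0.

```ruby
require 'socket'

# Hypothetical helper, not part of EventShipper: receives up to `limit`
# datagrams on an already bound socket and yields each payload.
def receive_datagrams(socket, limit)
  limit.times do
    # Give up after 2 seconds instead of blocking forever.
    break unless IO.select([socket], nil, nil, 2)

    datagram, _source_info = socket.recvfrom(10 * 1024)
    yield datagram
  end
end

receiver = UDPSocket.new
receiver.bind('127.0.0.1', 0) # port 0 lets the OS pick a free port
port = receiver.addr[1]

sender = UDPSocket.new
sender.send('hello', 0, '127.0.0.1', port)

received = []
receive_datagrams(receiver, 1) { |payload| received << payload }

sender.close
receiver.close
```

The 10 KiB read size mirrors the `recvfrom(10 * 1024)` call in the listing; a larger datagram would be truncated by such a read.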
#encode(obj) ⇒ Object
    # File 'lib/event_shipper/udp.rb', line 30

    def encode obj
      @filters.inject(obj) { |o, f| f.en(o) }
    end
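`#encode` folds an object through the filters in the order they were wrapped, and `#decode` folds back through them in reverse, carrying an attributes hash alongside the object. The sketch below illustrates that round trip with two hypothetical filters (`UpcaseFilter` and `BracketFilter` are invented for illustration). It assumes, based on the `f.de(*o)` call, that `de` takes the object plus an attributes hash and returns both as a pair.

```ruby
# Hypothetical filters for illustration only; the real filters
# (such as Filter::Transmission) are not shown on this page.
class UpcaseFilter
  def en(obj)
    obj.upcase
  end

  # Assumed contract: returns [object, attributes].
  def de(obj, attributes)
    [obj.downcase, attributes]
  end
end

class BracketFilter
  def en(obj)
    "[#{obj}]"
  end

  def de(obj, attributes)
    # Strip the brackets and record a fact about the message.
    [obj[1..-2], attributes.merge(bracketed: true)]
  end
end

filters = [UpcaseFilter.new, BracketFilter.new]

# The same folds as in #encode and #decode.
encoded = filters.inject('event') { |o, f| f.en(o) }
decoded, attributes = filters.reverse.inject([encoded, {}]) { |o, f| f.de(*o) }
```

Reversing the filter list on decode means the last filter applied on the way out is the first one undone on the way in, which is what lets each `de` see exactly what its own `en` produced.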
#handle_message(string) ⇒ Object
Handles a single message, decoding it and passing it to the caller.

    # File 'lib/event_shipper/udp.rb', line 54

    def handle_message string
      msg, attributes = decode(string)

      if msg
        @stats.

        queue = attributes[:queue]
        yield queue, msg
      else
        @stats.count_failure
      end
    end
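The control flow is: decode, then either count a success and yield the queue name with the message, or count a failure. A minimal sketch of that flow follows. Everything in it is a stand-in: `SketchStats`, `handle`, and the `decoder` lambda are hypothetical, and `count_success` is an invented name, since the listing above leaves the success counter call incomplete.

```ruby
# Hypothetical stand-in for the Stats object. Only count_failure appears
# in the listing; count_success is an invented name.
class SketchStats
  attr_reader :successes, :failures

  def initialize
    @successes = 0
    @failures = 0
  end

  def count_success
    @successes += 1
  end

  def count_failure
    @failures += 1
  end
end

# Hypothetical equivalent of #handle_message with its collaborators
# passed in explicitly.
def handle(string, stats, decoder)
  msg, attributes = decoder.call(string)

  if msg
    stats.count_success
    yield attributes[:queue], msg
  else
    stats.count_failure
  end
end

# Toy decoder: "queue:payload" decodes, anything else is a failure.
decoder = lambda do |string|
  queue, payload = string.split(':', 2)
  payload ? [payload, { queue: queue }] : [nil, {}]
end

stats = SketchStats.new
delivered = []
handle('logs:started', stats, decoder) { |queue, msg| delivered << [queue, msg] }
handle('garbage', stats, decoder) { |queue, msg| delivered << [queue, msg] }
```

Messages that fail to decode are counted but never reach the caller's block.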
#send(hash, queue = 'queue') ⇒ Object
Sends messages via UDP to a central proxy. The queue argument lets the caller decide which Redis queue the events end up in.

    # File 'lib/event_shipper/udp.rb', line 42

    def send hash, queue='queue'
      event = Protocol.event(
        queue: queue,
        json: hash.to_json)

      @socket.send encode(event),
        0, # flags...
        @host, @port
    end
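To show how the queue name travels alongside the JSON payload, here is a minimal sketch that sends one event over loopback UDP. It does not use the library's wire format: `build_datagram` is a hypothetical helper that wraps the event in a plain JSON envelope, standing in for `Protocol.event` and the filter chain, whose encodings are not shown on this page.

```ruby
require 'json'
require 'socket'

# Hypothetical envelope, standing in for Protocol.event plus #encode.
def build_datagram(hash, queue = 'queue')
  JSON.generate(queue: queue, json: hash.to_json)
end

receiver = UDPSocket.new
receiver.bind('127.0.0.1', 0) # port 0 lets the OS pick a free port
port = receiver.addr[1]

sender = UDPSocket.new
# Arguments match the listing: payload, flags (0), host, port.
sender.send(build_datagram({ 'level' => 'info' }, 'logs'), 0, '127.0.0.1', port)

# Fail fast rather than block forever if nothing arrives.
raise 'no datagram received' unless IO.select([receiver], nil, nil, 2)

raw, _source_info = receiver.recvfrom(10 * 1024)
envelope = JSON.parse(raw)
event = JSON.parse(envelope['json'])

sender.close
receiver.close
```

UDP gives no delivery guarantee, so a sender built this way cannot tell whether the proxy received the event.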
#wrap(filter) ⇒ Object
    # File 'lib/event_shipper/udp.rb', line 26

    def wrap filter
      @filters << filter
    end