Class: EventShipper::UDP

Inherits:
Object
  • Object
show all
Defined in:
lib/event_shipper/udp.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port) ⇒ UDP

Returns a new instance of UDP.



10
11
12
13
14
15
16
17
18
19
20
# File 'lib/event_shipper/udp.rb', line 10

def initialize host, port
  @host, @port = host, port
  @socket = UDPSocket.new
  
  # The Bson-Filter acts as a terminator for hash based messages and
  # turns things into BSON for the wire. 
  @filters = []
  wrap Filter::Transmission.new
  
  @stats = Stats.new      
end

Instance Attribute Details

#statsObject (readonly)

Returns the value of attribute stats.



8
9
10
# File 'lib/event_shipper/udp.rb', line 8

def stats
  @stats
end

Instance Method Details

#closeObject



22
23
24
# File 'lib/event_shipper/udp.rb', line 22

def close
  @socket.close
end

#decode(obj) ⇒ Object



33
34
35
# File 'lib/event_shipper/udp.rb', line 33

def decode obj
  @filters.reverse.inject([obj, {}]) { |o, f| f.de(*o) }
end

#dispatchObject

Enters a loop, receiving messages, yielding them to the block.



69
70
71
72
73
74
75
76
# File 'lib/event_shipper/udp.rb', line 69

def dispatch
  @socket.bind @host, @port
  loop do
    datagram, source_info = @socket.recvfrom(10 * 1024)

    handle_message(datagram, &Proc.new)
  end
end

#encode(obj) ⇒ Object



30
31
32
# File 'lib/event_shipper/udp.rb', line 30

def encode obj
  @filters.inject(obj) { |o, f| f.en(o) }
end

#handle_message(string) ⇒ Object

Handles a single message; decoding it and passing it to the caller.



54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/event_shipper/udp.rb', line 54

def handle_message string
  msg, attributes = decode(string)

  if msg
    @stats.count_message

    queue = attributes[:queue]
    yield queue, msg
  else
    @stats.count_failure
  end
end

#send(hash, queue = 'queue') ⇒ Object

Sends messages via UDP to a central proxy. Using the queue argument, the caller can decide which redis queue events end up in.

Parameters:

  • hash (Hash)

    Attributes of the event to send.

  • queue (String) (defaults to: 'queue')

    Queue to put events into on the other end



42
43
44
45
46
47
48
49
50
# File 'lib/event_shipper/udp.rb', line 42

def send hash, queue='queue'
  event = Protocol.event(
    queue: queue, 
    json: hash.to_json)

  @socket.send encode(event), 
    0,    # flags...
    @host, @port
end

#wrap(filter) ⇒ Object



26
27
28
# File 'lib/event_shipper/udp.rb', line 26

def wrap filter
  @filters << filter
end