Class: EventMachine::Postman

Inherits:
Object
  • Object
show all
Defined in:
lib/em-postman/postman.rb,
lib/em-postman/version.rb,
lib/em-postman/synchrony.rb

Constant Summary collapse

VERSION =
"0.1.0"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(mailbox, options = {}) ⇒ Postman

Returns a new instance of Postman.



14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/em-postman/postman.rb', line 14

def initialize(mailbox, options = {})
  @mailbox = mailbox
  @timeout = options[:timeout] || 5
  @redis_options = {:db => 0, :host => 'localhost', :port => 6379}

  @options = options
  if options[:redis].is_a?(EventMachine::Hiredis::Client)
    @redis = options[:redis]
    @redis_options = extract_redis_options(@redis)
  else
    @redis_options.merge(options[:redis] || {})
  end
end

Instance Attribute Details

#loggerObject



6
7
8
# File 'lib/em-postman/postman.rb', line 6

def logger
  @logger ||= Logger.new(STDOUT)
end

#mailboxObject

Returns the value of attribute mailbox.



3
4
5
# File 'lib/em-postman/postman.rb', line 3

def mailbox
  @mailbox
end

Instance Method Details

#clearObject



32
33
34
# File 'lib/em-postman/postman.rb', line 32

def clear
  redis.del(inbox_name)
end

#handlersObject



10
11
12
# File 'lib/em-postman/postman.rb', line 10

def handlers
  @handlers ||= {}
end

#listenObject



59
60
61
# File 'lib/em-postman/postman.rb', line 59

def listen
  listen_messages
end

#listen_messages(redis = new_redis_client) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/em-postman/postman.rb', line 87

def listen_messages(redis = new_redis_client)
  EM.next_tick do
    deferable = redis.brpop(inbox_name, @timeout)
    deferable.callback do |channel, message|
      handle_rpop(channel, message) unless channel.nil?
      listen_messages(redis)
    end

    deferable.errback do |error|
      logger.error "Postman[#{mailbox}]: #{error.inspect}"
      listen_messages(redis)
    end
  end
end

#onmessage(method, &callback) ⇒ Object



36
37
38
39
40
41
# File 'lib/em-postman/postman.rb', line 36

def onmessage(method, &callback)
  debug "##{method} registered"

  handlers[method.to_s] ||= []
  handlers[method.to_s] << callback
end

#redisObject



28
29
30
# File 'lib/em-postman/postman.rb', line 28

def redis
  @redis ||= new_redis_client
end

#send_message(recipient, method, body) ⇒ Object



49
50
51
52
53
54
55
56
57
# File 'lib/em-postman/postman.rb', line 49

def send_message(recipient, method, body)
  message = MultiJson.encode({
    'method' => method,
    'body' => body
  })

  debug "-> #{recipient}##{method}: #{body.inspect}"
  redis.lpush("postman:inbox_#{recipient}", message)
end

#unlisten(method, callback) ⇒ Object



43
44
45
46
47
# File 'lib/em-postman/postman.rb', line 43

def unlisten(method, callback)
  if cbs = handlers[method.to_s]
    cbs.delete(callback)
  end
end