Class: Droid

Inherits:
Object
  • Object
show all
Includes:
BackwardsCompatibleMethods, EMTimerUtils, QueueMethods
Defined in:
lib/droid.rb,
lib/droid/em.rb,
lib/droid/sync.rb,
lib/droid/queue.rb,
lib/droid/utils.rb,
lib/droid/heroku.rb,
lib/droid/publish.rb,
lib/droid/request.rb,
lib/droid/json_server.rb,
lib/droid/utilization.rb

Direct Known Subclasses

HerokuDroid

Defined Under Namespace

Modules: BackwardsCompatibleMethods, EMTimerUtils, QueueMethods, Utilization, Utils Classes: BackwardsCompatibleQueue, BadPayload, BaseQueue, ExpiredMessage, JSONServer, ListenQueue, ReplyQueue, Request, SyncException, UnknownReplyTo, WorkerQueue

Constant Summary collapse

DEFAULT_TTL =
300

Class Method Summary collapse

Instance Method Summary collapse

Methods included from EMTimerUtils

included, #periodic_timer, #timer

Methods included from BackwardsCompatibleMethods

#listen4

Methods included from QueueMethods

#listener, #worker

Constructor Details

#initialize(name, opts = {}) ⇒ Droid

Returns a new instance of Droid.



134
135
136
137
138
139
140
141
# File 'lib/droid.rb', line 134

def initialize(name, opts={})
  log.info "=== #{name} droid initializing"

  self.class.name = name
  self.class.start do
    yield self if block_given?
  end
end

Class Method Details

.bunnyObject



11
12
13
14
15
16
17
# File 'lib/droid/sync.rb', line 11

def self.bunny
  @@bunny ||= begin
    b = Bunny.new(default_config)
    b.start
    b
  end
end

.call(queue_name, data, opts = {}, popts = {}) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/droid/sync.rb', line 53

def self.call(queue_name, data, opts={}, popts={})
  opts[:reply_to] ||= Droid::Utils.generate_reply_to(queue_name)
  q = nil
  begin
    reconnect_on_error do
      q = bunny.queue(opts[:reply_to], :auto_delete => true)
      publish_to_ex(queue_name, data, opts, popts)
      pop(q)
    end
  ensure
    if q
      q.delete rescue nil
    end
  end
end

.closing?Boolean

Returns:

  • (Boolean)


105
106
107
# File 'lib/droid.rb', line 105

def self.closing?
  ::AMQP.closing?
end

.default_configObject



68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/droid.rb', line 68

def self.default_config
  uri = URI.parse(ENV["AMQP_URL"] || 'amqp://guest:guest@localhost:5672/')
  {
    :vhost => uri.path,
    :host => uri.host,
    :user => uri.user,
    :port => uri.port || 5672,
    :pass => uri.password
  }
rescue Object => e
  raise "invalid AMQP_URL: (#{uri.inspect}) #{e.class} -> #{e.message}"
end

.gen_queue(droid, name) ⇒ Object

Need this to be backwards compatible



106
107
108
109
110
111
112
# File 'lib/droid/utils.rb', line 106

def self.gen_queue(droid, name)
  dn = droid
  dn = dn.name if dn.respond_to?(:name)
  dn ||= "d"
  dn = dn.gsub(" ", "")
  Droid::Utils.generate_queue(name, droid)
end

.handle_error(err) ⇒ Object



109
110
111
# File 'lib/droid.rb', line 109

def self.handle_error(err)
  log.error "#{err.class}: #{err.message}", :exception => err
end

.logObject



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/droid.rb', line 49

def self.log
  @@log ||= begin
    require 'logger'
    Logger.class_eval <<EORUBY
      alias_method :notice, :info

      alias_method :error_og, :error
      def error(err, opts={})
        e = opts[:exception]
        if e.respond_to?(:backtrace)
          err += "\n" + e.backtrace.join("\n  ")
        end
        error_og(err)
      end
EORUBY
    Logger.new($stderr)
  end
end

.log=(log) ⇒ Object



45
46
47
# File 'lib/droid.rb', line 45

def self.log=(log)
  @@log = log
end

.nameObject



37
38
39
# File 'lib/droid.rb', line 37

def self.name
  @@name ||= nil
end

.name=(name) ⇒ Object



41
42
43
# File 'lib/droid.rb', line 41

def self.name=(name)
  @@name = name
end

.pop(q) ⇒ Object



42
43
44
45
46
47
48
49
50
51
# File 'lib/droid/sync.rb', line 42

def self.pop(q)
  begin
    loop do
      result = q.pop
      result = result[:payload] if result.is_a?(Hash)
      return JSON.parse(result) unless result == :queue_empty
      sleep 0.1
    end
  end
end

.publish(ex_name, data, opts = {}, popts = {}) ⇒ Object

default is publish to exchange



25
26
27
# File 'lib/droid/publish.rb', line 25

def self.publish(ex_name, data, opts={}, popts={})
  publish_to_ex(ex_name, data, opts, popts)
end

.publish_to_ex(ex_name, data, opts = {}, popts = {}) ⇒ Object

publish to exchange directly



17
18
19
20
21
22
23
24
# File 'lib/droid/publish.rb', line 17

def self.publish_to_ex(ex_name, data, opts={}, popts={})
  reconnect_on_error do
    ex = bunny.exchange(ex_name)
    json, popts = Droid::Utils.format_publish(data, opts, popts)
    ex.publish(json, popts)
  end
  log.info "amqp_publish exchange=#{ex_name} #{Droid::Utils.format_data_summary(data, popts[:headers])}" unless opts[:log] == false
end

.publish_to_q(queue_name, data, opts = {}, popts = {}) ⇒ Object

publish to queue directly



70
71
72
73
74
75
76
77
# File 'lib/droid/sync.rb', line 70

def self.publish_to_q(queue_name, data, opts={}, popts={})
  reconnect_on_error do
    q = bunny.queue(queue_name)
    json, popts = Droid::Utils.format_publish(data, opts, popts)
    q.publish(json, popts)
  end
  log.info "amqp_publish queue=#{queue_name} #{Droid::Utils.format_data_summary(data, popts[:headers])}" unless opts[:log] == false
end

.reconnect_on_errorObject



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/droid/sync.rb', line 23

def self.reconnect_on_error
  DroidTimer::timeout(20) do
    begin
      yield if block_given?
    rescue Bunny::ProtocolError
      sleep 0.5
      retry
    rescue Bunny::ConnectionError
      sleep 0.5
      reset_bunny
      retry
    rescue Bunny::ServerDownError
      sleep 0.5
      reset_bunny
      retry
    end
  end
end

.reset_bunnyObject



7
8
9
# File 'lib/droid/sync.rb', line 7

def self.reset_bunny
  @@bunny = nil
end

.start(name, opts = {}) ⇒ Object

Raises:



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/droid.rb', line 81

def self.start(opts={})
  config = opts[:config] || self.default_config

  wait_for_tcp_port(config[:host], config[:port])

  begin
    ::Signal.trap('INT') { ::AMQP.stop{ ::EM.stop } }
    ::Signal.trap('TERM'){ ::AMQP.stop{ ::EM.stop } }

    ::AMQP.start(config) do
      yield if block_given?
    end
  rescue ::AMQP::Error => e
    log.debug "Caught #{e.class}, sleeping to avoid inittab thrashing"
    sleep 5
    log.debug "Done."
    raise
  end
end

.stop_safeObject



101
102
103
# File 'lib/droid.rb', line 101

def self.stop_safe
  ::EM.add_timer(0.2) { ::AMQP.stop { ::EM.stop } }
end

.versionObject



33
34
35
# File 'lib/droid.rb', line 33

def self.version
  @@version ||= File.read(File.dirname(__FILE__) + '/../VERSION').strip
end

.wait_for_tcp_port(host, port, opts = {}) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/droid.rb', line 113

def self.wait_for_tcp_port(host, port, opts={})
  require 'socket'

  opts[:retries] ||= 6
  opts[:timeout] ||= 5

  opts[:retries].times do
    begin
      DroidTimer::timeout(opts[:timeout]) do
        TCPSocket.new(host.to_s, port).close
      end
      return
    rescue Object => e
      log.info "#{host}:#{port} not available, waiting... #{e.class}: #{e.message}"
      sleep 1
    end
  end

  raise "#{host}:#{port} did not come up after #{opts[:retries]} retries"
end

Instance Method Details

#logObject



147
148
149
# File 'lib/droid.rb', line 147

def log
  self.class.log
end

#publish(*args) ⇒ Object



143
144
145
# File 'lib/droid.rb', line 143

def publish(*args)
  Droid.publish(*args)
end