Class: Droid
- Inherits:
-
Object
show all
- Includes:
- BackwardsCompatibleMethods, EMTimerUtils, QueueMethods
- Defined in:
- lib/droid.rb,
lib/droid/em.rb,
lib/droid/sync.rb,
lib/droid/queue.rb,
lib/droid/utils.rb,
lib/droid/heroku.rb,
lib/droid/publish.rb,
lib/droid/request.rb,
lib/droid/version.rb,
lib/droid/json_server.rb,
lib/droid/utilization.rb
Defined Under Namespace
Modules: BackwardsCompatibleMethods, EMTimerUtils, QueueMethods, Utilization, Utils
Classes: BackwardsCompatibleQueue, BadPayload, BaseQueue, ExpiredMessage, JSONServer, ListenQueue, ReplyQueue, Request, SyncException, UnknownReplyTo, WorkerQueue
Constant Summary
collapse
- DEFAULT_TTL =
300
Class Method Summary
collapse
-
.bunny ⇒ Object
-
.call(queue_name, data, opts = {}, popts = {}) ⇒ Object
-
.closing? ⇒ Boolean
-
.default_config ⇒ Object
-
.gen_queue(droid, name) ⇒ Object
Need this to be backwards compatible.
-
.handle_error(err) ⇒ Object
-
.log ⇒ Object
-
.log=(log) ⇒ Object
-
.name ⇒ Object
-
.name=(name) ⇒ Object
-
.pop(q) ⇒ Object
-
.publish(ex_name, data, opts = {}, popts = {}) ⇒ Object
default is publish to exchange.
-
.publish_to_ex(ex_name, data, opts = {}, popts = {}) ⇒ Object
publish to exchange directly.
-
.publish_to_q(queue_name, data, opts = {}, popts = {}) ⇒ Object
publish to queue directly.
-
.reconnect_on_error ⇒ Object
-
.reset_bunny ⇒ Object
-
.start(name, opts = {}) ⇒ Object
-
.stop_safe ⇒ Object
-
.version ⇒ Object
-
.wait_for_tcp_port(host, port, opts = {}) ⇒ Object
Instance Method Summary
collapse
included, #periodic_timer, #timer
#listen4
#listener, #worker
Constructor Details
#initialize(name, opts = {}) ⇒ Droid
Returns a new instance of Droid.
135
136
137
138
139
140
141
142
|
# File 'lib/droid.rb', line 135
def initialize(name, opts={})
log.info "=== #{name} droid initializing"
self.class.name = name
self.class.start do
yield self if block_given?
end
end
|
Class Method Details
.bunny ⇒ Object
11
12
13
14
15
16
17
|
# File 'lib/droid/sync.rb', line 11
def self.bunny
@@bunny ||= begin
b = Bunny.new(default_config)
b.start
b
end
end
|
.call(queue_name, data, opts = {}, popts = {}) ⇒ Object
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
# File 'lib/droid/sync.rb', line 53
def self.call(queue_name, data, opts={}, popts={})
opts[:reply_to] ||= Droid::Utils.generate_reply_to(queue_name)
q = nil
begin
reconnect_on_error do
q = bunny.queue(opts[:reply_to], :auto_delete => true)
publish_to_ex(queue_name, data, opts, popts)
pop(q)
end
ensure
if q
q.delete rescue nil
end
end
end
|
.closing? ⇒ Boolean
106
107
108
|
# File 'lib/droid.rb', line 106
def self.closing?
::AMQP.closing?
end
|
.default_config ⇒ Object
69
70
71
72
73
74
75
76
77
78
79
80
|
# File 'lib/droid.rb', line 69
def self.default_config
uri = URI.parse(ENV["AMQP_URL"] || 'amqp://guest:guest@localhost:5672/')
{
:vhost => uri.path,
:host => uri.host,
:user => uri.user,
:port => uri.port || 5672,
:pass => uri.password
}
rescue Object => e
raise "invalid AMQP_URL: (#{uri.inspect}) #{e.class} -> #{e.message}"
end
|
.gen_queue(droid, name) ⇒ Object
Need this to be backwards compatible
106
107
108
109
110
111
112
|
# File 'lib/droid/utils.rb', line 106
def self.gen_queue(droid, name)
dn = droid
dn = dn.name if dn.respond_to?(:name)
dn ||= "d"
dn = dn.gsub(" ", "")
Droid::Utils.generate_queue(name, droid)
end
|
.handle_error(err) ⇒ Object
110
111
112
|
# File 'lib/droid.rb', line 110
def self.handle_error(err)
log.error "#{err.class}: #{err.message}", :exception => err
end
|
.log ⇒ Object
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
# File 'lib/droid.rb', line 50
def self.log
@@log ||= begin
require 'logger'
Logger.class_eval <<EORUBY
alias_method :notice, :info
alias_method :error_og, :error
def error(err, opts={})
e = opts[:exception]
if e.respond_to?(:backtrace)
err += "\n" + e.backtrace.join("\n ")
end
error_og(err)
end
EORUBY
Logger.new($stderr)
end
end
|
.log=(log) ⇒ Object
46
47
48
|
# File 'lib/droid.rb', line 46
def self.log=(log)
@@log = log
end
|
.name ⇒ Object
38
39
40
|
# File 'lib/droid.rb', line 38
def self.name
@@name ||= nil
end
|
.name=(name) ⇒ Object
42
43
44
|
# File 'lib/droid.rb', line 42
def self.name=(name)
@@name = name
end
|
.pop(q) ⇒ Object
42
43
44
45
46
47
48
49
50
51
|
# File 'lib/droid/sync.rb', line 42
def self.pop(q)
begin
loop do
result = q.pop
result = result[:payload] if result.is_a?(Hash)
return JSON.parse(result) unless result == :queue_empty
sleep 0.1
end
end
end
|
.publish(ex_name, data, opts = {}, popts = {}) ⇒ Object
default is publish to exchange
25
26
27
|
# File 'lib/droid/publish.rb', line 25
def self.publish(ex_name, data, opts={}, popts={})
publish_to_ex(ex_name, data, opts, popts)
end
|
.publish_to_ex(ex_name, data, opts = {}, popts = {}) ⇒ Object
publish to exchange directly
17
18
19
20
21
22
23
24
|
# File 'lib/droid/publish.rb', line 17
def self.publish_to_ex(ex_name, data, opts={}, popts={})
reconnect_on_error do
ex = bunny.exchange(ex_name)
json, popts = Droid::Utils.format_publish(data, opts, popts)
ex.publish(json, popts)
end
log.info "amqp_publish exchange=#{ex_name} #{Droid::Utils.format_data_summary(data, popts[:headers])}" unless opts[:log] == false
end
|
.publish_to_q(queue_name, data, opts = {}, popts = {}) ⇒ Object
publish to queue directly
70
71
72
73
74
75
76
77
|
# File 'lib/droid/sync.rb', line 70
def self.publish_to_q(queue_name, data, opts={}, popts={})
reconnect_on_error do
q = bunny.queue(queue_name)
json, popts = Droid::Utils.format_publish(data, opts, popts)
q.publish(json, popts)
end
log.info "amqp_publish queue=#{queue_name} #{Droid::Utils.format_data_summary(data, popts[:headers])}" unless opts[:log] == false
end
|
.reconnect_on_error ⇒ Object
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
# File 'lib/droid/sync.rb', line 23
def self.reconnect_on_error
DroidTimer::timeout(20) do
begin
yield if block_given?
rescue Bunny::ProtocolError
sleep 0.5
retry
rescue Bunny::ConnectionError
sleep 0.5
reset_bunny
retry
rescue Bunny::ServerDownError
sleep 0.5
reset_bunny
retry
end
end
end
|
.reset_bunny ⇒ Object
7
8
9
|
# File 'lib/droid/sync.rb', line 7
def self.reset_bunny
@@bunny = nil
end
|
.start(name, opts = {}) ⇒ Object
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
# File 'lib/droid.rb', line 82
def self.start(opts={})
config = opts[:config] || self.default_config
wait_for_tcp_port(config[:host], config[:port])
begin
::Signal.trap('INT') { ::AMQP.stop{ ::EM.stop } }
::Signal.trap('TERM'){ ::AMQP.stop{ ::EM.stop } }
::AMQP.start(config) do
yield if block_given?
end
rescue ::AMQP::Error => e
log.debug "Caught #{e.class}, sleeping to avoid inittab thrashing"
sleep 5
log.debug "Done."
raise
end
end
|
.stop_safe ⇒ Object
102
103
104
|
# File 'lib/droid.rb', line 102
def self.stop_safe
::EM.add_timer(0.2) { ::AMQP.stop { ::EM.stop } }
end
|
.version ⇒ Object
2
3
4
|
# File 'lib/droid/version.rb', line 2
def self.version
"1.1.1"
end
|
.wait_for_tcp_port(host, port, opts = {}) ⇒ Object
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
# File 'lib/droid.rb', line 114
def self.wait_for_tcp_port(host, port, opts={})
require 'socket'
opts[:retries] ||= 6
opts[:timeout] ||= 5
opts[:retries].times do
begin
DroidTimer::timeout(opts[:timeout]) do
TCPSocket.new(host.to_s, port).close
end
return
rescue Object => e
log.info "#{host}:#{port} not available, waiting... #{e.class}: #{e.message}"
sleep 1
end
end
raise "#{host}:#{port} did not come up after #{opts[:retries]} retries"
end
|
Instance Method Details
#log ⇒ Object
148
149
150
|
# File 'lib/droid.rb', line 148
def log
self.class.log
end
|
#publish(*args) ⇒ Object
144
145
146
|
# File 'lib/droid.rb', line 144
def publish(*args)
Droid.publish(*args)
end
|