Module: PryRemoteEm::Broker
- Includes:
- Proto
- Defined in:
- lib/pry-remote-em/broker.rb
Class Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Proto
#receive_auth, #receive_banner, #receive_clear_buffer, #receive_completion, #receive_data, #receive_msg, #receive_msg_bcast, #receive_object, #receive_prompt, #receive_raw, #receive_server_list, #receive_shell_cmd, #receive_shell_data, #receive_shell_result, #receive_shell_sig, #receive_start_tls, #receive_unknown, #send_auth, #send_banner, #send_clear_buffer, #send_completion, #send_heatbeat, #send_msg, #send_msg_bcast, #send_object, #send_prompt, #send_proxy_connection, #send_raw, #send_register_server, #send_server_list, #send_server_reload_list, #send_shell_cmd, #send_shell_data, #send_shell_result, #send_shell_sig, #send_start_tls, #send_unregister_server
Class Attribute Details
Returns the value of attribute host.
10
11
12
|
# File 'lib/pry-remote-em/broker.rb', line 10
def host
@host
end
|
.listening ⇒ Object
Also known as:
listening?
Returns the value of attribute listening.
10
11
12
|
# File 'lib/pry-remote-em/broker.rb', line 10
def listening
@listening
end
|
Returns the value of attribute port.
10
11
12
|
# File 'lib/pry-remote-em/broker.rb', line 10
def port
@port
end
|
Class Method Details
.connected? ⇒ Boolean
106
107
108
|
# File 'lib/pry-remote-em/broker.rb', line 106
def connected?
@connected
end
|
102
103
104
|
# File 'lib/pry-remote-em/broker.rb', line 102
def hbeats
@hbeats ||= {}
end
|
52
53
54
55
|
# File 'lib/pry-remote-em/broker.rb', line 52
def log
return opts[:logger] if opts[:logger]
@log ||= Logger.new(STDERR)
end
|
48
49
50
|
# File 'lib/pry-remote-em/broker.rb', line 48
def opts
@opts ||= {}
end
|
.register(description) ⇒ Object
61
62
63
|
# File 'lib/pry-remote-em/broker.rb', line 61
def register(description)
client { |c| c.send_register_server(description[:id], description[:urls], description[:name], description[:details], description[:metrics]) }
end
|
.register_server(id, description) ⇒ Object
69
70
71
72
73
|
# File 'lib/pry-remote-em/broker.rb', line 69
def register_server(id, description)
servers[id] = description
watch_heartbeats(id)
log.info("[pry-remote-em broker] registered #{id} #{description.inspect}")
end
|
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
# File 'lib/pry-remote-em/broker.rb', line 30
def restart
log.info("[pry-remote-em broker] restarting on pryem://#{host}:#{port}")
@waiting = nil
@client = nil
run(@host, @port, @opts) do
PryRemoteEm.servers.each do |id, description|
next unless EM.get_sockname(description[:server])
register(
id: description[:id],
urls: description[:urls],
name: description[:name],
details: description[:details],
metrics: PryRemoteEm::Metrics.list
)
end
end
end
|
.run(host = nil, port = nil, opts = {}) ⇒ Object
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
# File 'lib/pry-remote-em/broker.rb', line 13
def run(host = nil, port = nil, opts = {})
host ||= ENV['PRYEMBROKER'].nil? || ENV['PRYEMBROKER'].empty? ? DEFAULT_BROKER_HOST : ENV['PRYEMBROKER']
port ||= ENV['PRYEMBROKERPORT'].nil? || ENV['PRYEMBROKERPORT'].empty? ? DEFAULT_BROKER_PORT : ENV['PRYEMBROKERPORT']
port = port.to_i if port.kind_of?(String)
raise "root permission required for port below 1024 (#{port})" if port < 1024 && Process.euid != 0
@host = host
@port = port
opts = opts.dup
opts[:tls] = false
@opts = opts
start_server(host, port, opts) unless @listening || ENV['PRYEMREMOTEBROKER'] || @opts[:remote_broker]
client { |c| yield self } if block_given?
end
|
57
58
59
|
# File 'lib/pry-remote-em/broker.rb', line 57
def servers
@servers ||= {}
end
|
98
99
100
|
# File 'lib/pry-remote-em/broker.rb', line 98
def timers
@timers ||= {}
end
|
.unregister(id) ⇒ Object
65
66
67
|
# File 'lib/pry-remote-em/broker.rb', line 65
def unregister(id)
client { |c| c.send_unregister_server(id) }
end
|
.unregister_server(id) ⇒ Object
81
82
83
84
85
86
87
|
# File 'lib/pry-remote-em/broker.rb', line 81
def unregister_server(id)
server = servers.delete(id) or return
log.warn("[pry-remote-em broker] unregister #{id} #{server.inspect}")
timer = timers.delete(id)
timer.cancel if timer
hbeats.delete(id)
end
|
.update_server(server, description) ⇒ Object
75
76
77
78
79
|
# File 'lib/pry-remote-em/broker.rb', line 75
def update_server(server, description)
server.update(urls: description[:urls], name: description[:name])
server[:details].update(description[:details])
server[:metrics].update(description[:metrics])
end
|
.watch_heartbeats(id) ⇒ Object
89
90
91
92
93
94
95
96
|
# File 'lib/pry-remote-em/broker.rb', line 89
def watch_heartbeats(id)
interval = ENV['PRYEMHBCHECK'].nil? || ENV['PRYEMHBCHECK'].empty? ? HEARTBEAT_CHECK_INTERVAL : ENV['PRYEMHBCHECK']
timers[id] ||= EM::PeriodicTimer.new(interval) do
if !hbeats[id] || (Time.new - hbeats[id]) > 20
unregister_server(id)
end
end
end
|
Instance Method Details
#initialize(opts = {}, &blk) ⇒ Object
181
182
183
184
|
# File 'lib/pry-remote-em/broker.rb', line 181
def initialize(opts = {}, &blk)
@opts = opts
@ids = []
end
|
186
187
188
|
# File 'lib/pry-remote-em/broker.rb', line 186
def log
Broker.log
end
|
203
204
205
206
207
208
209
|
# File 'lib/pry-remote-em/broker.rb', line 203
def peer_ip
return @peer_ip if @peer_ip
return '' if get_peername.nil?
@peer_port, @peer_ip = Socket.unpack_sockaddr_in(get_peername)
@peer_ip = '127.0.0.1' if @peer_ip == '::1' @peer_ip
end
|
#peer_port ⇒ Object
211
212
213
214
215
216
|
# File 'lib/pry-remote-em/broker.rb', line 211
def peer_port
return @peer_port if @peer_port
return '' if get_peername.nil?
peer_ip @peer_port
end
|
#post_init ⇒ Object
190
191
192
193
194
195
|
# File 'lib/pry-remote-em/broker.rb', line 190
def post_init
port, ip = Socket.unpack_sockaddr_in(get_peername)
log.info("[pry-remote-em broker] received client connection from #{ip}:#{port}")
send_banner("PryRemoteEm #{VERSION} #{@opts[:tls] ? 'pryems' : 'pryem'}")
@opts[:tls] ? start_tls : send_server_list(Broker.servers)
end
|
#receive_proxy_connection(url) ⇒ Object
175
176
177
178
179
|
# File 'lib/pry-remote-em/broker.rb', line 175
def receive_proxy_connection(url)
log.info("[pry-remote-em broker] proxying to #{url}")
url = URI.parse(url)
EM.connect(url.host, url.port, Client::Proxy, self)
end
|
#receive_register_server(id, urls, name, details, metrics) ⇒ Object
158
159
160
161
162
163
164
165
166
167
168
|
# File 'lib/pry-remote-em/broker.rb', line 158
def receive_register_server(id, urls, name, details, metrics)
@ids.push(id)
description = { urls: urls, name: name, details: details, metrics: metrics }
Broker.hbeats[id] = Time.new
server = Broker.servers[id]
if server
Broker.update_server(server, description)
else
Broker.register_server(id, description)
end
end
|
#receive_server_reload_list ⇒ Object
154
155
156
|
# File 'lib/pry-remote-em/broker.rb', line 154
def receive_server_reload_list
send_server_list(Broker.servers)
end
|
#receive_unregister_server(id) ⇒ Object
#ssl_handshake_completed ⇒ Object
218
219
220
221
|
# File 'lib/pry-remote-em/broker.rb', line 218
def ssl_handshake_completed
log.info("[pry-remote-em broker] TLS connection established (#{peer_ip}:#{peer_port})")
send_server_list(Broker.servers)
end
|
#start_tls ⇒ Object
197
198
199
200
201
|
# File 'lib/pry-remote-em/broker.rb', line 197
def start_tls
log.debug("[pry-remote-em broker] starting TLS (#{peer_ip}:#{peer_port})")
send_start_tls
super(@opts[:tls].is_a?(Hash) ? @opts[:tls] : {})
end
|
223
224
225
226
227
|
# File 'lib/pry-remote-em/broker.rb', line 223
def unbind
@ids.each do |id|
Broker.unregister_server(id)
end
end
|