Module: PryRemoteEm::Broker

Includes:
Proto
Defined in:
lib/pry-remote-em/broker.rb

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Proto

#receive_auth, #receive_banner, #receive_clear_buffer, #receive_completion, #receive_data, #receive_msg, #receive_msg_bcast, #receive_object, #receive_prompt, #receive_raw, #receive_server_list, #receive_shell_cmd, #receive_shell_data, #receive_shell_result, #receive_shell_sig, #receive_start_tls, #receive_unknown, #send_auth, #send_banner, #send_clear_buffer, #send_completion, #send_heatbeat, #send_msg, #send_msg_bcast, #send_object, #send_prompt, #send_proxy_connection, #send_raw, #send_register_server, #send_server_list, #send_server_reload_list, #send_shell_cmd, #send_shell_data, #send_shell_result, #send_shell_sig, #send_start_tls, #send_unregister_server

Class Attribute Details

.hostObject (readonly)

Returns the value of attribute host.



10
11
12
# File 'lib/pry-remote-em/broker.rb', line 10

def host
  @host
end

.listeningObject (readonly) Also known as: listening?

Returns the value of attribute listening.



10
11
12
# File 'lib/pry-remote-em/broker.rb', line 10

def listening
  @listening
end

.portObject (readonly)

Returns the value of attribute port.



10
11
12
# File 'lib/pry-remote-em/broker.rb', line 10

def port
  @port
end

Class Method Details

.connected?Boolean

Returns:

  • (Boolean)


106
107
108
# File 'lib/pry-remote-em/broker.rb', line 106

def connected?
  @connected
end

.hbeatsObject



102
103
104
# File 'lib/pry-remote-em/broker.rb', line 102

def hbeats
  @hbeats ||= {}
end

.logObject



52
53
54
55
# File 'lib/pry-remote-em/broker.rb', line 52

def log
  return opts[:logger] if opts[:logger]
  @log ||= Logger.new(STDERR)
end

.optsObject



48
49
50
# File 'lib/pry-remote-em/broker.rb', line 48

def opts
  @opts ||= {}
end

.register(description) ⇒ Object



61
62
63
# File 'lib/pry-remote-em/broker.rb', line 61

def register(description)
  client { |c| c.send_register_server(description[:id], description[:urls], description[:name], description[:details], description[:metrics]) }
end

.register_server(id, description) ⇒ Object



69
70
71
72
73
# File 'lib/pry-remote-em/broker.rb', line 69

def register_server(id, description)
  servers[id] = description
  watch_heartbeats(id)
  log.info("[pry-remote-em broker] registered #{id} #{description.inspect}")
end

.restartObject



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/pry-remote-em/broker.rb', line 30

def restart
  log.info("[pry-remote-em broker] restarting on pryem://#{host}:#{port}")
  @waiting   = nil
  @client    = nil
  run(@host, @port, @opts) do
    PryRemoteEm.servers.each do |id, description|
      next unless EM.get_sockname(description[:server])
      register(
        id: description[:id],
        urls: description[:urls],
        name: description[:name],
        details: description[:details],
        metrics: PryRemoteEm::Metrics.list
      )
    end
  end
end

.run(host = nil, port = nil, opts = {}) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/pry-remote-em/broker.rb', line 13

def run(host = nil, port = nil, opts = {})
  host ||= ENV['PRYEMBROKER'].nil? || ENV['PRYEMBROKER'].empty? ? DEFAULT_BROKER_HOST : ENV['PRYEMBROKER']
  port ||= ENV['PRYEMBROKERPORT'].nil? || ENV['PRYEMBROKERPORT'].empty? ? DEFAULT_BROKER_PORT : ENV['PRYEMBROKERPORT']
  port = port.to_i if port.kind_of?(String)
  raise "root permission required for port below 1024 (#{port})" if port < 1024 && Process.euid != 0
  @host = host
  @port = port
  opts = opts.dup
  # Brokers cannot use SSL directly. If they do then when a proxy request to an SSL server is received
  # the client and server will not be able to negotiate a SSL session. The proxied traffic can be SSL
  # encrypted, but the SSL session will be between the client and the server.
  opts[:tls] = false
  @opts = opts
  start_server(host, port, opts) unless @listening || ENV['PRYEMREMOTEBROKER'] || @opts[:remote_broker]
  client { |c| yield self } if block_given?
end

.serversObject



57
58
59
# File 'lib/pry-remote-em/broker.rb', line 57

def servers
  @servers ||= {}
end

.timersObject



98
99
100
# File 'lib/pry-remote-em/broker.rb', line 98

def timers
  @timers ||= {}
end

.unregister(id) ⇒ Object



65
66
67
# File 'lib/pry-remote-em/broker.rb', line 65

def unregister(id)
  client { |c| c.send_unregister_server(id) }
end

.unregister_server(id) ⇒ Object



81
82
83
84
85
86
87
# File 'lib/pry-remote-em/broker.rb', line 81

def unregister_server(id)
  server = servers.delete(id) or return
  log.warn("[pry-remote-em broker] unregister #{id} #{server.inspect}")
  timer = timers.delete(id)
  timer.cancel if timer
  hbeats.delete(id)
end

.update_server(server, description) ⇒ Object



75
76
77
78
79
# File 'lib/pry-remote-em/broker.rb', line 75

def update_server(server, description)
  server.update(urls: description[:urls], name: description[:name])
  server[:details].update(description[:details])
  server[:metrics].update(description[:metrics])
end

.watch_heartbeats(id) ⇒ Object



89
90
91
92
93
94
95
96
# File 'lib/pry-remote-em/broker.rb', line 89

def watch_heartbeats(id)
  interval = ENV['PRYEMHBCHECK'].nil? || ENV['PRYEMHBCHECK'].empty? ? HEARTBEAT_CHECK_INTERVAL : ENV['PRYEMHBCHECK']
  timers[id] ||= EM::PeriodicTimer.new(interval) do
    if !hbeats[id] || (Time.new - hbeats[id]) > 20
      unregister_server(id)
    end
  end
end

Instance Method Details

#initialize(opts = {}, &blk) ⇒ Object



181
182
183
184
# File 'lib/pry-remote-em/broker.rb', line 181

def initialize(opts = {}, &blk)
  @opts = opts
  @ids = []
end

#logObject



186
187
188
# File 'lib/pry-remote-em/broker.rb', line 186

def log
  Broker.log
end

#peer_ipObject



203
204
205
206
207
208
209
# File 'lib/pry-remote-em/broker.rb', line 203

def peer_ip
  return @peer_ip if @peer_ip
  return '' if get_peername.nil?
  @peer_port, @peer_ip = Socket.unpack_sockaddr_in(get_peername)
  @peer_ip = '127.0.0.1' if @peer_ip == '::1' # Little hack to avoid segmentation fault in EventMachine 1.2.0.1 while connecting to PryRemoteEm Server from localhost with IPv6 address
  @peer_ip
end

#peer_portObject



211
212
213
214
215
216
# File 'lib/pry-remote-em/broker.rb', line 211

def peer_port
  return @peer_port if @peer_port
  return '' if get_peername.nil?
  peer_ip # Fills peer_port too
  @peer_port
end

#post_initObject



190
191
192
193
194
195
# File 'lib/pry-remote-em/broker.rb', line 190

def post_init
  port, ip = Socket.unpack_sockaddr_in(get_peername)
  log.info("[pry-remote-em broker] received client connection from #{ip}:#{port}")
  send_banner("PryRemoteEm #{VERSION} #{@opts[:tls] ? 'pryems' : 'pryem'}")
  @opts[:tls] ? start_tls : send_server_list(Broker.servers)
end

#receive_proxy_connection(url) ⇒ Object



175
176
177
178
179
# File 'lib/pry-remote-em/broker.rb', line 175

def receive_proxy_connection(url)
  log.info("[pry-remote-em broker] proxying to #{url}")
  url = URI.parse(url)
  EM.connect(url.host, url.port, Client::Proxy, self)
end

#receive_register_server(id, urls, name, details, metrics) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
# File 'lib/pry-remote-em/broker.rb', line 158

def receive_register_server(id, urls, name, details, metrics)
  @ids.push(id)
  description = { urls: urls, name: name, details: details, metrics: metrics }
  Broker.hbeats[id] = Time.new
  server = Broker.servers[id]
  if server
    Broker.update_server(server, description)
  else
    Broker.register_server(id, description)
  end
end

#receive_server_reload_listObject



154
155
156
# File 'lib/pry-remote-em/broker.rb', line 154

def receive_server_reload_list
  send_server_list(Broker.servers)
end

#receive_unregister_server(id) ⇒ Object



170
171
172
173
# File 'lib/pry-remote-em/broker.rb', line 170

def receive_unregister_server(id)
  server = Broker.servers[id]
  Broker.unregister_server(id) if server
end

#ssl_handshake_completedObject



218
219
220
221
# File 'lib/pry-remote-em/broker.rb', line 218

def ssl_handshake_completed
  log.info("[pry-remote-em broker] TLS connection established (#{peer_ip}:#{peer_port})")
  send_server_list(Broker.servers)
end

#start_tlsObject



197
198
199
200
201
# File 'lib/pry-remote-em/broker.rb', line 197

def start_tls
  log.debug("[pry-remote-em broker] starting TLS (#{peer_ip}:#{peer_port})")
  send_start_tls
  super(@opts[:tls].is_a?(Hash) ? @opts[:tls] : {})
end

#unbindObject



223
224
225
226
227
# File 'lib/pry-remote-em/broker.rb', line 223

def unbind
  @ids.each do |id|
    Broker.unregister_server(id)
  end
end