Class: NATSD::Server
- Inherits:
-
Object
- Object
- NATSD::Server
- Defined in:
- lib/nats/server/varz.rb,
lib/nats/server/connz.rb,
lib/nats/server/server.rb,
lib/nats/server/options.rb
Class Attribute Summary collapse
-
.auth_required ⇒ Object
(also: auth_required?)
readonly
Returns the value of attribute auth_required.
-
.auth_timeout ⇒ Object
readonly
Returns the value of attribute auth_timeout.
-
.debug_flag ⇒ Object
(also: debug_flag?)
readonly
Returns the value of attribute debug_flag.
-
.healthz ⇒ Object
Returns the value of attribute healthz.
-
.id ⇒ Object
readonly
Returns the value of attribute id.
-
.in_bytes ⇒ Object
Returns the value of attribute in_bytes.
-
.in_msgs ⇒ Object
Returns the value of attribute in_msgs.
-
.info ⇒ Object
readonly
Returns the value of attribute info.
-
.log_time ⇒ Object
readonly
Returns the value of attribute log_time.
-
.max_connections ⇒ Object
Returns the value of attribute max_connections.
-
.max_control_line ⇒ Object
readonly
Returns the value of attribute max_control_line.
-
.max_payload ⇒ Object
readonly
Returns the value of attribute max_payload.
-
.max_pending ⇒ Object
readonly
Returns the value of attribute max_pending.
-
.num_connections ⇒ Object
Returns the value of attribute num_connections.
-
.options ⇒ Object
readonly
Returns the value of attribute options.
-
.out_bytes ⇒ Object
Returns the value of attribute out_bytes.
-
.out_msgs ⇒ Object
Returns the value of attribute out_msgs.
-
.ping_interval ⇒ Object
readonly
Returns the value of attribute ping_interval.
-
.ping_max ⇒ Object
readonly
Returns the value of attribute ping_max.
-
.ssl_required ⇒ Object
(also: ssl_required?)
readonly
Returns the value of attribute ssl_required.
-
.ssl_timeout ⇒ Object
readonly
Returns the value of attribute ssl_timeout.
-
.syslog ⇒ Object
readonly
Returns the value of attribute syslog.
-
.trace_flag ⇒ Object
(also: trace_flag?)
readonly
Returns the value of attribute trace_flag.
-
.varz ⇒ Object
Returns the value of attribute varz.
Class Method Summary collapse
- .auth_ok?(user, pass) ⇒ Boolean
- .cid ⇒ Object
- .close_syslog ⇒ Object
- .deliver_to_subscriber(sub, subject, reply, msg) ⇒ Object
- .dump_connections ⇒ Object
- .finalize_options ⇒ Object
- .host ⇒ Object
- .info_string ⇒ Object
- .open_syslog ⇒ Object
- .parser ⇒ Object
- .pid_file ⇒ Object
- .port ⇒ Object
- .process_options(argv = []) ⇒ Object
- .read_config_file ⇒ Object
- .route_to_subscribers(subject, reply, msg) ⇒ Object
- .setup(argv) ⇒ Object
- .setup_logs ⇒ Object
-
.start_http_server ⇒ Object
Monitoring.
- .subscribe(sub) ⇒ Object
- .symbolize_users(users) ⇒ Object
- .unsubscribe(sub) ⇒ Object
- .update_varz ⇒ Object
- .version ⇒ Object
Class Attribute Details
.auth_required ⇒ Object (readonly) Also known as: auth_required?
Returns the value of attribute auth_required.
9 10 11 |
# File 'lib/nats/server/server.rb', line 9 def auth_required @auth_required end |
.auth_timeout ⇒ Object (readonly)
Returns the value of attribute auth_timeout.
10 11 12 |
# File 'lib/nats/server/server.rb', line 10 def auth_timeout @auth_timeout end |
.debug_flag ⇒ Object (readonly) Also known as: debug_flag?
Returns the value of attribute debug_flag.
9 10 11 |
# File 'lib/nats/server/server.rb', line 9 def debug_flag @debug_flag end |
.healthz ⇒ Object
Returns the value of attribute healthz.
11 12 13 |
# File 'lib/nats/server/server.rb', line 11 def healthz @healthz end |
.id ⇒ Object (readonly)
Returns the value of attribute id.
9 10 11 |
# File 'lib/nats/server/server.rb', line 9 def id @id end |
.in_bytes ⇒ Object
Returns the value of attribute in_bytes.
11 12 13 |
# File 'lib/nats/server/server.rb', line 11 def in_bytes @in_bytes end |
.in_msgs ⇒ Object
Returns the value of attribute in_msgs.
11 12 13 |
# File 'lib/nats/server/server.rb', line 11 def in_msgs @in_msgs end |
.info ⇒ Object (readonly)
Returns the value of attribute info.
9 10 11 |
# File 'lib/nats/server/server.rb', line 9 def info @info end |
.log_time ⇒ Object (readonly)
Returns the value of attribute log_time.
9 10 11 |
# File 'lib/nats/server/server.rb', line 9 def log_time @log_time end |
.max_connections ⇒ Object
Returns the value of attribute max_connections.
11 12 13 |
# File 'lib/nats/server/server.rb', line 11 def max_connections @max_connections end |
.max_control_line ⇒ Object (readonly)
Returns the value of attribute max_control_line.
10 11 12 |
# File 'lib/nats/server/server.rb', line 10 def max_control_line @max_control_line end |
.max_payload ⇒ Object (readonly)
Returns the value of attribute max_payload.
10 11 12 |
# File 'lib/nats/server/server.rb', line 10 def max_payload @max_payload end |
.max_pending ⇒ Object (readonly)
Returns the value of attribute max_pending.
10 11 12 |
# File 'lib/nats/server/server.rb', line 10 def max_pending @max_pending end |
.num_connections ⇒ Object
Returns the value of attribute num_connections.
11 12 13 |
# File 'lib/nats/server/server.rb', line 11 def num_connections @num_connections end |
.options ⇒ Object (readonly)
Returns the value of attribute options.
9 10 11 |
# File 'lib/nats/server/server.rb', line 9 def @options end |
.out_bytes ⇒ Object
Returns the value of attribute out_bytes.
11 12 13 |
# File 'lib/nats/server/server.rb', line 11 def out_bytes @out_bytes end |
.out_msgs ⇒ Object
Returns the value of attribute out_msgs.
11 12 13 |
# File 'lib/nats/server/server.rb', line 11 def out_msgs @out_msgs end |
.ping_interval ⇒ Object (readonly)
Returns the value of attribute ping_interval.
10 11 12 |
# File 'lib/nats/server/server.rb', line 10 def ping_interval @ping_interval end |
.ping_max ⇒ Object (readonly)
Returns the value of attribute ping_max.
10 11 12 |
# File 'lib/nats/server/server.rb', line 10 def ping_max @ping_max end |
.ssl_required ⇒ Object (readonly) Also known as: ssl_required?
Returns the value of attribute ssl_required.
9 10 11 |
# File 'lib/nats/server/server.rb', line 9 def ssl_required @ssl_required end |
.ssl_timeout ⇒ Object (readonly)
Returns the value of attribute ssl_timeout.
10 11 12 |
# File 'lib/nats/server/server.rb', line 10 def ssl_timeout @ssl_timeout end |
.syslog ⇒ Object (readonly)
Returns the value of attribute syslog.
9 10 11 |
# File 'lib/nats/server/server.rb', line 9 def syslog @syslog end |
.trace_flag ⇒ Object (readonly) Also known as: trace_flag?
Returns the value of attribute trace_flag.
9 10 11 |
# File 'lib/nats/server/server.rb', line 9 def trace_flag @trace_flag end |
.varz ⇒ Object
Returns the value of attribute varz.
11 12 13 |
# File 'lib/nats/server/server.rb', line 11 def varz @varz end |
Class Method Details
.auth_ok?(user, pass) ⇒ Boolean
161 162 163 164 |
# File 'lib/nats/server/server.rb', line 161 def auth_ok?(user, pass) @options[:users].each { |u| return true if (user == u[:user] && pass == u[:pass]) } false end |
.cid ⇒ Object
166 167 168 |
# File 'lib/nats/server/server.rb', line 166 def cid @cid += 1 end |
.close_syslog ⇒ Object
126 127 128 |
# File 'lib/nats/server/options.rb', line 126 def close_syslog Syslog.close if @options[:syslog] end |
.deliver_to_subscriber(sub, subject, reply, msg) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/nats/server/server.rb', line 96 def deliver_to_subscriber(sub, subject, reply, msg) conn = sub.conn # Accounting @out_msgs += 1 conn.out_msgs += 1 unless msg.nil? mbs = msg.bytesize @out_bytes += mbs conn.out_bytes += mbs end conn.queue_data("MSG #{subject} #{sub.sid} #{reply}#{msg.bytesize}#{CR_LF}#{msg}#{CR_LF}") # Account for these response and check for auto-unsubscribe (pruning interest graph) sub.num_responses += 1 conn.delete_subscriber(sub) if (sub.max_responses && sub.num_responses >= sub.max_responses) # Check the outbound queue here and react if need be.. if (conn.get_outbound_data_size + conn.writev_size) > NATSD::Server.max_pending conn.error_close SLOW_CONSUMER maxp = pretty_size(NATSD::Server.max_pending) log "Slow consumer dropped, exceeded #{maxp} pending", conn.client_info end end |
.dump_connections ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/nats/server/connz.rb', line 37 def dump_connections conns, total = [], 0 ObjectSpace.each_object(NATSD::Connection) do |c| next if c.closing? total += c.info[:pending_size] conns << c.info end { :pending_size => total, :num_connections => conns.size, :connections => conns } end |
.finalize_options ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/nats/server/options.rb', line 139 def # Addr/Port @options[:port] ||= DEFAULT_PORT @options[:addr] ||= DEFAULT_HOST # Max Connections @options[:max_connections] ||= DEFAULT_MAX_CONNECTIONS @max_connections = @options[:max_connections] # Debug and Tracing @debug_flag = @options[:debug] @trace_flag = @options[:trace] # Log timestamps @log_time = @options[:log_time] debug @options # Block pass? debug "DEBUG is on" trace "TRACE is on" # Syslog @syslog = @options[:syslog] # Authorization # Multi-user setup for auth if @options[:user] # Multiple Users setup @options[:users] ||= [] @options[:users].unshift({:user => @options[:user], :pass => @options[:pass]}) if @options[:user] elsif @options[:users] first = @options[:users].first @options[:user], @options[:pass] = first[:user], first[:pass] end @auth_required = (not @options[:user].nil?) @ssl_required = (not @options[:ssl].nil?) # Pings @options[:ping_interval] ||= DEFAULT_PING_INTERVAL @ping_interval = @options[:ping_interval] @options[:ping_max] ||= DEFAULT_PING_MAX @ping_max = @options[:ping_max] # Thresholds @options[:max_control_line] ||= MAX_CONTROL_LINE_SIZE @max_control_line = @options[:max_control_line] @options[:max_payload] ||= MAX_PAYLOAD_SIZE @max_payload = @options[:max_payload] @options[:max_pending] ||= MAX_PENDING_SIZE @max_pending = @options[:max_pending] @options[:auth_timeout] ||= AUTH_TIMEOUT @auth_timeout = @options[:auth_timeout] @options[:ssl_timeout] ||= SSL_TIMEOUT @ssl_timeout = @options[:ssl_timeout] end |
.host ⇒ Object
20 |
# File 'lib/nats/server/server.rb', line 20 def host; @options[:addr] end |
.info_string ⇒ Object
170 171 172 |
# File 'lib/nats/server/server.rb', line 170 def info_string @info.to_json end |
.open_syslog ⇒ Object
121 122 123 124 |
# File 'lib/nats/server/options.rb', line 121 def open_syslog return unless @options[:syslog] Syslog.open("#{@options[:syslog]}", Syslog::LOG_PID, Syslog::LOG_USER ) unless Syslog.opened? end |
.parser ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/nats/server/options.rb', line 8 def parser @parser ||= OptionParser.new do |opts| opts. = "Usage: nats-server [options]" opts.separator "" opts.separator "Server options:" opts.on("-a", "--addr HOST", "Bind to HOST address " + "(default: #{DEFAULT_HOST})") { |host| @options[:addr] = host } opts.on("-p", "--port PORT", "Use PORT (default: #{DEFAULT_PORT})") { |port| @options[:port] = port.to_i } opts.on("-d", "--daemonize", "Run daemonized in the background") { @options[:daemonize] = true } opts.on("-P", "--pid FILE", "File to store PID") { |file| @options[:pid_file] = file } opts.on("-m", "--http_port PORT", "Use HTTP PORT ") { |port| @options[:http_port] = port.to_i } opts.on("-c", "--config FILE", "Configuration File") { |file| @options[:config_file] = file } opts.separator "" opts.separator "Logging options:" opts.on("-l", "--log FILE", "File to redirect log output") { |file| @options[:log_file] = file } opts.on("-T", "--logtime", "Timestamp log entries (default: false)") { @options[:log_time] = true } opts.on("-S", "--syslog IDENT", "Enable Syslog output") { |ident| @options[:syslog] = ident } opts.on("-D", "--debug", "Enable debugging output") { @options[:debug] = true } opts.on("-V", "--trace", "Trace the raw protocol") { @options[:trace] = true } opts.separator "" opts.separator "Authorization options:" opts.on("--user user", "User required for connections") { |user| @options[:user] = user } opts.on("--pass password", "Password required for connections") { |pass| @options[:pass] = pass } opts.separator "" opts.on("--ssl", "Enable SSL") { |ssl| @options[:ssl] = true } opts.separator "" opts.separator "Advanced IO options:" opts.on("--no_epoll", "Disable epoll (Linux)") { @options[:noepoll] = true } opts.on("--no_kqueue", "Disable kqueue (MacOSX and BSD)") { @options[:nokqueue] = true } opts.separator "" opts.separator "Common options:" opts.on_tail("-h", "--help", "Show this message") { puts opts; exit } opts.on_tail('-v', '--version', "Show version") { puts NATSD::Server.version; exit } end end |
.pid_file ⇒ Object
22 |
# File 'lib/nats/server/server.rb', line 22 def pid_file; @options[:pid_file] end |
.port ⇒ Object
21 |
# File 'lib/nats/server/server.rb', line 21 def port; @options[:port] end |
.process_options(argv = []) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/nats/server/server.rb', line 24 def (argv=[]) @options = {} # Allow command line to override config file, so do them first. parser.parse!(argv) read_config_file if @options[:config_file] rescue OptionParser::InvalidOption => e log_error "Error parsing options: #{e}" exit(1) end |
.read_config_file ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/nats/server/options.rb', line 57 def read_config_file return unless config_file = @options[:config_file] config = File.open(config_file) { |f| YAML.load(f) } # Command lines args, parsed first, will override these. @options[:port] = config['port'] if @options[:port].nil? @options[:addr] = config['net'] if @options[:addr].nil? if auth = config['authorization'] @options[:user] = auth['user'] if @options[:user].nil? @options[:pass] = auth['password'] if @options[:pass].nil? @options[:pass] = auth['pass'] if @options[:pass].nil? @options[:token] = auth['token'] if @options[:token].nil? @options[:auth_timeout] = auth['timeout'] if @options[:auth_timeout].nil? # Multiple Users setup @options[:users] = symbolize_users(auth['users']) || [] end # TLS/SSL @options[:ssl] = config['ssl'] if @options[:ssl].nil? @options[:pid_file] = config['pid_file'] if @options[:pid_file].nil? @options[:log_file] = config['log_file'] if @options[:log_file].nil? @options[:log_time] = config['logtime'] if @options[:log_time].nil? @options[:syslog] = config['syslog'] if @options[:syslog].nil? @options[:debug] = config['debug'] if @options[:debug].nil? @options[:trace] = config['trace'] if @options[:trace].nil? # these just override if present @options[:max_control_line] = config['max_control_line'] if config['max_control_line'] @options[:max_payload] = config['max_payload'] if config['max_payload'] @options[:max_pending] = config['max_pending'] if config['max_pending'] @options[:max_connections] = config['max_connections'] if config['max_connections'] # just set @options[:noepoll] = config['no_epoll'] if config['no_epoll'] @options[:nokqueue] = config['no_kqueue'] if config['no_kqueue'] if http = config['http'] if @options[:http_net].nil? @options[:http_net] = http['net'] || @options[:addr] end @options[:http_port] = http['port'] if @options[:http_port].nil? @options[:http_user] = http['user'] if @options[:http_user].nil? @options[:http_password] = http['password'] if @options[:http_password].nil? end if ping = config['ping'] @options[:ping_interval] = ping['interval'] if @options[:ping_interval].nil? @options[:ping_max] = ping['max_outstanding'] if @options[:ping_max].nil? end rescue => e log "Could not read configuration file: #{e}" exit 1 end |
.route_to_subscribers(subject, reply, msg) ⇒ Object
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/nats/server/server.rb', line 122 def route_to_subscribers(subject, reply, msg) qsubs = nil # Allows nil reply to not have extra space reply = reply + ' ' if reply # Accounting @in_msgs += 1 @in_bytes += msg.bytesize unless msg.nil? @sublist.match(subject).each do |sub| # Skip anyone in the closing state next if sub.conn.closing unless sub[:qgroup] deliver_to_subscriber(sub, subject, reply, msg) else if NATSD::Server.trace_flag? trace("Matched queue subscriber", sub[:subject], sub[:qgroup], sub[:sid], sub.conn.client_info) end # Queue this for post processing qsubs ||= Hash.new qsubs[sub[:qgroup]] ||= [] qsubs[sub[:qgroup]] << sub end end return unless qsubs qsubs.each_value do |subs| # Randomly pick a subscriber from the group sub = subs[rand*subs.size] if NATSD::Server.trace_flag? trace("Selected queue subscriber", sub[:subject], sub[:qgroup], sub[:sid], sub.conn.client_info) end deliver_to_subscriber(sub, subject, reply, msg) end end |
.setup(argv) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/nats/server/server.rb', line 36 def setup(argv) (argv) @id, @cid = fast_uuid, 1 @sublist = Sublist.new @num_connections = 0 @in_msgs = @out_msgs = 0 @in_bytes = @out_bytes = 0 @info = { :server_id => Server.id, :host => host, :port => port, :version => VERSION, :auth_required => auth_required?, :ssl_required => ssl_required?, :max_payload => @max_payload } # Check for daemon flag if @options[:daemonize] require 'rubygems' require 'daemons' require 'tmpdir' unless @options[:log_file] # These log messages visible to controlling TTY log "Starting #{NATSD::APP_NAME} version #{NATSD::VERSION} on port #{NATSD::Server.port}" log "Starting http monitor on port #{@options[:http_port]}" if @options[:http_port] log "Switching to daemon mode" end opts = { :app_name => APP_NAME, :mode => :exec, :dir_mode => :normal, :dir => Dir.tmpdir } Daemons.daemonize(opts) FileUtils.rm_f("#{Dir.tmpdir}/#{APP_NAME}.pid") end setup_logs open_syslog # Setup optimized select versions EM.epoll unless @options[:noepoll] EM.kqueue unless @options[:nokqueue] # Write pid file if requested. File.open(@options[:pid_file], 'w') { |f| f.puts "#{Process.pid}" } if @options[:pid_file] end |
.setup_logs ⇒ Object
114 115 116 117 118 119 |
# File 'lib/nats/server/options.rb', line 114 def setup_logs return unless @options[:log_file] $stdout.reopen(@options[:log_file], 'a') $stdout.sync = true $stderr.reopen($stdout) end |
.start_http_server ⇒ Object
Monitoring
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/nats/server/server.rb', line 175 def start_http_server return unless port = @options[:http_port] require 'thin' log "Starting http monitor on port #{port}" @healthz = "ok\n" @varz = { :start => Time.now, :options => @options, :cores => num_cpu_cores } http_server = Thin::Server.new(@options[:http_net], port, :signals => false) do Thin::Logging.silent = true if NATSD::Server.[:http_user] auth = [NATSD::Server.[:http_user], NATSD::Server.[:http_password]] use Rack::Auth::Basic do |username, password| [username, password] == auth end end map '/healthz' do run lambda { |env| [200, RACK_TEXT_HDR, NATSD::Server.healthz] } end map '/varz' do run Varz.new end map '/connz' do run Connz.new end end http_server.start! end |
.subscribe(sub) ⇒ Object
88 89 90 |
# File 'lib/nats/server/server.rb', line 88 def subscribe(sub) @sublist.insert(sub.subject, sub) end |
.symbolize_users(users) ⇒ Object
130 131 132 133 134 135 136 137 |
# File 'lib/nats/server/options.rb', line 130 def symbolize_users(users) return nil unless users auth_users = [] users.each do |u| auth_users << { :user => u['user'], :pass => u['pass'] || u['password'] } end auth_users end |
.unsubscribe(sub) ⇒ Object
92 93 94 |
# File 'lib/nats/server/server.rb', line 92 def unsubscribe(sub) @sublist.remove(sub.subject, sub) end |
.update_varz ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/nats/server/varz.rb', line 15 def update_varz # Snapshot uptime @varz[:uptime] = uptime_string(Time.now - @varz[:start]) # Grab current cpu and memory usage. rss, pcpu = `ps -o rss=,pcpu= -p #{Process.pid}`.split @varz[:mem] = rss.to_i @varz[:cpu] = pcpu.to_f @varz[:connections] = num_connections @varz[:in_msgs] = in_msgs @varz[:out_msgs] = out_msgs @varz[:in_bytes] = in_bytes @varz[:out_bytes] = out_bytes @last_varz_update = Time.now.to_f varz end |