Class: NATSD::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/nats/server/varz.rb,
lib/nats/server/connz.rb,
lib/nats/server/server.rb,
lib/nats/server/options.rb

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.auth_required ⇒ Object (readonly) Also known as: auth_required?

Returns the value of attribute auth_required.



9
10
11
# File 'lib/nats/server/server.rb', line 9

# Reader for the class-level @auth_required flag. finalize_options sets it
# to true when a :user option is present after option processing.
def auth_required
  @auth_required
end

.auth_timeout ⇒ Object (readonly)

Returns the value of attribute auth_timeout.



10
11
12
# File 'lib/nats/server/server.rb', line 10

# Reader for the class-level @auth_timeout. finalize_options copies it from
# @options[:auth_timeout], which defaults to AUTH_TIMEOUT.
def auth_timeout
  @auth_timeout
end

.debug_flag ⇒ Object (readonly) Also known as: debug_flag?

Returns the value of attribute debug_flag.



9
10
11
# File 'lib/nats/server/server.rb', line 9

# Reader for the class-level @debug_flag. finalize_options copies it from
# @options[:debug].
def debug_flag
  @debug_flag
end

.healthz ⇒ Object

Returns the value of attribute healthz.



11
12
13
# File 'lib/nats/server/server.rb', line 11

# Reader for the class-level @healthz. start_http_server assigns it "ok\n"
# and serves it as the body of the /healthz endpoint.
def healthz
  @healthz
end

.id ⇒ Object (readonly)

Returns the value of attribute id.



9
10
11
# File 'lib/nats/server/server.rb', line 9

# Reader for the class-level @id. setup assigns it from fast_uuid and
# publishes it as :server_id in the info hash.
def id
  @id
end

.in_bytes ⇒ Object

Returns the value of attribute in_bytes.



11
12
13
# File 'lib/nats/server/server.rb', line 11

# Reader for the class-level @in_bytes counter. setup zeroes it and
# route_to_subscribers adds the bytesize of every non-nil message routed.
def in_bytes
  @in_bytes
end

.in_msgs ⇒ Object

Returns the value of attribute in_msgs.



11
12
13
# File 'lib/nats/server/server.rb', line 11

# Reader for the class-level @in_msgs counter. setup zeroes it and
# route_to_subscribers increments it once per call.
def in_msgs
  @in_msgs
end

.info ⇒ Object (readonly)

Returns the value of attribute info.



9
10
11
# File 'lib/nats/server/server.rb', line 9

# Reader for the class-level @info hash built in setup (:server_id, :host,
# :port, :version, :auth_required, :ssl_required, :max_payload).
def info
  @info
end

.log_time ⇒ Object (readonly)

Returns the value of attribute log_time.



9
10
11
# File 'lib/nats/server/server.rb', line 9

# Reader for the class-level @log_time. finalize_options copies it from
# @options[:log_time].
def log_time
  @log_time
end

.max_connections ⇒ Object

Returns the value of attribute max_connections.



11
12
13
# File 'lib/nats/server/server.rb', line 11

# Reader for the class-level @max_connections. finalize_options copies it
# from @options[:max_connections], which defaults to DEFAULT_MAX_CONNECTIONS.
def max_connections
  @max_connections
end

.max_control_line ⇒ Object (readonly)

Returns the value of attribute max_control_line.



10
11
12
# File 'lib/nats/server/server.rb', line 10

# Reader for the class-level @max_control_line. finalize_options copies it
# from @options[:max_control_line], which defaults to MAX_CONTROL_LINE_SIZE.
def max_control_line
  @max_control_line
end

.max_payload ⇒ Object (readonly)

Returns the value of attribute max_payload.



10
11
12
# File 'lib/nats/server/server.rb', line 10

# Reader for the class-level @max_payload. finalize_options copies it from
# @options[:max_payload], which defaults to MAX_PAYLOAD_SIZE.
def max_payload
  @max_payload
end

.max_pending ⇒ Object (readonly)

Returns the value of attribute max_pending.



10
11
12
# File 'lib/nats/server/server.rb', line 10

# Reader for the class-level @max_pending. finalize_options copies it from
# @options[:max_pending] (default MAX_PENDING_SIZE); deliver_to_subscriber
# closes a connection whose pending outbound data exceeds it.
def max_pending
  @max_pending
end

.num_connections ⇒ Object

Returns the value of attribute num_connections.



11
12
13
# File 'lib/nats/server/server.rb', line 11

# Reader for the class-level @num_connections counter. setup initializes it
# to 0 and update_varz reports it as :connections.
def num_connections
  @num_connections
end

.options ⇒ Object (readonly)

Returns the value of attribute options.



9
10
11
# File 'lib/nats/server/server.rb', line 9

# Reader for the class-level @options hash. process_options resets it to {}
# and fills it from the command line, the config file and the defaults.
def options
  @options
end

.out_bytes ⇒ Object

Returns the value of attribute out_bytes.



11
12
13
# File 'lib/nats/server/server.rb', line 11

# Reader for the class-level @out_bytes counter. setup zeroes it and
# deliver_to_subscriber adds the bytesize of every non-nil message delivered.
def out_bytes
  @out_bytes
end

.out_msgs ⇒ Object

Returns the value of attribute out_msgs.



11
12
13
# File 'lib/nats/server/server.rb', line 11

# Reader for the class-level @out_msgs counter. setup zeroes it and
# deliver_to_subscriber increments it once per delivery.
def out_msgs
  @out_msgs
end

.ping_interval ⇒ Object (readonly)

Returns the value of attribute ping_interval.



10
11
12
# File 'lib/nats/server/server.rb', line 10

# Reader for the class-level @ping_interval. finalize_options copies it from
# @options[:ping_interval], which defaults to DEFAULT_PING_INTERVAL.
def ping_interval
  @ping_interval
end

.ping_max ⇒ Object (readonly)

Returns the value of attribute ping_max.



10
11
12
# File 'lib/nats/server/server.rb', line 10

# Reader for the class-level @ping_max. finalize_options copies it from
# @options[:ping_max], which defaults to DEFAULT_PING_MAX.
def ping_max
  @ping_max
end

.ssl_required ⇒ Object (readonly) Also known as: ssl_required?

Returns the value of attribute ssl_required.



9
10
11
# File 'lib/nats/server/server.rb', line 9

# Reader for the class-level @ssl_required flag. finalize_options sets it to
# true when @options[:ssl] is not nil.
def ssl_required
  @ssl_required
end

.ssl_timeout ⇒ Object (readonly)

Returns the value of attribute ssl_timeout.



10
11
12
# File 'lib/nats/server/server.rb', line 10

# Reader for the class-level @ssl_timeout. finalize_options copies it from
# @options[:ssl_timeout], which defaults to SSL_TIMEOUT.
def ssl_timeout
  @ssl_timeout
end

.syslog ⇒ Object (readonly)

Returns the value of attribute syslog.



9
10
11
# File 'lib/nats/server/server.rb', line 9

# Reader for the class-level @syslog. finalize_options copies it from
# @options[:syslog] (the syslog ident given with --syslog, if any).
def syslog
  @syslog
end

.trace_flag ⇒ Object (readonly) Also known as: trace_flag?

Returns the value of attribute trace_flag.



9
10
11
# File 'lib/nats/server/server.rb', line 9

# Reader for the class-level @trace_flag. finalize_options copies it from
# @options[:trace].
def trace_flag
  @trace_flag
end

.varz ⇒ Object

Returns the value of attribute varz.



11
12
13
# File 'lib/nats/server/server.rb', line 11

# Reader for the class-level @varz hash. start_http_server creates it
# (:start, :options, :cores) and update_varz refreshes the runtime figures.
def varz
  @varz
end

Class Method Details

.auth_ok?(user, pass) ⇒ Boolean

Returns:

  • (Boolean)


161
162
163
164
# File 'lib/nats/server/server.rb', line 161

# Checks a user/password pair against the configured user list.
#
# @param user [Object] user name presented by the client
# @param pass [Object] password presented by the client
# @return [Boolean] true when some entry of @options[:users] matches both
#   the :user and the :pass value, false otherwise
def auth_ok?(user, pass)
  @options[:users].any? do |entry|
    user == entry[:user] && pass == entry[:pass]
  end
end

.cid ⇒ Object



166
167
168
# File 'lib/nats/server/server.rb', line 166

# Advances the class-level connection id counter by one and returns the
# new value.
def cid
  @cid = @cid + 1
end

.close_syslog ⇒ Object



126
127
128
# File 'lib/nats/server/options.rb', line 126

# Closes the process-wide Syslog connection when syslog output is configured.
#
# Syslog.close raises if the connection is not currently open, so the call
# is skipped in that case (the same Syslog.opened? check open_syslog uses).
# Does nothing when @options[:syslog] is not set.
def close_syslog
  return unless @options[:syslog]
  Syslog.close if Syslog.opened?
end

.deliver_to_subscriber(sub, subject, reply, msg) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/nats/server/server.rb', line 96

# Queues one MSG frame on a subscription's connection and updates the
# delivery accounting.
#
# A nil payload is handled consistently: it is skipped by the byte counters
# and framed as a zero-length body, instead of raising on msg.bytesize while
# the frame is built.
#
# @param sub [Object] subscription; must respond to conn, sid, num_responses,
#   num_responses= and max_responses
# @param subject [String] subject written into the frame
# @param reply [String, nil] reply subject, interpolated directly in front of
#   the size field (route_to_subscribers passes it with a trailing space)
# @param msg [String, nil] payload
def deliver_to_subscriber(sub, subject, reply, msg)
  conn = sub.conn

  # Size written into the frame; a nil payload counts as zero bytes.
  mbs = msg.nil? ? 0 : msg.bytesize

  # Accounting
  @out_msgs += 1
  conn.out_msgs += 1
  unless msg.nil?
    @out_bytes += mbs
    conn.out_bytes += mbs
  end

  conn.queue_data("MSG #{subject} #{sub.sid} #{reply}#{mbs}#{CR_LF}#{msg}#{CR_LF}")

  # Account for this response and check for auto-unsubscribe (pruning interest graph)
  sub.num_responses += 1
  conn.delete_subscriber(sub) if (sub.max_responses && sub.num_responses >= sub.max_responses)

  # Check the outbound queue here and react if need be..
  if (conn.get_outbound_data_size + conn.writev_size) > NATSD::Server.max_pending
    conn.error_close SLOW_CONSUMER
    maxp = pretty_size(NATSD::Server.max_pending)
    log "Slow consumer dropped, exceeded #{maxp} pending", conn.client_info
  end
end

.dump_connections ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/nats/server/connz.rb', line 37

# Builds a summary of every NATSD::Connection instance that is not closing.
#
# Each connection's info is read exactly once, so the aggregate
# :pending_size always equals the sum of the :pending_size values of the
# entries returned under :connections.
#
# @return [Hash] :pending_size (sum over the listed connections),
#   :num_connections (number of entries) and :connections (the info values)
def dump_connections
  conns, total = [], 0
  ObjectSpace.each_object(NATSD::Connection) do |c|
    next if c.closing?
    info = c.info
    total += info[:pending_size]
    conns << info
  end
  {
    :pending_size => total,
    :num_connections => conns.size,
    :connections => conns
  }
end

.finalize_options ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/nats/server/options.rb', line 139

# Applies defaults to @options and copies the effective settings into the
# class-level attributes (@max_connections, @debug_flag, @auth_required, ...).
#
# Authorization: a single :user/:pass pair is pushed to the front of :users;
# otherwise, if :users has at least one entry, the first entry becomes
# :user/:pass. An empty :users list (for example a config file with an
# authorization section but no users) leaves authorization disabled instead
# of raising on a nil first entry.
def finalize_options
  # Addr/Port
  @options[:port] ||= DEFAULT_PORT
  @options[:addr] ||= DEFAULT_HOST

  # Max Connections
  @options[:max_connections] ||= DEFAULT_MAX_CONNECTIONS
  @max_connections = @options[:max_connections]

  # Debug and Tracing
  @debug_flag = @options[:debug]
  @trace_flag = @options[:trace]

  # Log timestamps
  @log_time = @options[:log_time]

  debug @options # Block pass?
  debug "DEBUG is on"
  trace "TRACE is on"

  # Syslog
  @syslog = @options[:syslog]

  # Authorization

  # Multi-user setup for auth
  if @options[:user]
    # Multiple Users setup
    @options[:users] ||= []
    @options[:users].unshift({:user => @options[:user], :pass => @options[:pass]})
  elsif @options[:users] && (first = @options[:users].first)
    @options[:user], @options[:pass] = first[:user], first[:pass]
  end

  @auth_required = (not @options[:user].nil?)

  @ssl_required = (not @options[:ssl].nil?)

  # Pings
  @options[:ping_interval] ||= DEFAULT_PING_INTERVAL
  @ping_interval = @options[:ping_interval]

  @options[:ping_max] ||= DEFAULT_PING_MAX
  @ping_max = @options[:ping_max]

  # Thresholds
  @options[:max_control_line] ||= MAX_CONTROL_LINE_SIZE
  @max_control_line = @options[:max_control_line]

  @options[:max_payload] ||= MAX_PAYLOAD_SIZE
  @max_payload = @options[:max_payload]

  @options[:max_pending] ||= MAX_PENDING_SIZE
  @max_pending = @options[:max_pending]

  @options[:auth_timeout] ||= AUTH_TIMEOUT
  @auth_timeout = @options[:auth_timeout]

  @options[:ssl_timeout] ||= SSL_TIMEOUT
  @ssl_timeout = @options[:ssl_timeout]
end

.host ⇒ Object



20
# File 'lib/nats/server/server.rb', line 20

# Returns the bind address stored under @options[:addr].
def host
  @options[:addr]
end

.info_string ⇒ Object



170
171
172
# File 'lib/nats/server/server.rb', line 170

# Returns the @info hash (built in setup) serialized with to_json.
def info_string
  @info.to_json
end

.open_syslog ⇒ Object



121
122
123
124
# File 'lib/nats/server/options.rb', line 121

# Opens the process-wide Syslog connection using @options[:syslog] as the
# ident, with LOG_PID and the LOG_USER facility. Does nothing when no syslog
# ident is configured or when Syslog is already open.
def open_syslog
  ident = @options[:syslog]
  return unless ident
  return if Syslog.opened?
  Syslog.open(ident.to_s, Syslog::LOG_PID, Syslog::LOG_USER)
end

.parser ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/nats/server/options.rb', line 8

# Returns the memoized OptionParser for the nats-server command line.
#
# Each option block stores its value in @options; -h/--help and -v/--version
# print and exit immediately. The order of the opts.on / opts.separator
# calls is the order in which the options appear in the help output.
def parser
  @parser ||= OptionParser.new do |opts|
    opts.banner = "Usage: nats-server [options]"

    opts.separator ""
    opts.separator "Server options:"

    opts.on("-a", "--addr HOST", "Bind to HOST address " +
                                 "(default: #{DEFAULT_HOST})")           { |host| @options[:addr] = host }
    opts.on("-p", "--port PORT", "Use PORT (default: #{DEFAULT_PORT})")  { |port| @options[:port] = port.to_i }
    opts.on("-d", "--daemonize", "Run daemonized in the background")     { @options[:daemonize] = true }
    opts.on("-P", "--pid FILE",  "File to store PID")                    { |file| @options[:pid_file] = file }

    opts.on("-m", "--http_port PORT", "Use HTTP PORT ")                  { |port| @options[:http_port] = port.to_i }

    opts.on("-c", "--config FILE", "Configuration File")                 { |file| @options[:config_file] = file }

    opts.separator ""
    opts.separator "Logging options:"

    opts.on("-l", "--log FILE", "File to redirect log output")           { |file| @options[:log_file] = file }
    opts.on("-T", "--logtime", "Timestamp log entries (default: false)") { @options[:log_time] = true }
    opts.on("-S", "--syslog IDENT", "Enable Syslog output")              { |ident| @options[:syslog] = ident }
    opts.on("-D", "--debug", "Enable debugging output")                  { @options[:debug] = true }
    opts.on("-V", "--trace", "Trace the raw protocol")                   { @options[:trace] = true }

    opts.separator ""
    opts.separator "Authorization options:"

    opts.on("--user user", "User required for connections")              { |user| @options[:user] = user }
    opts.on("--pass password", "Password required for connections")      { |pass| @options[:pass] = pass }

    opts.separator ""
    # The block argument is ignored; the flag's presence alone enables SSL.
    opts.on("--ssl", "Enable SSL")                                       { |ssl| @options[:ssl] = true }

    opts.separator ""
    opts.separator "Advanced IO options:"

    opts.on("--no_epoll", "Disable epoll (Linux)")                       { @options[:noepoll] = true }
    opts.on("--no_kqueue", "Disable kqueue (MacOSX and BSD)")            { @options[:nokqueue] = true }

    opts.separator ""
    opts.separator "Common options:"

    opts.on_tail("-h", "--help", "Show this message")                    { puts opts; exit }
    opts.on_tail('-v', '--version', "Show version")                      { puts NATSD::Server.version; exit }
  end
end

.pid_file ⇒ Object



22
# File 'lib/nats/server/server.rb', line 22

# Returns the pid file path stored under @options[:pid_file], if any.
def pid_file
  @options[:pid_file]
end

.port ⇒ Object



21
# File 'lib/nats/server/server.rb', line 21

# Returns the listen port stored under @options[:port].
def port
  @options[:port]
end

.process_options(argv = []) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
# File 'lib/nats/server/server.rb', line 24

# Resets @options and fills it from the command line, the optional config
# file and the built-in defaults, in that order.
#
# Any OptionParser parse failure (unknown option, missing or invalid
# argument, ...) is reported through log_error and terminates the process
# with exit status 1.
#
# @param argv [Array<String>] command-line arguments; parsed options are
#   removed from it by parse!
def process_options(argv=[])
  @options = {}

  # Allow command line to override config file, so do them first.
  parser.parse!(argv)
  read_config_file if @options[:config_file]
  finalize_options
rescue OptionParser::ParseError => e
  # ParseError is the common superclass of InvalidOption, MissingArgument,
  # InvalidArgument and the other parse failures.
  log_error "Error parsing options: #{e}"
  exit(1)
end

.read_config_file ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/nats/server/options.rb', line 57

# Merges settings from the YAML file named by @options[:config_file] into
# @options.
#
# Most keys are only filled in while the option is still nil, so values
# already present (the command line is parsed first) win. The max_*
# thresholds and no_epoll / no_kqueue are applied whenever the file provides
# them, and :users is replaced whenever an authorization section exists.
# An empty file is treated as an empty configuration. Any error raised while
# reading or applying the file is logged and the process exits with status 1.
def read_config_file
  return unless config_file = @options[:config_file]
  # YAML.load yields a falsy value for an empty document; fall back to an
  # empty mapping so the lookups below simply find nothing.
  config = File.open(config_file) { |f| YAML.load(f) } || {}

  # Command lines args, parsed first, will override these.
  @options[:port] = config['port'] if @options[:port].nil?
  @options[:addr] = config['net'] if @options[:addr].nil?

  if auth = config['authorization']
    @options[:user] = auth['user'] if @options[:user].nil?
    # 'password' is preferred; 'pass' is only consulted when it is absent.
    @options[:pass] = auth['password'] if @options[:pass].nil?
    @options[:pass] = auth['pass'] if @options[:pass].nil?
    @options[:token] = auth['token'] if @options[:token].nil?
    @options[:auth_timeout] = auth['timeout'] if @options[:auth_timeout].nil?
    # Multiple Users setup
    @options[:users] = symbolize_users(auth['users']) || []
  end

  # TLS/SSL
  @options[:ssl] = config['ssl'] if @options[:ssl].nil?

  @options[:pid_file] = config['pid_file'] if @options[:pid_file].nil?
  @options[:log_file] = config['log_file'] if @options[:log_file].nil?
  @options[:log_time] = config['logtime'] if @options[:log_time].nil?
  @options[:syslog] = config['syslog'] if @options[:syslog].nil?
  @options[:debug] = config['debug'] if @options[:debug].nil?
  @options[:trace] = config['trace'] if @options[:trace].nil?

  # these just override if present
  @options[:max_control_line] = config['max_control_line'] if config['max_control_line']
  @options[:max_payload] = config['max_payload'] if config['max_payload']
  @options[:max_pending] = config['max_pending'] if config['max_pending']
  @options[:max_connections] = config['max_connections'] if config['max_connections']

  # just set
  @options[:noepoll]  = config['no_epoll'] if config['no_epoll']
  @options[:nokqueue] = config['no_kqueue'] if config['no_kqueue']

  if http = config['http']
    if @options[:http_net].nil?
      # Falls back to the main bind address resolved above.
      @options[:http_net] = http['net'] || @options[:addr]
    end
    @options[:http_port] = http['port'] if @options[:http_port].nil?
    @options[:http_user] = http['user'] if @options[:http_user].nil?
    @options[:http_password] = http['password'] if @options[:http_password].nil?
  end

  if ping = config['ping']
    @options[:ping_interval] = ping['interval'] if @options[:ping_interval].nil?
    @options[:ping_max] = ping['max_outstanding'] if @options[:ping_max].nil?
  end

rescue => e
  log "Could not read configuration file:  #{e}"
  exit 1
end

.route_to_subscribers(subject, reply, msg) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/nats/server/server.rb', line 122

# Routes one published message to every matching subscription.
#
# Plain subscriptions receive the message immediately. Queue subscriptions
# are bucketed by their :qgroup, and once all matches have been visited one
# randomly chosen member of each group receives it. Subscriptions whose
# connection reports closing are skipped.
#
# @param subject [String] subject the message was published on
# @param reply [String, nil] reply subject; a trailing space is appended
#   before delivery so a nil reply adds nothing to the frame
# @param msg [String, nil] payload; nil is not counted in @in_bytes
def route_to_subscribers(subject, reply, msg)
  # Allows nil reply to not have extra space
  reply = "#{reply} " if reply

  # Accounting
  @in_msgs += 1
  @in_bytes += msg.bytesize unless msg.nil?

  queue_groups = nil

  @sublist.match(subject).each do |sub|
    # Skip anyone in the closing state
    next if sub.conn.closing

    group = sub[:qgroup]
    if group
      if NATSD::Server.trace_flag?
        trace("Matched queue subscriber", sub[:subject], sub[:qgroup], sub[:sid], sub.conn.client_info)
      end
      # Queue this for post processing
      queue_groups ||= {}
      (queue_groups[group] ||= []) << sub
    else
      deliver_to_subscriber(sub, subject, reply, msg)
    end
  end

  return unless queue_groups

  queue_groups.each_value do |members|
    # Randomly pick a subscriber from the group
    chosen = members[rand*members.size]
    if NATSD::Server.trace_flag?
      trace("Selected queue subscriber", chosen[:subject], chosen[:qgroup], chosen[:sid], chosen.conn.client_info)
    end
    deliver_to_subscriber(chosen, subject, reply, msg)
  end
end

.setup(argv) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/nats/server/server.rb', line 36

# Prepares the server process before it starts serving: processes options,
# initializes ids, counters and the @info hash, optionally daemonizes, then
# sets up log redirection, syslog, the EM polling backend and the pid file.
#
# argv - command-line argument array handed to process_options.
def setup(argv)
  process_options(argv)

  # Server id and the connection id counter (see cid).
  @id, @cid = fast_uuid, 1
  @sublist = Sublist.new

  # Traffic and connection counters start at zero.
  @num_connections = 0
  @in_msgs = @out_msgs = 0
  @in_bytes = @out_bytes = 0

  # Hash that info_string serializes.
  @info = {
    :server_id => Server.id,
    :host => host,
    :port => port,
    :version => VERSION,
    :auth_required => auth_required?,
    :ssl_required => ssl_required?,
    :max_payload => @max_payload
  }

  # Check for daemon flag
  if @options[:daemonize]
    # Loaded here so they are only required when daemonizing.
    require 'rubygems'
    require 'daemons'
    require 'tmpdir'
    unless @options[:log_file]
      # These log messages visible to controlling TTY
      log "Starting #{NATSD::APP_NAME} version #{NATSD::VERSION} on port #{NATSD::Server.port}"
      log "Starting http monitor on port #{@options[:http_port]}" if @options[:http_port]
      log "Switching to daemon mode"
    end
    opts = {
      :app_name => APP_NAME,
      :mode => :exec,
      :dir_mode => :normal,
      :dir => Dir.tmpdir
    }
    Daemons.daemonize(opts)
    # NOTE(review): presumably removes the pid file Daemons wrote under
    # Dir.tmpdir; the server writes its own below -- confirm.
    FileUtils.rm_f("#{Dir.tmpdir}/#{APP_NAME}.pid")
  end

  # Runs after the optional daemonize step above.
  setup_logs
  open_syslog

  # Setup optimized select versions
  EM.epoll unless @options[:noepoll]
  EM.kqueue unless @options[:nokqueue]

  # Write pid file if requested.
  File.open(@options[:pid_file], 'w') { |f| f.puts "#{Process.pid}" } if @options[:pid_file]
end

.setup_logs ⇒ Object



114
115
116
117
118
119
# File 'lib/nats/server/options.rb', line 114

# Redirects $stdout (append mode, unbuffered) and $stderr to the file named
# by @options[:log_file]. Does nothing when no log file is configured.
def setup_logs
  log_file = @options[:log_file]
  return unless log_file

  $stdout.reopen(log_file, 'a')
  $stdout.sync = true
  $stderr.reopen($stdout)
end

.start_http_server ⇒ Object

Monitoring



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/nats/server/server.rb', line 175

# Starts the HTTP monitoring server when @options[:http_port] is set.
#
# Initializes @healthz and @varz, then builds a Thin server on
# @options[:http_net] / the configured port that maps /healthz, /varz and
# /connz. HTTP basic auth is enabled when :http_user is configured.
# Returns nil without doing anything when no HTTP port is configured.
def start_http_server
  return unless port = @options[:http_port]

  # Loaded here so thin is only required when monitoring is enabled.
  require 'thin'

  log "Starting http monitor on port #{port}"

  @healthz = "ok\n"

  # Static part of the /varz payload; update_varz adds the runtime figures.
  @varz = {
    :start => Time.now,
    :options => @options,
    :cores => num_cpu_cores
  }

  http_server = Thin::Server.new(@options[:http_net], port, :signals => false) do
    Thin::Logging.silent = true
    if NATSD::Server.options[:http_user]
      auth = [NATSD::Server.options[:http_user], NATSD::Server.options[:http_password]]
      use Rack::Auth::Basic do |username, password|
        # NOTE(review): plain == comparison of credentials, not constant-time.
        [username, password] == auth
      end
    end
    map '/healthz' do
      run lambda { |env| [200, RACK_TEXT_HDR, NATSD::Server.healthz] }
    end
    map '/varz' do
      run Varz.new
    end
    map '/connz' do
      run Connz.new
    end
  end
  http_server.start!
end

.subscribe(sub) ⇒ Object



88
89
90
# File 'lib/nats/server/server.rb', line 88

# Registers a subscription in the subject list under its own subject.
#
# @param sub [#subject] subscription to insert
# @return [Object] whatever @sublist.insert returns
def subscribe(sub)
  subject = sub.subject
  @sublist.insert(subject, sub)
end

.symbolize_users(users) ⇒ Object



130
131
132
133
134
135
136
137
# File 'lib/nats/server/options.rb', line 130

# Converts string-keyed user entries (as loaded from the config file) into
# hashes with :user and :pass keys. The password is taken from 'pass' when
# present, otherwise from 'password'.
#
# @param users [Array<Hash>, nil] entries with 'user' and 'pass'/'password'
# @return [Array<Hash>, nil] converted entries, or nil when users is nil
def symbolize_users(users)
  return unless users

  users.map do |entry|
    { :user => entry['user'], :pass => entry['pass'] || entry['password'] }
  end
end

.unsubscribe(sub) ⇒ Object



92
93
94
# File 'lib/nats/server/server.rb', line 92

# Removes a subscription from the subject list, keyed by its own subject.
#
# @param sub [#subject] subscription to remove
# @return [Object] whatever @sublist.remove returns
def unsubscribe(sub)
  subject = sub.subject
  @sublist.remove(subject, sub)
end

.update_varz ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/nats/server/varz.rb', line 15

# Refreshes the runtime figures in @varz and returns the varz hash.
#
# Updates :uptime (relative to @varz[:start]), :mem and :cpu (read from
# `ps` for the current process) and the connection and traffic counters,
# then records the refresh time in @last_varz_update.
def update_varz
  # Snapshot uptime
  @varz[:uptime] = uptime_string(Time.now - @varz[:start])

  # Grab current cpu and memory usage.
  ps_fields = `ps -o rss=,pcpu= -p #{Process.pid}`.split
  @varz[:mem] = ps_fields[0].to_i
  @varz[:cpu] = ps_fields[1].to_f

  # Connection and traffic counters
  @varz.update(
    :connections => num_connections,
    :in_msgs => in_msgs,
    :out_msgs => out_msgs,
    :in_bytes => in_bytes,
    :out_bytes => out_bytes
  )

  @last_varz_update = Time.now.to_f
  varz
end

.version ⇒ Object



18
# File 'lib/nats/server/server.rb', line 18

# Returns the version banner printed by the --version option.
def version
  "nats-server version #{NATSD::VERSION}"
end