Class: Fluent::Counter::Server
- Inherits: Object
- Object
- Fluent::Counter::Server
- Defined in:
- lib/fluent/counter/server.rb
Defined Under Namespace
Classes: Response
Constant Summary collapse
- DEFAULT_ADDR =
'127.0.0.1'
- DEFAULT_PORT =
24321
Instance Method Summary collapse
- #initialize(name, opt = {}) ⇒ Server (constructor): returns a new instance of Server.
- #on_message(data) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(name, opt = {}) ⇒ Server
Returns a new instance of Server.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/fluent/counter/server.rb', line 29

# Builds a counter server and the TCP listener it will serve requests on.
#
# @param name the server name; rejected unless Validator::VALID_NAME matches it
#   (presumably a String -- confirm against callers)
# @param opt [Hash] optional settings. :addr, :port, :loop and :log are read
#   here; the whole hash is also handed to Fluent::Counter::Store.new.
# @raise [RuntimeError] when name does not match Validator::VALID_NAME
def initialize(name, opt = {})
  unless Validator::VALID_NAME.match?(name)
    raise 'Counter server name is invalid'
  end

  @name = name
  @opt = opt

  # Every setting falls back to its default when the option is missing or nil.
  @addr = opt[:addr] || DEFAULT_ADDR
  @port = opt[:port] || DEFAULT_PORT
  @loop = opt[:loop] || Coolio::Loop.new
  @log = opt[:log] || $log

  @store = Fluent::Counter::Store.new(opt)
  @mutex_hash = MutexHash.new(@store)

  # Incoming requests are routed to #on_message through the Handler class.
  @server = Coolio::TCPServer.new(@addr, @port, Handler, method(:on_message))

  # Populated by #start; the loop thread does not exist yet.
  @thread = nil
  @running = false
end
Instance Method Details
#on_message(data) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/fluent/counter/server.rb', line 68

# Handles one request and builds the response for it. This is the callback
# that #initialize passes to Coolio::TCPServer via method(:on_message).
#
# @param data [Hash] the request; the keys 'id', 'method', 'params', 'scope'
#   and 'options' are read from it
# @return [Hash] when validation fails, a hash with the request 'id', an empty
#   'data' array and the validation 'errors'; otherwise the result of the
#   dispatched method merged with the request 'id'.
#   If a StandardError is raised, the error is logged and the return value is
#   whatever @log.error returns, not a response hash.
def on_message(data)
  errors = Validator.request(data)
  unless errors.empty?
    return { 'id' => data['id'], 'data' => [], 'errors' => errors }
  end

  result = safe_run do
    # NOTE(review): the method name comes straight from the request; this
    # relies on Validator.request having rejected unexpected names -- confirm.
    send(data['method'], data['params'], data['scope'], data['options'])
  end
  result.merge('id' => data['id'])
rescue => e
  @log.error e.to_s
end
#start ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/fluent/counter/server.rb', line 46 def start @server.attach(@loop) @thread = Thread.new do @running = true @loop.run(0.5) @running = false end @log.debug("starting counter server #{@addr}:#{@port}") @mutex_hash.start self end |
#stop ⇒ Object
58 59 60 61 62 63 64 65 66 |
# File 'lib/fluent/counter/server.rb', line 58 def stop # This `sleep` for a test to wait for a `@loop` to begin to run sleep 0.1 @server.close @loop.stop if @running @mutex_hash.stop @thread.join if @thread @log.debug("calling stop in counter server #{@addr}:#{@port}") end |