Class: Fluent::Counter::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/counter/server.rb

Defined Under Namespace

Classes: Response

Constant Summary collapse

DEFAULT_ADDR =
'127.0.0.1'
DEFAULT_PORT =
24321

Instance Method Summary collapse

Constructor Details

#initialize(name, opt = {}) ⇒ Server

Returns a new instance of Server.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fluent/counter/server.rb', line 29

def initialize(name, opt = {})
  raise 'Counter server name is invalid' unless Validator::VALID_NAME.match?(name)
  @name = name
  @opt = opt
  @addr = @opt[:addr] || DEFAULT_ADDR
  @port = @opt[:port] || DEFAULT_PORT
  @loop = @opt[:loop] || Coolio::Loop.new
  @log =  @opt[:log] || $log

  @store = Fluent::Counter::Store.new(opt)
  @mutex_hash = MutexHash.new(@store)

  @server = Coolio::TCPServer.new(@addr, @port, Handler, method(:on_message))
  @thread = nil
  @running = false
end

Instance Method Details

#on_message(data) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/fluent/counter/server.rb', line 68

def on_message(data)
  errors = Validator.request(data)
  unless errors.empty?
    return { 'id' => data['id'], 'data' => [], 'errors' => errors }
  end

  result = safe_run do
    send(data['method'], data['params'], data['scope'], data['options'])
  end
  result.merge('id' => data['id'])
rescue => e
  @log.error e.to_s
end

#startObject



46
47
48
49
50
51
52
53
54
55
56
# File 'lib/fluent/counter/server.rb', line 46

def start
  @server.attach(@loop)
  @thread = Thread.new do
    @running = true
    @loop.run(0.5)
    @running = false
  end
  @log.debug("starting counter server #{@addr}:#{@port}")
  @mutex_hash.start
  self
end

#stopObject



58
59
60
61
62
63
64
65
66
# File 'lib/fluent/counter/server.rb', line 58

def stop
  # This `sleep` for a test to wait for a `@loop` to begin to run
  sleep 0.1
  @server.close
  @loop.stop if @running
  @mutex_hash.stop
  @thread.join if @thread
  @log.debug("calling stop in counter server #{@addr}:#{@port}")
end