Class: Fluent::Counter::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/counter/client.rb

Constant Summary collapse

DEFAULT_PORT =
24321
DEFAULT_ADDR =
'127.0.0.1'
DEFAULT_TIMEOUT =
5
ID_LIMIT_COUNT =
1 << 31

Instance Method Summary collapse

Constructor Details

#initialize(loop = nil, opt = {}) ⇒ Client

Returns a new instance of Client.



30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/fluent/counter/client.rb', line 30

def initialize(loop = nil, opt = {})
  @loop = loop || Coolio::Loop.new
  @port = opt[:port] || DEFAULT_PORT
  @host = opt[:host] || DEFAULT_ADDR
  @log = opt[:log] || $log
  @timeout = opt[:timeout] || DEFAULT_TIMEOUT
  @conn = Connection.connect(@host, @port, method(:on_message))
  @responses = {}
  @id = 0
  @id_mutex = Mutex.new
  @loop_mutex = Mutex.new
end

Instance Method Details

#delete(*params, options: {}) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/fluent/counter/client.rb', line 95

def delete(*params, options: {})
  exist_scope!
  res = send_request('delete', @scope, params, options)

  if block_given?
    Thread.start do
      yield res.get
    end
  else
    res
  end
end

#establish(scope) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
# File 'lib/fluent/counter/client.rb', line 60

def establish(scope)
  scope = Timeout.timeout(@timeout) {
    response = send_request('establish', nil, [scope])
    Fluent::Counter.raise_error(response.errors.first) if response.errors?
    data = response.data
    data.first
  }
  @scope = scope
rescue Timeout::Error
  raise "Can't establish the connection to counter server due to timeout"
end

#get(*params, options: {}) ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/fluent/counter/client.rb', line 129

def get(*params, options: {})
  exist_scope!
  res = send_request('get', @scope, params, options)

  if block_given?
    Thread.start do
      yield res.get
    end
  else
    res
  end
end

#inc(params, options: {}) ⇒ Object

Example

‘inc` receives various arguments.

  1. inc(name: ‘name’)

  2. inc({ name: ‘name’,value: 20 }, options: {})

  3. inc([{ name: ‘name1’,value: 20 }, { name: ‘name2’,value: 20 }])

  4. inc([{ name: ‘name1’,value: 20 }, { name: ‘name2’,value: 20 }], options: {})



115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/fluent/counter/client.rb', line 115

def inc(params, options: {})
  exist_scope!
  params = [params] unless params.is_a?(Array)
  res = send_request('inc', @scope, params, options)

  if block_given?
    Thread.start do
      yield res.get
    end
  else
    res
  end
end

#init(params, options: {}) ⇒ Object

Example

‘init` receives various arguments.

  1. init(name: ‘name’)

  2. init({ name: ‘name’,reset_interval: 20 }, options: {})

  3. init([{ name: ‘name1’,reset_interval: 20 }, { name: ‘name2’,reset_interval: 20 }])

  4. init([{ name: ‘name1’,reset_interval: 20 }, { name: ‘name2’,reset_interval: 20 }], options: {})

  5. init([{ name: ‘name1’,reset_interval: 20 }, { name: ‘name2’,reset_interval: 20 }]) { |res| … }



80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/fluent/counter/client.rb', line 80

def init(params, options: {})
  exist_scope!
  params = [params] unless params.is_a?(Array)
  res = send_request('init', @scope, params, options)

  # if `async` is false or missing, block at this method and return a Future::Result object.
  if block_given?
    Thread.start do
      yield res.get
    end
  else
    res
  end
end

#reset(*params, options: {}) ⇒ Object



142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/fluent/counter/client.rb', line 142

def reset(*params, options: {})
  exist_scope!
  res = send_request('reset', @scope, params, options)

  if block_given?
    Thread.start do
      yield res.get
    end
  else
    res
  end
end

#startObject



43
44
45
46
47
48
49
50
51
52
53
# File 'lib/fluent/counter/client.rb', line 43

def start
  @loop.attach(@conn)
  @log.debug("starting counter client: #{@host}:#{@port}")
  self
rescue => e
  if @log
    @log.error e
  else
    STDERR.puts e
  end
end

#stopObject



55
56
57
58
# File 'lib/fluent/counter/client.rb', line 55

def stop
  @conn.close
  @log.debug("calling stop in counter client: #{@host}:#{@port}")
end