Class: CloudPI::AppMetric

Inherits:
Object
  • Object
show all
Defined in:
lib/cloudpi-appmetric.rb

Constant Summary collapse

SLEEP_TIME =

sleep time in thread loop - checking queue, retrying to connect.

3.0

Instance Method Summary collapse

Constructor Details

#initialize(bridge_ip = 'localhost', bridge_port = 5999, policy = '{}') ⇒ AppMetric

sec



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/cloudpi-appmetric.rb', line 13

def initialize(bridge_ip = 'localhost', bridge_port = 5999, policy = '{}')
  @bridge_ip     = bridge_ip
  @bridge_port   = bridge_port
  @queue         = SizedQueue.new(4096)
  @mutex         = Mutex.new
  
  begin
    json         = JSON.parse(ENV['VCAP_APPLICATION'], {:symbolize_names => true})
    @app_ip      = json[:host]
    @app_port    = json[:port]
    @app_id      = json[:app_id]
    @instance_id = json[:instance_id]
  rescue JSON::ParserError, TypeError
    # parsing error: ENV['VCAP_APPLICATION']
    # log error "parsing error: ENV['VCAP_APPLICATION']"
  end
  
  @policy           = parse(policy)
  @policy_published = false
  
  # log info "key: @#!$!"
  connect
  run
end

Instance Method Details

#add_policy(policy) ⇒ Object



94
95
96
# File 'lib/cloudpi-appmetric.rb', line 94

def add_policy(policy)
  # TODO
end

#circular_enqueue(obj) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/cloudpi-appmetric.rb', line 71

def circular_enqueue(obj)
  # enqueue like a circular-queue
  # store messages into queue then loop thread will push to bridge
  @mutex.synchronize {
    begin
      while @queue.length >= @queue.max do
        # queue is full.
        # log info "queue is full."
        @queue.pop(true)
      end
    rescue ThreadError
      # queue is empty.
      # log info "queue is empty."
    end
    @queue.push(obj)
  }
end

#closeObject



51
52
53
54
# File 'lib/cloudpi-appmetric.rb', line 51

def close
  @conn.close if @conn
  # log info "connection closed"
end

#connectObject



38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/cloudpi-appmetric.rb', line 38

def connect
  begin
    @conn = TCPSocket.new(@bridge_ip, @bridge_port)
    # log info "connected to bridge"
    @policy_published = false
    send_policy
  rescue
    # ignore connection error
    # log error "cann't connect to bridge."
    nil
  end
end

#keyObject



56
57
58
# File 'lib/cloudpi-appmetric.rb', line 56

def key
  :"#{@app_ip}:#{@app_port}:#{@app_id}:#{@instance_id}"
end

#parse(raw) ⇒ Object



60
61
62
63
64
65
66
67
68
69
# File 'lib/cloudpi-appmetric.rb', line 60

def parse(raw)
  begin
    json = JSON.parse(raw, {:symbolize_names => true})
    json = {} unless json.is_a?(Hash)
    json
  rescue JSON::ParserError
    # log error "parse error"
    {}
  end
end

#remove_policy(policy) ⇒ Object



98
99
100
# File 'lib/cloudpi-appmetric.rb', line 98

def remove_policy(policy)
  # TODO
end

#runObject



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/cloudpi-appmetric.rb', line 129

def run
  # no needs to send heartbeat, dea will do that..

  t = Thread.new do
    loop do

      # log debug "queue capacity: #{@queue.length}/#{@queue.max}"
      while !@queue.empty? do
        @msg = @queue.pop unless @msg  # @msg stores last failed msg.
        begin
          @conn.write("#{@msg}\0")
          # log debug "send msg: #{@msg}"
          @msg = nil
        rescue
          # retry to connect
          # log error "cann't write to bridge."
          sleep(SLEEP_TIME)
          connect
        end
      end # while

      sleep(SLEEP_TIME) # queue is empty
    end
  end
  
  set_trap(t)
end

#send_heartbeatObject



102
103
104
105
# File 'lib/cloudpi-appmetric.rb', line 102

def send_heartbeat
  # log debug "heartbeat into queue"
  circular_enqueue({:"#{key}" => {:alive => Time.now.to_i}}.to_json)
end

#send_metric(metric) ⇒ Object



114
115
116
117
118
119
# File 'lib/cloudpi-appmetric.rb', line 114

def send_metric(metric)
  # log debug "metric into queue"
  msg = parse(metric)
  msg[:when] = Time.now.to_i
  circular_enqueue({:"#{key}" => msg}.to_json)
end

#send_policyObject



107
108
109
110
111
112
# File 'lib/cloudpi-appmetric.rb', line 107

def send_policy
  # log debug "policy into queue"
  @policy[:when] = Time.now.to_i
  circular_enqueue({:"#{key}" => @policy}.to_json)
  @policy_published = true;
end

#set_policy(policy) ⇒ Object



89
90
91
92
# File 'lib/cloudpi-appmetric.rb', line 89

def set_policy(policy)
  @policy = parse(policy)
  # log info "set_policy: #{@policy.to_json}"
end

#set_trap(thread) ⇒ Object



121
122
123
124
125
126
127
# File 'lib/cloudpi-appmetric.rb', line 121

def set_trap(thread)
  # Thread exit when signal triggered
  Signal.trap("TERM") do
    # log info "TERM signal received."
    Thread.kill(thread) if thread
  end
end