Class: CloudPI::AppMetric
- Inherits:
-
Object
- Object
- CloudPI::AppMetric
- Defined in:
- lib/cloudpi-appmetric.rb
Constant Summary collapse
- SLEEP_TIME =
Sleep time (in seconds) used by the thread loop: the pause between queue checks and before retrying to connect.
3.0
Instance Method Summary collapse
- #add_policy(policy) ⇒ Object
- #circular_enqueue(obj) ⇒ Object
- #close ⇒ Object
- #connect ⇒ Object
-
#initialize(bridge_ip = 'localhost', bridge_port = 5999, policy = '{}') ⇒ AppMetric
constructor
sec.
- #key ⇒ Object
- #parse(raw) ⇒ Object
- #remove_policy(policy) ⇒ Object
- #run ⇒ Object
- #send_heartbeat ⇒ Object
- #send_metric(metric) ⇒ Object
- #send_policy ⇒ Object
- #set_policy(policy) ⇒ Object
- #set_trap(thread) ⇒ Object
Constructor Details
#initialize(bridge_ip = 'localhost', bridge_port = 5999, policy = '{}') ⇒ AppMetric
sec
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/cloudpi-appmetric.rb', line 13
#
# Constructor. Stores the bridge endpoint, creates the bounded message queue
# and its mutex, reads the application identity from ENV['VCAP_APPLICATION'],
# parses the initial policy, then connects and starts the sender thread.
#
# @param bridge_ip [String] host of the bridge to connect to
# @param bridge_port [Integer] TCP port of the bridge
# @param policy [String] JSON text describing the policy (parsed via #parse)
def initialize(bridge_ip = 'localhost', bridge_port = 5999, policy = '{}')
  @bridge_ip = bridge_ip
  @bridge_port = bridge_port
  @queue = SizedQueue.new(4096)
  @mutex = Mutex.new

  begin
    vcap = JSON.parse(ENV['VCAP_APPLICATION'], symbolize_names: true)
    @app_ip = vcap[:host]
    @app_port = vcap[:port]
    @app_id = vcap[:app_id]
    @instance_id = vcap[:instance_id]
  rescue JSON::ParserError, TypeError
    # ENV['VCAP_APPLICATION'] is missing or malformed; the identity fields
    # stay unset and construction continues.
  end

  @policy = parse(policy)
  @policy_published = false

  connect
  run
end
Instance Method Details
#add_policy(policy) ⇒ Object
94 95 96 |
# File 'lib/cloudpi-appmetric.rb', line 94
#
# Placeholder: not implemented yet. Accepts a policy and does nothing.
#
# @param policy [Object] ignored for now
# @return [nil]
def add_policy(policy)
  # TODO: implement adding a policy.
  nil
end
#circular_enqueue(obj) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/cloudpi-appmetric.rb', line 71
#
# Enqueues obj, behaving like a circular queue: when the queue is at
# capacity, the oldest entries are dropped to make room. Messages stored here
# are later pushed to the bridge by the thread loop.
#
# @param obj [Object] the message to store
# @return [Object] whatever @queue.push returns
def circular_enqueue(obj)
  @mutex.synchronize do
    begin
      # Queue is full: discard from the front without blocking.
      @queue.pop(true) until @queue.length < @queue.max
    rescue ThreadError
      # The non-blocking pop found the queue already empty; nothing to drop.
    end

    @queue.push(obj)
  end
end
#close ⇒ Object
51 52 53 54 |
# File 'lib/cloudpi-appmetric.rb', line 51
#
# Closes the bridge connection if one has been established; otherwise does
# nothing.
def close
  return unless @conn

  @conn.close
end
#connect ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/cloudpi-appmetric.rb', line 38
#
# Opens a TCP connection to the bridge and queues the policy for sending.
# Any previously held socket is closed first so that repeated reconnects do
# not leak file descriptors. Connection errors are swallowed on purpose: the
# caller retries later.
#
# @return [Object, nil] the result of #send_policy on success, nil when the
#   connection could not be established
def connect
  if @conn
    begin
      @conn.close
    rescue StandardError
      # Best effort: the old socket may already be broken or closed.
    end
    @conn = nil
  end

  @conn = TCPSocket.new(@bridge_ip, @bridge_port)
  @policy_published = false
  send_policy
rescue StandardError
  # Ignore the connection error; can't connect to the bridge right now.
  nil
end
#key ⇒ Object
56 57 58 |
# File 'lib/cloudpi-appmetric.rb', line 56
#
# Builds the identifier for this application instance by joining the app IP,
# app port, app id and instance id with colons.
#
# @return [Symbol] "<app_ip>:<app_port>:<app_id>:<instance_id>" as a symbol
def key
  parts = [@app_ip, @app_port, @app_id, @instance_id]
  parts.map(&:to_s).join(':').to_sym
end
#parse(raw) ⇒ Object
60 61 62 63 64 65 66 67 68 69 |
# File 'lib/cloudpi-appmetric.rb', line 60
#
# Parses JSON text into a Hash with symbol keys. Anything that is not a JSON
# object, is not valid JSON, or is not a String at all (e.g. nil) yields an
# empty Hash instead of raising.
#
# @param raw [String, nil] JSON text
# @return [Hash] the parsed object, or {} on any failure
def parse(raw)
  parsed = JSON.parse(raw, symbolize_names: true)
  parsed.is_a?(Hash) ? parsed : {}
rescue JSON::ParserError, TypeError
  # ParserError: malformed JSON. TypeError: raw is nil or not string-like,
  # which the constructor already treats as a soft failure as well.
  {}
end
#remove_policy(policy) ⇒ Object
98 99 100 |
# File 'lib/cloudpi-appmetric.rb', line 98
#
# Placeholder: not implemented yet. Accepts a policy and does nothing.
#
# @param policy [Object] ignored for now
# @return [nil]
def remove_policy(policy)
  # TODO: implement removing a policy.
  nil
end
#run ⇒ Object
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/cloudpi-appmetric.rb', line 129
#
# Starts the background thread that drains the queue and writes each message
# to the bridge, terminated by a NUL byte. A message whose write fails is kept
# in @msg and retried after reconnecting. No heartbeat is sent from here (the
# original note says the DEA does that).
#
# @return [Object] whatever #set_trap returns for the new thread
def run
  worker = Thread.new do
    loop do
      # Keep going while a failed message is pending, not only while the
      # queue has entries; otherwise the pending message would sit unsent
      # until something else gets enqueued.
      while @msg || !@queue.empty?
        # @msg stores the last failed message; only pop when none is pending.
        @msg ||= @queue.pop
        begin
          @conn.write("#{@msg}\0")
          @msg = nil
        rescue StandardError
          # Can't write to the bridge: wait, then try to reconnect.
          sleep(SLEEP_TIME)
          connect
        end
      end

      sleep(SLEEP_TIME) # queue is empty
    end
  end

  set_trap(worker)
end
#send_heartbeat ⇒ Object
102 103 104 105 |
# File 'lib/cloudpi-appmetric.rb', line 102
#
# Queues a heartbeat message: a JSON object keyed by #key whose value carries
# the current Unix time under :alive.
def send_heartbeat
  heartbeat = { key.to_s.to_sym => { alive: Time.now.to_i } }
  circular_enqueue(heartbeat.to_json)
end
#send_metric(metric) ⇒ Object
114 115 116 117 118 119 |
# File 'lib/cloudpi-appmetric.rb', line 114
#
# Queues a metric message. The metric JSON is parsed, stamped with the current
# Unix time under :when, wrapped in an object keyed by #key, and enqueued.
#
# @param metric [String] JSON text describing the metric
def send_metric(metric)
  payload = parse(metric)
  payload.store(:when, Time.now.to_i)

  envelope = { key.to_s.to_sym => payload }
  circular_enqueue(envelope.to_json)
end
#send_policy ⇒ Object
107 108 109 110 111 112 |
# File 'lib/cloudpi-appmetric.rb', line 107
#
# Queues the current policy. The policy hash is stamped in place with the
# current Unix time under :when, wrapped in an object keyed by #key, enqueued,
# and then marked as published.
#
# @return [true]
def send_policy
  @policy.store(:when, Time.now.to_i)

  envelope = { key.to_s.to_sym => @policy }
  circular_enqueue(envelope.to_json)

  @policy_published = true
end
#set_policy(policy) ⇒ Object
89 90 91 92 |
# File 'lib/cloudpi-appmetric.rb', line 89
#
# Replaces the stored policy with the result of parsing the given JSON text.
#
# @param policy [String] JSON text describing the policy
# @return [Hash] the newly stored policy
def set_policy(policy)
  parsed = parse(policy)
  @policy = parsed
end
#set_trap(thread) ⇒ Object
121 122 123 124 125 126 127 |
# File 'lib/cloudpi-appmetric.rb', line 121
#
# Installs a TERM signal handler that kills the given thread, so the thread
# exits when the signal is triggered.
#
# @param thread [Thread, nil] thread to kill on TERM; nothing is killed if nil
# @return [Object] the previous TERM handler, as returned by Signal.trap
def set_trap(thread)
  Signal.trap('TERM') { Thread.kill(thread) if thread }
end