Class: FnordMetric::Worker
- Inherits: Object
- Inheritance chain: Object → FnordMetric::Worker
- Defined in:
- lib/fnordmetric/worker.rb
Instance Method Summary
- #announce_event(event) ⇒ Object
- #event_key(event_id) ⇒ Object
- #expire_event(event_id) ⇒ Object
- #initialize(opts = {}) ⇒ Worker (constructor) — Returns a new instance of Worker.
- #initialized ⇒ Object
- #namespace(key) ⇒ Object
- #parse_json(data) ⇒ Object
- #process_event(event_id, event_data) ⇒ Object
- #publish_event(event) ⇒ Object
- #pubsub_key ⇒ Object
- #queue_key ⇒ Object
- #redis ⇒ Object
- #stats_key ⇒ Object
- #sync_redis ⇒ Object
- #tick ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ Worker
Returns a new instance of Worker.
3 4 5 6 7 8 |
# File 'lib/fnordmetric/worker.rb', line 3
# Sets up a worker: stores the namespaces reported by FnordMetric, resolves
# the option hash used by the key helpers, and registers this worker with
# FnordMetric.
#
# @param opts [Hash] option overrides passed through to FnordMetric.options
def initialize(opts = {})
  @namespaces = FnordMetric.namespaces
  # The accessor name had been dropped here ("FnordMetric.(opts)"), which Ruby
  # parses as FnordMetric.call(opts). The rest of the worker reads @opts as a
  # hash of options, so the options lookup is restored.
  # NOTE(review): confirm FnordMetric.options merges +opts+ over the defaults.
  @opts = FnordMetric.options(opts)
  FnordMetric.register(self)
end
Instance Method Details
#announce_event(event) ⇒ Object
57 58 59 |
# File 'lib/fnordmetric/worker.rb', line 57
# Hands an event to the namespace selected by its :_namespace entry.
#
# The namespace is prepared with both redis handles via ready! and the event
# is then announced on whatever ready! returns.
#
# @param event [Hash] parsed event; event[:_namespace] picks the namespace
# @return [Object] the result of the namespace's announce call
def announce_event(event)
  target = namespace(event[:_namespace])
  prepared = target.ready!(redis, sync_redis)
  prepared.announce(event)
end
#event_key(event_id) ⇒ Object
49 50 51 |
# File 'lib/fnordmetric/worker.rb', line 49
# Builds the redis key for a single event: the configured prefix, the literal
# "event" and the event id, joined with dashes.
#
# @param event_id [Object] identifier of the event
# @return [String] e.g. "<prefix>-event-<event_id>"
def event_key(event_id)
  segments = [@opts[:redis_prefix], 'event', event_id]
  segments.join('-')
end
#expire_event(event_id) ⇒ Object
61 62 63 |
# File 'lib/fnordmetric/worker.rb', line 61
# Issues an expire on the event's redis key using the configured
# :event_data_ttl option as the TTL.
#
# @param event_id [Object] identifier of the event whose key should expire
# @return [Object] whatever the redis client's expire call returns
def expire_event(event_id)
  connection = redis
  key = event_key(event_id)
  ttl = @opts[:event_data_ttl]
  connection.expire(key, ttl)
end
#initialized ⇒ Object
10 11 12 13 |
# File 'lib/fnordmetric/worker.rb', line 10
# Logs that the worker has started and schedules the first #tick on the next
# EventMachine reactor iteration.
#
# @return [Object] whatever EM.next_tick returns
def initialized
  FnordMetric.log("worker started")
  first_tick = method(:tick)
  EM.next_tick(&first_tick)
end
#namespace(key) ⇒ Object
69 70 71 |
# File 'lib/fnordmetric/worker.rb', line 69
# Returns a clone of the namespace stored under +key+. When nothing truthy is
# stored under that key, the last element of the first @namespaces entry is
# cloned instead (the first value when @namespaces is a Hash).
#
# @param key [Object] namespace key, may be nil
# @return [Object] a clone of the selected namespace
def namespace(key)
  selected = @namespaces[key]
  selected = @namespaces.first.last unless selected
  selected.clone
end
#parse_json(data) ⇒ Object
73 74 75 76 77 78 79 |
# File 'lib/fnordmetric/worker.rb', line 73
# Parses a raw JSON event payload into a Hash with symbolized keys.
#
# A present :_namespace value is converted to a Symbol. Payloads that are not
# valid JSON, or that are valid JSON but not an object, are reported through
# FnordMetric.error and rejected.
#
# @param data [String] raw JSON text of the event
# @return [Hash, false] the parsed event, or false when the payload is unusable
def parse_json(data)
  event = Yajl::Parser.new(:symbolize_keys => true).parse(data)

  # Valid JSON that is not an object (array, number, string, ...) cannot carry
  # event fields; indexing it with a Symbol would raise outside the rescue.
  unless event.is_a?(Hash)
    FnordMetric.error "invalid json: expected an object, got #{event.class}"
    return false
  end

  # to_s first: a non-string namespace (e.g. a JSON number) has no #to_sym.
  event[:_namespace] = event[:_namespace].to_s.to_sym if event[:_namespace]
  event
rescue Yajl::ParseError => e
  FnordMetric.error "invalid json: #{e}"
  false
end
#process_event(event_id, event_data) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/fnordmetric/worker.rb', line 28
# Schedules processing of one event on the next EventMachine reactor
# iteration.
#
# Inside the scheduled block the payload is parsed; when parsing yields a
# truthy event it is stamped with :_time (only if absent) and :_eid, then
# announced, published, and finally its stored data is given an expiry.
# Nothing further happens when parsing fails.
#
# @param event_id [Object] identifier of the event
# @param event_data [String] raw JSON payload of the event
# @return [Object] whatever EM.next_tick returns
def process_event(event_id, event_data)
  EM.next_tick do
    event = parse_json(event_data)
    next unless event

    event[:_time] ||= Time.now.to_i
    event[:_eid] = event_id

    announce_event(event)
    publish_event(event)
    expire_event(event_id)
  end
end
#publish_event(event) ⇒ Object
65 66 67 |
# File 'lib/fnordmetric/worker.rb', line 65
# Publishes the event, serialized with to_json, on the pubsub channel named
# by #pubsub_key.
#
# @param event [#to_json] the event to broadcast
# @return [Object] whatever the redis client's publish call returns
def publish_event(event)
  connection = redis
  channel = pubsub_key
  payload = event.to_json
  connection.publish(channel, payload)
end
#pubsub_key ⇒ Object
41 42 43 |
# File 'lib/fnordmetric/worker.rb', line 41
# Name of the pubsub channel: the configured prefix and the literal
# "announce", joined with a dash.
#
# @return [String] e.g. "<prefix>-announce"
def pubsub_key
  prefix = @opts[:redis_prefix]
  [prefix, 'announce'].join('-')
end
#queue_key ⇒ Object
45 46 47 |
# File 'lib/fnordmetric/worker.rb', line 45
# Name of the redis list the worker pops event ids from: the configured
# prefix and the literal "queue", joined with a dash.
#
# @return [String] e.g. "<prefix>-queue"
def queue_key
  parts = [@opts[:redis_prefix], 'queue']
  parts.join('-')
end
#redis ⇒ Object
81 82 83 |
# File 'lib/fnordmetric/worker.rb', line 81
# Memoized EM::Hiredis connection, opened on first use against the
# :redis_url option.
#
# @return [Object] the connection returned by EM::Hiredis.connect
def redis
  # The accessor name had been dropped here ("FnordMetric.[:redis_url]"),
  # which is not valid Ruby; the options lookup is restored.
  # NOTE(review): this reads the global FnordMetric.options rather than this
  # worker's @opts — presumably what the FIXPAUL marker refers to; confirm.
  @redis ||= EM::Hiredis.connect(FnordMetric.options[:redis_url]) # FIXPAUL
end
#stats_key ⇒ Object
53 54 55 |
# File 'lib/fnordmetric/worker.rb', line 53
# Name of the redis hash holding worker counters: the configured prefix and
# the literal "stats", joined with a dash.
#
# @return [String] e.g. "<prefix>-stats"
def stats_key
  prefix = @opts[:redis_prefix]
  [prefix, 'stats'].join('-')
end
#sync_redis ⇒ Object
85 86 87 |
# File 'lib/fnordmetric/worker.rb', line 85
# Memoized redis handle obtained from FnordMetric.mk_redis; created on first
# use and reused afterwards.
#
# @return [Object] whatever FnordMetric.mk_redis returned on the first call
def sync_redis
  return @sync_redis if @sync_redis

  @sync_redis = FnordMetric.mk_redis
end
#tick ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/fnordmetric/worker.rb', line 15
# One iteration of the worker loop.
#
# Issues a blpop on the queue with a timeout argument of 1. When the pop
# completes, the next tick is scheduled first, so the loop continues whether
# or not an event id came back. If an event id was popped, its stored payload
# is fetched: a present payload is passed to #process_event, a missing one is
# logged as lost. The :events_processed counter is incremented in both cases.
#
# @return [Object] whatever registering the blpop callback returns
def tick
  pending_pop = redis.blpop(queue_key, 1)

  pending_pop.callback do |_list, event_id|
    # Re-arm the loop before doing any work for this event.
    EM.next_tick(&method(:tick))
    next unless event_id

    pending_get = redis.get(event_key(event_id))
    pending_get.callback do |event_data|
      if event_data
        process_event(event_id, event_data)
      else
        FnordMetric.log("event_lost: event_data not found for event-id '#{event_id}' - maybe expired?")
      end
      redis.hincrby(stats_key, :events_processed, 1)
    end
  end
end