Class: Fluent::SomeOutput
- Inherits:
-
TimeSlicedOutput
- Object
- TimeSlicedOutput
- Fluent::SomeOutput
- Defined in:
- lib/fluent/plugin/out_snowplow.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #tracker_for(application) ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
11 12 13 |
# File 'lib/fluent/plugin/out_snowplow.rb', line 11 def configure(conf) super end |
#format(tag, time, record) ⇒ Object
33 34 35 |
# File 'lib/fluent/plugin/out_snowplow.rb', line 33 def format(tag, time, record) [tag, time, record].to_msgpack end |
#start ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/fluent/plugin/out_snowplow.rb', line 15 def start super @emitter = SnowplowTracker::Emitter.new(@host, { buffer_size: @buffer_size, protocol: @protocol, method: @method, on_success: ->(_) { log.debug("Flush with success on snowplow") }, on_failure: ->(_, _) { raise "Error when flushing to snowplow" } }) @trackers = {} end |
#stop ⇒ Object
29 30 31 |
# File 'lib/fluent/plugin/out_snowplow.rb', line 29 def stop @tracker.flush end |
#tracker_for(application) ⇒ Object
37 38 39 40 |
# File 'lib/fluent/plugin/out_snowplow.rb', line 37 def tracker_for(application) @trackers[application] ||= SnowplowTracker::Tracker.new(@emitter, nil, nil, application) @trackers[application] end |
#write(chunk) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/fluent/plugin/out_snowplow.rb', line 42 def write(chunk) application, tracker = nil, nil chunk.msgpack_each do |_, _, record| schema = record['schema'] = JSON.parse record['message'] = record['true_timestamp'] application = record['application'] contexts = JSON.parse record.fetch('contexts', "[]") tracker = tracker_for(application) contexts = contexts.map do |context| context_schema = context['schema'] = context['message'] SnowplowTracker::SelfDescribingJson.new(context_schema, ) end self_describing_json = SnowplowTracker::SelfDescribingJson.new(schema, ) tracker.track_self_describing_event(self_describing_json, contexts, SnowplowTracker::TrueTimestamp.new(.to_i)) end tracker.flush end |