Class: Fluent::SomeOutput

Inherits:
TimeSlicedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_snowplow.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



11
12
13
# File 'lib/fluent/plugin/out_snowplow.rb', line 11

# Hands +conf+ straight to the parent class's #configure.
# The bare `super` forwards the argument unchanged; this override adds no
# behavior of its own.
def configure(conf)
  super
end

#format(tag, time, record) ⇒ Object



33
34
35
# File 'lib/fluent/plugin/out_snowplow.rb', line 33

# Serializes one event as a msgpack-encoded [tag, time, record] triple.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event payload
# @return [Object] whatever Array#to_msgpack returns for the triple
def format(tag, time, record)
  entry = [tag, time, record]
  entry.to_msgpack
end

#start ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/fluent/plugin/out_snowplow.rb', line 15

# Runs the parent class's #start, then builds the shared Snowplow emitter
# and resets the per-application tracker cache.
#
# The emitter is created from @host with @buffer_size, @protocol and @method.
# A successful flush is logged at debug level; a failed flush raises.
def start
  super

  success_callback = ->(_) { log.debug("Flush with success on snowplow") }
  failure_callback = ->(_, _) { raise "Error when flushing to snowplow" }

  emitter_options = {
    buffer_size: @buffer_size,
    protocol: @protocol,
    method: @method,
    on_success: success_callback,
    on_failure: failure_callback
  }

  @emitter = SnowplowTracker::Emitter.new(@host, emitter_options)
  @trackers = {}
end

#stop ⇒ Object



29
30
31
# File 'lib/fluent/plugin/out_snowplow.rb', line 29

# Flushes every tracker created by #tracker_for before the plugin stops.
#
# Trackers are kept in the @trackers hash; no single @tracker instance
# variable is ever assigned, so each cached tracker is flushed in turn.
# A nil @trackers (for example when #start never ran) is treated as empty.
def stop
  (@trackers || {}).each_value(&:flush)
end

#tracker_for(application) ⇒ Object



37
38
39
40
# File 'lib/fluent/plugin/out_snowplow.rb', line 37

# Returns the tracker cached for +application+, creating and caching one
# on first use. New trackers share @emitter and use +application+ as the
# fourth constructor argument; the second and third are nil.
#
# @param application [Object] cache key, also passed to the tracker constructor
# @return [Object] the cached or newly built tracker
def tracker_for(application)
  cached = @trackers[application]
  return cached if cached

  @trackers[application] = SnowplowTracker::Tracker.new(@emitter, nil, nil, application)
end

#write(chunk) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/out_snowplow.rb', line 42

# Sends every record of a buffered chunk to Snowplow as a self-describing
# event, then flushes the tracker used for the last record.
#
# Each record is read with string keys: 'schema', 'message' (a JSON string),
# 'true_timestamp', 'application', and an optional 'contexts' (a JSON string
# holding an array of objects with 'schema' and 'message' keys; defaults to
# an empty array when the key is absent).
#
# @param chunk [#msgpack_each] yields (tag, time, record) triples
# @return [Object, nil] result of the final flush, or nil when the chunk
#   yielded no records
# @raise [JSON::ParserError] when 'message' or 'contexts' is not valid JSON
def write(chunk)
  tracker = nil

  chunk.msgpack_each do |_, _, record|
    schema = record['schema']
    message = JSON.parse(record['message'])
    true_timestamp = record['true_timestamp']
    contexts = JSON.parse(record.fetch('contexts', "[]"))
    tracker = tracker_for(record['application'])

    context_jsons = contexts.map do |context|
      SnowplowTracker::SelfDescribingJson.new(context['schema'], context['message'])
    end

    event = SnowplowTracker::SelfDescribingJson.new(schema, message)
    # nil.to_i is 0, so a record without 'true_timestamp' is sent with timestamp 0.
    tracker.track_self_describing_event(event, context_jsons, SnowplowTracker::TrueTimestamp.new(true_timestamp.to_i))
  end

  # An empty chunk never assigns a tracker; skip the flush instead of
  # calling it on nil.
  tracker.flush if tracker
end