Class: Fluent::RackspaceCloudFeedsOutput

Inherits:
Output
  • Object
show all
Includes:
PluginLoggerMixin
Defined in:
lib/fluent/plugin/out_rackspace_cloud_feeds.rb

Instance Method Summary collapse

Instance Method Details

#atomic_wrapper(content, time) ⇒ Object

Wraps the given content in an Atom entry document.



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/fluent/plugin/out_rackspace_cloud_feeds.rb', line 91

# Wraps +content+ in an Atom entry document.
#
# @param content [String] payload placed verbatim inside the <content>
#   element. It is interpolated without any escaping, so it is expected to
#   already be well-formed XML (the element is typed "application/xml").
# @param time [Integer, #to_s] event time whose string form is a base-10
#   count of whole seconds since the Unix epoch
# @return [String] the Atom entry XML; <id> is a freshly generated UUID and
#   <updated> is +time+ rendered in UTC as YYYY-MM-DDTHH:MM:SS.mmmZ
# @raise [ArgumentError] if +time+ does not render as a base-10 integer
def atomic_wrapper(content, time)
  # Time replaces the legacy DateTime.strptime(..., '%s') round trip. The
  # value is still read from its string form so the same inputs are accepted,
  # and the millisecond field stays "000" because only whole seconds are used.
  updated = Time.at(Integer(time.to_s, 10)).utc.strftime('%FT%T.%LZ')

  <<EOF
<?xml version="1.0" encoding="UTF-8" ?>
<entry xmlns="http://www.w3.org/2005/Atom">
  <id>#{SecureRandom.uuid}</id>
  <title type="text">User Access Event</title>
  <author><name>Repose</name></author>
  <updated>#{updated}</updated>
  <content type="application/xml">#{content}</content>
</entry>
EOF
end

#authenticate_user ⇒ Object

Either gets a token back from the identity service, or raises an error. (IDE directive from the source comment: noinspection RubyStringKeysInHashInspection)



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/fluent/plugin/out_rackspace_cloud_feeds.rb', line 60

# Requests an auth token from the identity service.
#
# POSTs a JSON password-credentials document built from @identity_username
# and @identity_password to @identity_endpoint, then extracts
# access.token.id from the JSON reply.
#
# @return [String] the token id reported by the identity service
# @raise [RuntimeError] if the response is not a 2xx success, or if a
#   successful response does not contain access.token.id
def authenticate_user
  uri = URI(@identity_endpoint)
  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = true if uri.scheme == 'https'

  # request_uri keeps any query string and yields "/" when the endpoint has
  # no path; Net::HTTP::Post.new rejects an empty path outright.
  req = Net::HTTP::Post.new(uri.request_uri)
  req['content-type'] = 'application/json'
  req['accept'] = 'application/json'
  req.body = {
    'auth' => {
      'passwordCredentials' => {
        'username' => @identity_username,
        'password' => @identity_password
      }
    }
  }.to_json

  res = http.request(req)
  unless res.is_a?(Net::HTTPSuccess)
    raise "Unable to authenticate with identity at #{@identity_endpoint} as #{@identity_username}"
  end

  # Walk the reply one key at a time so an unexpected shape produces a clear
  # error below instead of a NoMethodError on nil (or a silently nil token).
  token = %w[access token id].inject(JSON.parse(res.body)) do |node, key|
    node.is_a?(Hash) ? node[key] : nil
  end
  return token if token

  raise "Identity at #{@identity_endpoint} returned a successful response without a token id"
end

#configure(conf) ⇒ Object



37
38
39
40
41
42
# File 'lib/fluent/plugin/out_rackspace_cloud_feeds.rb', line 37

# Applies the plugin configuration via the superclass, then writes the
# resulting identity endpoint, identity username and Cloud Feeds endpoint to
# the global logger at debug level.
#
# @param conf the configuration object, forwarded unchanged to the superclass
def configure(conf)
  super

  # Labels are pre-padded so the values line up in the log output.
  [
    ['   Identity endpoint', @identity_endpoint],
    ['   Identity username', @identity_username],
    ['Cloud Feeds endpoint', @feeds_endpoint]
  ].each do |label, value|
    $log.debug("#{label}: #{value}")
  end
end

#emit(tag, es, chain) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/fluent/plugin/out_rackspace_cloud_feeds.rb', line 108

# Posts every record of the event stream to the Cloud Feeds endpoint as an
# Atom entry, then advances the output chain.
#
# For each record the 'message' field is wrapped with atomic_wrapper and
# POSTed with the cached auth token; a token is fetched through
# authenticate_user whenever none is cached.
#
# @param tag the event tag (not used by this method)
# @param es the event stream; iterated with #each yielding (time, record),
#   where record responds to ['message']
# @param chain the output chain; #next is called once after all records
#   have been posted successfully
# @raise [RuntimeError] on the first non-2xx response. The cached token is
#   discarded first so the next call re-authenticates; chain.next is not
#   reached in that case.
def emit(tag, es, chain)
  # The client object is the same for every record, so build it once.
  http = Net::HTTP.new(@feeds_uri.host, @feeds_uri.port)
  http.use_ssl = true if @feeds_uri.scheme == 'https'

  es.each do |time, record|
    # request_uri yields "/" for an endpoint without a path (Post.new rejects
    # an empty path) and keeps any query string.
    post = Net::HTTP::Post.new(@feeds_uri.request_uri)
    post.body = atomic_wrapper(record['message'], time)

    @auth_token ||= authenticate_user
    post['x-auth-token'] = @auth_token
    post['content-type'] = 'application/atom+xml'

    response = http.request(post)
    $log.debug "FEEDS RESPONSE CODE #{response.code}"
    next if response.code =~ /\A2\d\d\z/

    # Log the failure body before raising; previously this log statement sat
    # after the raise and could never run.
    $log.error "FEEDS RESPONSE BODY: #{response.body}"
    @auth_token = nil
    raise "NOT AUTHORIZED TO POST TO FEED ENDPOINT #{@feeds_endpoint}"
  end

  chain.next
end

#shutdown ⇒ Object



53
54
55
# File 'lib/fluent/plugin/out_rackspace_cloud_feeds.rb', line 53

# Shuts the plugin down. This override adds no behavior of its own; it only
# delegates to the superclass implementation.
def shutdown
  super
end

#start ⇒ Object



44
45
46
47
48
49
50
51
# File 'lib/fluent/plugin/out_rackspace_cloud_feeds.rb', line 44

# Starts the plugin: runs the superclass start, then prepares the parsed
# Cloud Feeds URI and a persistent HTTP client.
def start
  super
  # Required inside the method, so the library is loaded when start runs.
  require 'net/http/persistent'

  # Parse the configured endpoint once and keep the URI object in @feeds_uri.
  @feeds_uri = URI @feeds_endpoint
  # Persistent client named 'fluent-feeds-output'.
  # NOTE(review): no other method shown alongside this one reads @feeds_http —
  # confirm whether it is still needed.
  @feeds_http = Net::HTTP::Persistent.new 'fluent-feeds-output'

end