Class: LogStash::Outputs::KinesisIOT

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/outputs/kinesis-iot.rb

Overview

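An output that writes events to an AWS Kinesis stream, authenticating with temporary AWS credentials fetched from an IoT endpoint using a client certificate and key.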

Defined Under Namespace

Classes: AWSIOTCreds

Instance Method Summary collapse

Instance Method Details

#check_required_file(file) ⇒ Object



73
74
75
# File 'lib/logstash/outputs/kinesis-iot.rb', line 73

# Ensures that a file the plugin depends on is present on disk.
#
# @param file [String, #to_s] path to the required file
# @return [nil] when the path refers to an existing regular file
# @raise [RuntimeError] when the path is nil or is not an existing regular file
def check_required_file(file)
  # File.file?(nil) would raise an opaque TypeError; report a missing setting explicitly.
  raise "Required file path is not set." if file.nil?
  raise "Required file #{file} does not exist." unless File.file?(file)
end

#getIotAccess ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/logstash/outputs/kinesis-iot.rb', line 78

# Requests temporary AWS credentials from the endpoint configured in
# @iot_endpoint, authenticating with the client certificate (@cert_file) and
# private key (@key_file) over TLS.
#
# The thing name (@iot_name) is sent in the "x-amzn-iot-thingname" header. The
# response body is expected to be JSON with a "credentials" object holding
# "accessKeyId", "secretAccessKey", "sessionToken" and "expiration".
#
# @return [AWSIOTCreds] the parsed credentials, with the expiration as a Time
# @raise [RuntimeError] when the endpoint answers with a non-2xx status or the
#   response carries no "credentials" object
# @raise [JSON::ParserError] when the response body is not valid JSON
def getIotAccess
  uri = URI(@iot_endpoint)

  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = true
  http.cert = OpenSSL::X509::Certificate.new(File.read(@cert_file))
  http.key = OpenSSL::PKey::RSA.new(File.read(@key_file))
  http.ca_file = @ca_cert_file
  http.verify_mode = @verify_certificate ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE

  request = Net::HTTP::Get.new(uri)
  request['x-amzn-iot-thingname'] = @iot_name

  response = http.request(request)
  # Fail with the HTTP status instead of a NoMethodError on nil further down.
  unless response.is_a?(Net::HTTPSuccess)
    raise "IoT credentials request failed: HTTP #{response.code} #{response.message}"
  end

  result = JSON.parse(response.body)["credentials"]
  raise "IoT credentials response does not contain a credentials object" unless result.is_a?(Hash)

  AWSIOTCreds.new(result["accessKeyId"], result["secretAccessKey"], result["sessionToken"],
                  Time.parse(result["expiration"]))
end

#init_aws ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/logstash/outputs/kinesis-iot.rb', line 97

# Configures the AWS SDK with @region and the credentials held in @creds,
# creates the Kinesis client (@kinesis) and writes one fixed test record to
# @stream_name, logging the shard ID and sequence number from the response.
#
# NOTE(review): the test record is written to the configured stream every
# time this runs - confirm that consumers tolerate it.
def init_aws
  aws_credentials = Aws::Credentials.new(@creds.accessKeyId, @creds.secretAccessKey, @creds.sessionToken)
  Aws.config.update({ region: @region, credentials: aws_credentials })

  @kinesis = Aws::Kinesis::Client.new

  # Write a fixed record to the stream and log where it landed.
  probe_record = {
    stream_name: @stream_name,
    data: '{"test":"test"}',
    partition_key: 'test'
  }
  result = @kinesis.put_record(probe_record)
  @logger.info("Record sent successfully. Shard ID: #{result.shard_id}, Sequence number: #{result.sequence_number}")
end

#receive(event) ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/logstash/outputs/kinesis-iot.rb', line 129

# Stores a partition key in the event's "[@metadata][partition_key]" field and
# passes the event to the codec for encoding.
#
# When @randomized_partition_key is set the key is a random UUID. Otherwise the
# key is built from the first field listed in @event_partition_keys whose value
# is present and non-empty; because the parts array is seeded with an empty
# string, the result is "-<value>". When no field qualifies the key is "-".
#
# Encoding errors are logged as warnings and are not re-raised.
#
# @param event [Object] the event; must respond to #get and #set
# @return [Object, nil] nil when output?(event) is false; otherwise the result
#   of the encode call or of the warning log call
def receive(event)
  return unless output?(event)

  if @randomized_partition_key
    event.set("[@metadata][partition_key]", SecureRandom.uuid)
  else
    # Seed with an empty string: joining an empty array yields a US-ASCII
    # string, so the seed keeps the joined key in the default string encoding.
    partition_key_parts = [""]

    @event_partition_keys.each do |partition_key_name|
      value = event.get(partition_key_name)
      next if value.nil?
      # Only values that have a length (strings, arrays, hashes) can be "empty";
      # numbers and other scalars are used as they are instead of raising
      # NoMethodError on #length.
      next if value.respond_to?(:length) && value.length.zero?

      partition_key_parts << value.to_s
      break
    end

    event.set("[@metadata][partition_key]", (partition_key_parts * "-").to_s[/.+/m] || "-")
  end

  begin
    @codec.encode(event)
  rescue => e
    @logger.warn("Error encoding event", :exception => e, :event => event)
  end
end

#register ⇒ Object



119
120
121
122
123
124
125
126
# File 'lib/logstash/outputs/kinesis-iot.rb', line 119

# Prepares the output for use: verifies that the certificate, key and CA
# certificate files exist, fetches credentials into @creds, initializes the
# AWS client, and registers send_record as the codec's on_event callback.
#
# @raise [RuntimeError] from check_required_file when a required file is missing
def register
  # Checked in this order so the first missing file is the one reported.
  [@cert_file, @key_file, @ca_cert_file].each do |required_path|
    check_required_file(required_path)
  end

  @creds = getIotAccess
  init_aws

  record_sender = method(:send_record)
  @codec.on_event(&record_sender)
end

#renew_aws ⇒ Object



112
113
114
115
# File 'lib/logstash/outputs/kinesis-iot.rb', line 112

# Fetches fresh credentials into @creds, installs them in the global AWS
# configuration and rebuilds the Kinesis client (@kinesis).
#
# The client is recreated because the global configuration is applied when a
# client is constructed; updating Aws.config alone would leave the existing
# client using the previous credentials.
#
# @return [Object] the newly created Kinesis client
def renew_aws
  @creds = getIotAccess
  renewed = Aws::Credentials.new(@creds.accessKeyId, @creds.secretAccessKey, @creds.sessionToken)
  Aws.config.update({ credentials: renewed })
  @kinesis = Aws::Kinesis::Client.new
end

#send_record(event, payload) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/logstash/outputs/kinesis-iot.rb', line 155

# Writes one encoded event to the Kinesis stream named by @stream_name, using
# the partition key stored in the event's "[@metadata][partition_key]" field.
#
# Any StandardError raised while building or sending the record is logged as
# a warning and not re-raised.
#
# @param event [Object] the source event; must respond to #get
# @param payload [Object] the encoded event data to send
# @return [Object] the put_record response, or the logger's return value on error
def send_record(event, payload)
  record = {
    stream_name: @stream_name,
    data: payload,
    partition_key: event.get("[@metadata][partition_key]")
  }
  @kinesis.put_record(record)
rescue => e
  @logger.warn("Error writing event to Kinesis", :exception => e)
end