Class: LogStash::Outputs::KinesisIOT
- Inherits:
-
Base
- Object
- Base
- LogStash::Outputs::KinesisIOT
- Defined in:
- lib/logstash/outputs/kinesis-iot.rb
Overview
An example output that does nothing.
Defined Under Namespace
Classes: AWSIOTCreds
Instance Method Summary collapse
- #check_required_file(file) ⇒ Object
- #getIotAccess ⇒ Object
- #init_aws ⇒ Object
- #receive(event) ⇒ Object
- #register ⇒ Object
- #renew_aws ⇒ Object
- #send_record(event, payload) ⇒ Object
Instance Method Details
#check_required_file(file) ⇒ Object
73 74 75 |
# File 'lib/logstash/outputs/kinesis-iot.rb', line 73 def check_required_file(file) raise "Required file " + file +" does not exist." unless File.file?(file) end |
#getIotAccess ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/logstash/outputs/kinesis-iot.rb', line 78 def getIotAccess uri = URI(@iot_endpoint) http = Net::HTTP.new(uri.host, uri.port) http.use_ssl = true http.cert = OpenSSL::X509::Certificate.new(File.read(@cert_file)) http.key = OpenSSL::PKey::RSA.new(File.read(@key_file)) http.ca_file = @ca_cert_file http.verify_mode = @verify_certificate ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE request = Net::HTTP::Get.new(uri) request['x-amzn-iot-thingname'] = @iot_name response = http.request(request) result = JSON.parse(response.body)["credentials"] return AWSIOTCreds.new(result["accessKeyId"], result["secretAccessKey"], result["sessionToken"], Time.parse(result["expiration"])) end |
#init_aws ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/logstash/outputs/kinesis-iot.rb', line 97 def init_aws Aws.config.update({ region: @region, credentials: Aws::Credentials.new(@creds.accessKeyId, @creds.secretAccessKey, @creds.sessionToken) }) # Initialize Kinesis client @kinesis = Aws::Kinesis::Client.new # send data to kinesis response = @kinesis.put_record({ stream_name: @stream_name, data: '{"test":"test"}', partition_key: 'test' }) @logger.info("Record sent successfully. Shard ID: #{response.shard_id}, Sequence number: #{response.sequence_number}") end |
#receive(event) ⇒ Object
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/logstash/outputs/kinesis-iot.rb', line 129 def receive(event) return unless output?(event) if @randomized_partition_key event.set("[@metadata][partition_key]", SecureRandom.uuid) else # Haha - gawd. If I don't put an empty string in the array, then calling .join() # on it later will result in a US-ASCII string if the array is empty. Ruby is awesome. partition_key_parts = [""] @event_partition_keys.each do |partition_key_name| if not event.get(partition_key_name).nil? and event.get(partition_key_name).length > 0 partition_key_parts << event.get(partition_key_name).to_s break end end event.set("[@metadata][partition_key]", (partition_key_parts * "-").to_s[/.+/m] || "-") end begin @codec.encode(event) rescue => e @logger.warn("Error encoding event", :exception => e, :event => event) end end |
#register ⇒ Object
119 120 121 122 123 124 125 126 |
# File 'lib/logstash/outputs/kinesis-iot.rb', line 119 def register check_required_file(@cert_file) check_required_file(@key_file) check_required_file(@ca_cert_file) @creds = getIotAccess() init_aws() @codec.on_event(&method(:send_record)) end |
#renew_aws ⇒ Object
112 113 114 115 |
# File 'lib/logstash/outputs/kinesis-iot.rb', line 112 def renew_aws @creds = getIotAccess() Aws.config.update({ credentials: Aws::Credentials.new(@creds.accessKeyId, @creds.secretAccessKey, @creds.sessionToken)}) end |
#send_record(event, payload) ⇒ Object
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/logstash/outputs/kinesis-iot.rb', line 155 def send_record(event, payload) begin response = @kinesis.put_record({ stream_name: @stream_name, data: payload, partition_key: event.get("[@metadata][partition_key]") }) rescue => e @logger.warn("Error writing event to Kinesis", :exception => e) end # num = @producer.getOutstandingRecordsCount() # if num > @max_pending_records # @logger.warn("Kinesis is too busy - blocking until things have cleared up") # @producer.flushSync() # @logger.info("Okay - I've stopped blocking now") # end end |