Class: LogStash::Inputs::Vespa
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::Vespa
- Defined in:
- lib/logstash/inputs/vespa.rb
Overview
This is the logstash vespa input plugin. It is used to read from Vespa via Visit : docs.vespa.ai/en/reference/document-v1-api-reference.html#visit Each document becomes an event.
Instance Method Summary collapse
-
#fetch_documents_from_vespa(uri) ⇒ Object
def run.
-
#parse_response(response) ⇒ Object
def fetch_documents_from_vespa.
-
#process_documents(documents, queue) ⇒ Object
def parse_response.
- #register ⇒ Object
-
#run(queue) ⇒ Object
def register.
-
#stop ⇒ Object
def process_documents.
Instance Method Details
#fetch_documents_from_vespa(uri) ⇒ Object
def run
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/logstash/inputs/vespa.rb', line 131 def fetch_documents_from_vespa(uri) http = Net::HTTP.new(uri.host, uri.port) if uri.scheme == "https" http.use_ssl = true http.cert = @cert http.key = @key http.verify_mode = OpenSSL::SSL::VERIFY_PEER end request = Net::HTTP::Get.new(uri.request_uri) http.request(request) rescue => e @logger.error("Failed to make HTTP request to Vespa", :error => e.) nil end |
#parse_response(response) ⇒ Object
def fetch_documents_from_vespa
147 148 149 150 151 152 |
# File 'lib/logstash/inputs/vespa.rb', line 147 def parse_response(response) JSON.parse(response.body) rescue JSON::ParserError => e @logger.error("Failed to parse JSON response", :error => e.) nil end |
#process_documents(documents, queue) ⇒ Object
def parse_response
154 155 156 157 158 159 160 |
# File 'lib/logstash/inputs/vespa.rb', line 154 def process_documents(documents, queue) documents.each do |document| event = LogStash::Event.new(document) decorate(event) queue << event end end |
#register ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/logstash/inputs/vespa.rb', line 51 def register if @client_cert != nil @cert = OpenSSL::X509::Certificate.new(File.read(@client_cert)) end if @client_key != nil @key = OpenSSL::PKey::RSA.new(File.read(@client_key)) end if @client_cert.nil? ^ @client_key.nil? raise LogStash::ConfigurationError, "Both client_cert and client_key must be set, you can't have just one" end @uri_params = { :cluster => @cluster, :wantedDocumentCount => @page_size, :concurrency => @backend_concurrency, :timeout => @timeout } if @selection != nil @uri_params[:selection] = @selection end if @from_timestamp != nil @uri_params[:fromTimestamp] = @from_timestamp end if @to_timestamp != nil @uri_params[:toTimestamp] = @to_timestamp end end |
#run(queue) ⇒ Object
def register
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/logstash/inputs/vespa.rb', line 83 def run(queue) uri = URI.parse("#{@vespa_url}/document/v1/") uri.query = URI.encode_www_form(@uri_params) continuation = nil loop do response = fetch_documents_from_vespa(uri) # response should look like: # { # "pathId":"/document/v1/","documents":[ # {"id":"id:namespace:doctype::docid","fields":{"field1":"value1","field2":7.0}} # ], # "documentCount":1,"continuation":"continuation_string" # } if response.is_a?(Net::HTTPSuccess) response_parsed = parse_response(response) break unless response_parsed document_count = response_parsed["documentCount"] # record the continuation token for the next request (if it exists) continuation = response_parsed["continuation"] documents = response_parsed["documents"] process_documents(documents, queue) # Exit the loop if there are no more documents to process if continuation != nil uri.query = URI.encode_www_form(@uri_params.merge({:continuation => continuation})) else @logger.info("No continuation ID => no more documents to fetch from Vespa") break end if @stopping @logger.info("Stopping Vespa input") break end else @logger.error("Failed to fetch documents from Vespa", :request => uri.to_s, :response_code => response.code, :response_message => response.) break # TODO retry? Only on certain codes? end # if response.is_a?(Net::HTTPSuccess) end # loop do end |
#stop ⇒ Object
def process_documents
162 163 164 |
# File 'lib/logstash/inputs/vespa.rb', line 162 def stop @stopping = true end |