Class: LogStash::Outputs::Redisearch
- Inherits:
- Base
- Inheritance chain: Object, Base, LogStash::Outputs::Redisearch
- Includes:
- Stud::Buffer
- Defined in:
- lib/logstash/outputs/redisearch.rb
Overview
A Redisearch output that stores event data in Redisearch.
Instance Method Summary
- #close ⇒ Object
Performs final bookkeeping and cleanup when the plugin thread exits.
- #flush(events, close = false) ⇒ Object
Method is called from Stud::Buffer when max_items/max_interval is reached.
- #on_flush_error(e) ⇒ Object
Method is called from Stud::Buffer when an error occurs.
- #receive(event) ⇒ Object
- #register ⇒ Object
- #send_to_redisearch(event, payload) ⇒ Object
Assigns a UUID to each event (formatting the event as the document Redisearch requires) and appends it to the buffer.
Instance Method Details
#close ⇒ Object
Performs final bookkeeping and cleanup when the plugin thread exits.
    # File 'lib/logstash/outputs/redisearch.rb', line 96
    def close
      # Force full flush call to ensure that all accumulated messages are flushed.
      buffer_flush(:final => true)
    end
#flush(events, close = false) ⇒ Object
Method is called from Stud::Buffer when max_items/max_interval is reached
    # File 'lib/logstash/outputs/redisearch.rb', line 81
    def flush(events, close=false)
      # buffer_flush should pass here the :final boolean value.
      @redisearch_client.add_docs(events)
      @logger.info("Buffer Inserted Successfully", :length => events.length)
    end
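The batching contract that #flush and #close rely on can be sketched in plain Ruby. This is only an illustration of the idea, not Stud::Buffer's implementation: `TinyBuffer` is a hypothetical class, it models the max_items trigger only (the max_interval timer is left out), and its #flush merely records batches instead of calling Redisearch.

```ruby
# Illustrative stand-in for the buffering contract; NOT Stud::Buffer itself.
class TinyBuffer
  attr_reader :flushed_batches

  def initialize(max_items)
    @max_items = max_items
    @pending = []
    @flushed_batches = []
  end

  # Queue one item and flush as soon as the batch size is reached.
  def buffer_receive(item)
    @pending << item
    buffer_flush if @pending.length >= @max_items
  end

  # Hand all pending items to #flush. The :final option is forwarded
  # as the second argument, which is what #close relies on.
  def buffer_flush(options = {})
    return if @pending.empty?

    batch = @pending
    @pending = []
    flush(batch, options.fetch(:final, false))
  end

  # Plays the role of the plugin's #flush; here it only records the batch.
  def flush(events, close = false)
    @flushed_batches << events
  end
end

buffer = TinyBuffer.new(2)
%w[a b c].each { |item| buffer.buffer_receive(item) }
buffer.buffer_flush(:final => true) # the call #close makes
p buffer.flushed_batches
```

The first two items fill a batch and are flushed together; the third is only written out by the final flush, which is why #close forces one.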
#on_flush_error(e) ⇒ Object
Method is called from Stud::Buffer when an error occurs
    # File 'lib/logstash/outputs/redisearch.rb', line 88
    def on_flush_error(e)
      @logger.warn("Failed to send backlog of events to Redisearch",
        :exception => e,
        :backtrace => e.backtrace
      )
    end
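As a rough sketch of how an error hook like #on_flush_error fits around a flush, the snippet below routes any exception raised while flushing to a callback. `flush_with_error_hook` and `log_warning` are hypothetical names introduced for illustration, and the sketch does not model whatever retry behaviour the real buffer applies after calling the hook.

```ruby
# Hypothetical helper: run a flush block and route failures to an error callback.
def flush_with_error_hook(events, on_error)
  yield events
  true
rescue StandardError => e
  on_error.call(e)
  false
end

warnings = []
# Stands in for @logger.warn in #on_flush_error.
log_warning = lambda do |e|
  warnings << "Failed to send backlog of events to Redisearch: #{e.message}"
end

# A flush that succeeds leaves the warning list untouched.
ok = flush_with_error_hook([["doc1", {}]], log_warning) { |batch| batch.length }

# A flush that raises is reported through the callback instead of crashing.
failed = flush_with_error_hook([["doc2", {}]], log_warning) do |_batch|
  raise IOError, "connection refused"
end

puts warnings
```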
#receive(event) ⇒ Object
    # File 'lib/logstash/outputs/redisearch.rb', line 69
    def receive(event)
      begin
        @codec.encode(event)
      rescue StandardError => e
        @logger.warn("Error encoding event", :exception => e,
                     :event => event)
        sleep @reconnect_interval
        retry
      end
    end
#register ⇒ Object
    # File 'lib/logstash/outputs/redisearch.rb', line 43
    def register
      buffer_initialize(
        :max_items => @batch_events,
        :max_interval => @batch_timeout,
      )

      params = {
        "host"=>@host,
        "port"=>@port,
        "index"=>@index,
        "ssl"=>@ssl
      }

      if @password
        params["password"] = @password.value
      end

      @idx = Index.new(params)
      @redisearch_client = @idx.connect()
      @codec.on_event(&method(:send_to_redisearch))
    end
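The connection hash assembled in #register can be tried in isolation with a small sketch like the one below. `connection_params` is a hypothetical helper, the host, port, index, and password values are made-up examples, and a plain string stands in for the object that `@password.value` unwraps in the plugin.

```ruby
# Hypothetical helper mirroring the hash built in #register.
def connection_params(host:, port:, index:, ssl: false, password: nil)
  params = {
    "host" => host,
    "port" => port,
    "index" => index,
    "ssl" => ssl
  }
  # The "password" key is added only when a password is configured.
  params["password"] = password if password
  params
end

without_auth = connection_params(host: "127.0.0.1", port: 6379, index: "logstash")
with_auth = connection_params(host: "127.0.0.1", port: 6379, index: "logstash",
                              password: "secret")

p without_auth
p with_auth
```

Omitting the key entirely, rather than passing a nil password, matches the conditional in #register.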
#send_to_redisearch(event, payload) ⇒ Object
Assigns a UUID to each event (formatting the event as the document Redisearch requires) and appends it to the buffer.
    # File 'lib/logstash/outputs/redisearch.rb', line 103
    def send_to_redisearch(event, payload)
      begin
        doc_data = JSON.parse(payload)
        doc_id = @idx.get_id()
        document = [doc_id,doc_data]
        buffer_receive(document)
      rescue => e
        @logger.warn("Failed to send event to Redisearch", :event => event,
                     :exception => e,
                     :backtrace => e.backtrace)
        sleep @reconnect_interval
        retry
      end
    end
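The document shape that ends up in the buffer, an `[id, fields]` pair, can be sketched as follows. `build_document` is a hypothetical helper, and `SecureRandom.uuid` is an assumed stand-in for `@idx.get_id`, whose implementation is not shown on this page.

```ruby
require "json"
require "securerandom"

# Hypothetical helper: turn an encoded JSON payload into an [id, fields] pair.
# SecureRandom.uuid is an assumption standing in for @idx.get_id.
def build_document(payload)
  doc_data = JSON.parse(payload)
  doc_id = SecureRandom.uuid
  [doc_id, doc_data]
end

doc_id, doc_data = build_document('{"message":"hello","host":"web-1"}')
puts doc_id
p doc_data
```

Each pair is what #flush later receives in its `events` array and passes on to `add_docs`.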