Class: LogStash::Outputs::Redisearch

Inherits:
Base
  • Object
show all
Includes:
Stud::Buffer
Defined in:
lib/logstash/outputs/redisearch.rb

Overview

A redisearch output will store data into Redisearch.

Instance Method Summary collapse

Instance Method Details

#close ⇒ Object

This method performs final bookkeeping and cleanup when the plugin thread exits.



96
97
98
99
# File 'lib/logstash/outputs/redisearch.rb', line 96

# Final cleanup hook for the plugin.
#
# Requests a flush with the :final option set so that every message still
# accumulated in the buffer is written out before the plugin stops.
def close
  buffer_flush(final: true)
end

#flush(events, close = false) ⇒ Object

Method is called from Stud::Buffer when max_items/max_interval is reached



81
82
83
84
85
# File 'lib/logstash/outputs/redisearch.rb', line 81

# Buffer callback: writes one batch of buffered documents to Redisearch.
#
# @param events [Array] the documents accumulated by the buffer
# @param close [Boolean] slot where buffer_flush is expected to hand over its
#   :final flag; the value is not used by this method
# @return [Object] whatever the logger's info call returns
def flush(events, close=false)
  @redisearch_client.add_docs(events)
  inserted_count = events.length
  @logger.info("Buffer Inserted Successfully", length: inserted_count)
end

#on_flush_error(e) ⇒ Object

Method is called from Stud::Buffer when an error occurs



88
89
90
91
92
93
# File 'lib/logstash/outputs/redisearch.rb', line 88

# Buffer callback invoked when flushing a batch fails.
#
# Emits a warning that carries the exception and its backtrace.
#
# @param e [Exception] the error reported by the failed flush
def on_flush_error(e)
  failure_details = { :exception => e, :backtrace => e.backtrace }
  @logger.warn("Failed to send backlog of events to Redisearch", failure_details)
end

#receive(event) ⇒ Object



69
70
71
72
73
74
75
76
77
78
# File 'lib/logstash/outputs/redisearch.rb', line 69

# Hands an incoming event to the codec for encoding.
#
# A failed encode is logged and retried after sleeping @reconnect_interval,
# but only a bounded number of times: an event the codec cannot encode would
# otherwise be retried forever and stall this method indefinitely.
#
# @param event [Object] the event passed to @codec.encode
# @return [Object, nil] the result of @codec.encode, or nil when the event
#   is given up on after repeated failures
def receive(event)
  max_attempts = 3
  attempts = 0
  begin
    attempts += 1
    @codec.encode(event)
  rescue StandardError => e
    @logger.warn("Error encoding event", :exception => e,
                 :event => event)
    if attempts < max_attempts
      sleep @reconnect_interval
      retry
    end
    # Out of attempts: drop the event instead of looping on the same failure.
    @logger.error("Giving up on event after repeated encoding failures",
                  :attempts => attempts,
                  :exception => e,
                  :event => event)
    nil
  end
end

#register ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/logstash/outputs/redisearch.rb', line 43

# Prepares the output for use: sets up batching, opens the Redisearch
# connection and wires the codec to this plugin.
#
# Steps, in order:
# 1. initialise the buffer with @batch_events / @batch_timeout as its limits;
# 2. build the connection settings (the password is added only when set);
# 3. create the Index, connect, and keep the client for later flushes;
# 4. register send_to_redisearch as the codec's on_event callback.
def register
  buffer_initialize(max_items: @batch_events, max_interval: @batch_timeout)

  connection_settings = {
    "host" => @host,
    "port" => @port,
    "index" => @index,
    "ssl" => @ssl
  }
  connection_settings["password"] = @password.value if @password

  @idx = Index.new(connection_settings)
  @redisearch_client = @idx.connect
  @codec.on_event(&method(:send_to_redisearch))
end

#send_to_redisearch(event, payload) ⇒ Object

Method to assign uuid to each event (formatting event as per document required by redisearch) and to append each event to buffer



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/logstash/outputs/redisearch.rb', line 103

# Codec callback: turns an encoded payload into a [doc_id, doc_data] pair
# and appends it to the buffer.
#
# The payload is parsed once, outside the retry loop. A payload that is not
# valid JSON can never succeed on a later attempt, so it is logged and
# dropped instead of being retried forever.
#
# Failures while obtaining the id or appending to the buffer are logged and
# retried after sleeping @reconnect_interval. That retry is left unbounded
# as before (presumably to hold events back until buffering works again;
# confirm against the intended delivery semantics).
#
# @param event [Object] the original event, used only as log context
# @param payload [String] JSON text to be parsed into the document data
# @return [Object, nil] the result of buffer_receive, or nil when the
#   payload is dropped
def send_to_redisearch(event, payload)
  begin
    doc_data = JSON.parse(payload)
  rescue JSON::ParserError, TypeError => e
    @logger.warn("Dropping event with unparseable payload", :event => event,
                 :exception => e,
                 :backtrace => e.backtrace)
    return
  end

  begin
    # A fresh id is requested on every attempt, as in the original flow.
    document = [@idx.get_id, doc_data]
    buffer_receive(document)
  rescue => e
    @logger.warn("Failed to send event to Redisearch", :event => event,
                 :exception => e,
                 :backtrace => e.backtrace)
    sleep @reconnect_interval
    retry
  end
end