Class: Indexer

Inherits:
Object
Defined in:
lib/indexer.rb

Overview

Indexes data into Elasticsearch in near real time.

Constructor Details

#initialize(host, port) ⇒ Indexer

Creates the Elasticsearch client object used by the other methods.

Parameters:

  • host (String)

    Elastic Host

  • port (String)

    Elastic Port



# File 'lib/indexer.rb', line 9

def initialize(host, port)
  @client = Elasticsearch::Client.new(host: host, port: port)
end

Instance Method Details

#bulk_index(data) ⇒ Object

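Expects data as an array of bulk actions, each keyed by its action name (index or update), for example:

[{ update: { _index: index, _id: 23, data: { doc: data } } }, { index: { _index: index2, _id: 28, data: data } }]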

Parameters:

  • data (Array)

    Data to be indexed



# File 'lib/indexer.rb', line 16

def bulk_index(data)
  check_bulk_index_params(data)
  # with_indifferent_access is provided by ActiveSupport
  response = @client.bulk(body: data).with_indifferent_access
  return unless response[:errors]

  # Each item is a single-key hash keyed by its action name (index, update, ...)
  errors = response[:items].map { |item| item.values.first[:error] }.compact
  raise "Not able to index with errors as #{errors}"
end
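
As a rough illustration of the payload and the error check, the sketch below builds a two-action bulk body and extracts the error entries from a response-shaped Hash. The helper name bulk_errors, the index names, and the sample response are hypothetical; a real response would come from @client.bulk.

```ruby
# Two bulk actions: a partial update and a full index request.
# Index names and document fields are illustrative only.
payload = [
  { update: { _index: "products_v1", _id: 23, data: { doc: { price: 10 } } } },
  { index:  { _index: "products_v2", _id: 28, data: { name: "lamp" } } }
]

# Hypothetical helper: collect the "error" entry of every failed item.
# Each item is a single-key Hash keyed by its action name.
def bulk_errors(response)
  return [] unless response["errors"]

  response["items"].map { |item| item.values.first["error"] }.compact
end

# Hand-written sample shaped like a bulk response with one failed item.
sample_response = {
  "errors" => true,
  "items" => [
    { "update" => { "_id" => "23", "status" => 404,
                    "error" => { "type" => "document_missing_exception" } } },
    { "index" => { "_id" => "28", "status" => 201 } }
  ]
}

errors = bulk_errors(sample_response)
```

Reading the error from item.values.first rather than from a fixed action key keeps the check working for any action name that appears in the response.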

#create_index(index_name, index_params_hash) ⇒ Object

Creates a new index with the mappings and settings given in index_params_hash.

Parameters:

  • index_name (String)

    name of the index (with timestamp)

  • index_params_hash (Hash)

    settings and mappings for the index, e.g. { settings: {}, mappings: {} }


# File 'lib/indexer.rb', line 99

def create_index(index_name, index_params_hash)
  @client.indices.create index: "#{index_name}", body: index_params_hash
end
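
A minimal sketch of the two arguments create_index expects, assuming the timestamp suffix is built with Time#strftime and the mappings are typeless (as in Elasticsearch 7 and later). The helper name timestamped_index_name, the base name, the field names, and the shard counts are illustrative assumptions, not part of the class.

```ruby
# Hypothetical helper: build a timestamped index name from a base name.
def timestamped_index_name(base, time = Time.now)
  "#{base}_#{time.utc.strftime('%Y%m%d%H%M%S')}"
end

index_name = timestamped_index_name("products", Time.utc(2024, 1, 1, 12, 0, 0))

# Body passed as index_params_hash: settings and mappings at the top level.
index_params_hash = {
  settings: { number_of_shards: 1, number_of_replicas: 0 },
  mappings: { properties: { name: { type: "text" }, price: { type: "float" } } }
}

# With a configured Indexer instance this would be passed as:
#   indexer.create_index(index_name, index_params_hash)
```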

#delete_record(index_name, id, parent_id = nil) ⇒ Object

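Deletes the record with the given id (and optional parent_id) from the given index if it exists. The method body shown here does not raise DocumentNotFoundException; when the record is not found it returns nil without calling delete.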

Parameters:

  • index_name (String)

    name of the index

  • id (String)

    doc_id

  • parent_id (String) (defaults to: nil)

    parent_id



# File 'lib/indexer.rb', line 41

def delete_record(index_name, id, parent_id = nil)
  if record_exists?(index_name, id, parent_id)
    @client.delete generate_options_hash(index_name, id, parent_id)
  end
end
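
The exists-then-delete guard can be tried without a cluster. The sketch below uses a hypothetical in-memory FakeClient that mimics only the exists and delete calls, and a simplified options hash; generate_options_hash is not shown on this page, so the shape used here is an assumption.

```ruby
# Hypothetical stand-in for the Elasticsearch client: keeps documents in a
# Hash keyed by [index, id] and implements only the two calls used here.
class FakeClient
  def initialize(docs = {})
    @docs = docs
  end

  def exists(options)
    @docs.key?([options[:index], options[:id]])
  end

  def delete(options)
    @docs.delete([options[:index], options[:id]])
  end
end

# Same guard as delete_record: delete only when the record exists,
# otherwise return nil without raising.
def delete_if_exists(client, index_name, id)
  options = { index: index_name, id: id } # assumed shape of the options hash
  client.delete(options) if client.exists(options)
end

client = FakeClient.new({ ["products", "1"] => { name: "lamp" } })
first  = delete_if_exists(client, "products", "1") # record present, so it is deleted
second = delete_if_exists(client, "products", "1") # already gone, nothing happens
```

Note that checking and deleting are two separate requests, so another writer can remove the record between them.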

#record_exists?(index_name, id, parent_id = nil) ⇒ True, False

Checks whether a record with the given id exists in the given index.

Parameters:

  • index_name (String)

    name of the index

  • id (String)

    doc_id

  • parent_id (String) (defaults to: nil)

    parent_id (optional)

Returns:

  • (True)

    if record found

  • (False)

    if record not found



# File 'lib/indexer.rb', line 31

def record_exists?(index_name, id, parent_id = nil)
  options_hash = generate_options_hash(index_name, id, parent_id)
  @client.exists options_hash
end

#switch_to_new_index(new_index, alias_name, old_index_delete_flag = true) ⇒ Object

Points the alias at a new index; used when doing bulk caching.

Parameters:

  • new_index (String)

    name of the index (with timestamp)

  • alias_name (String)

    alias under which the index is queried

  • old_index_delete_flag (Boolean) (defaults to: true)

    whether to delete the previous index (true) or close and keep it (false)



# File 'lib/indexer.rb', line 51

def switch_to_new_index(new_index, alias_name, old_index_delete_flag = true)
  begin
    result = @client.indices.get_alias name: "#{alias_name}"
  rescue Elasticsearch::Transport::Transport::Errors::NotFound => e
    # The alias does not exist yet; the else branch below creates it.
    puts e.message
    result = nil
  rescue StandardError => e
    puts e.message
    raise NameError, e.message
  end
  unless result.nil?
    if result.keys.count > 1
      raise NameError,"Multiple indexes found for alias #{alias_name}"
    else
      old_index = result.keys.first
      log_file = Logger.new("log/old_index.log")
      log_file.info("old_index - #{old_index}")
      @client.indices.update_aliases body: {
        actions: [
          { remove: { index: "#{old_index}", alias: "#{alias_name}" } },
          { add:    { index: "#{new_index}", alias: "#{alias_name}" } }
        ]
      }
      if old_index_delete_flag
        @client.indices.delete index: "#{old_index}"
      else
        @client.indices.close index: "#{old_index}"
      end
      message = old_index_delete_flag ? "index swapping done #{old_index} deleted and " : ""
      message += "#{new_index} created"
      puts message
    end
  else
    # No alias found: if an index named after the alias exists, delete it (when the flag is set) before the alias is created.
    if @client.indices.exists(index: "#{alias_name}") && old_index_delete_flag
      @client.indices.delete index: "#{alias_name}"
      puts "#{alias_name} index deleted"
    end
    puts "Alias #{alias_name} set for new index #{new_index} "
    @client.indices.put_alias index: "#{new_index}", name: "#{alias_name}"
  end
  puts "#{new_index} set up"
end
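
The swap relies on update_aliases applying the remove and add actions in a single request, so the alias is never left without an index. A sketch of that request body, with a hypothetical helper name alias_swap_body and illustrative index names:

```ruby
# Hypothetical helper: build the body for indices.update_aliases that moves
# alias_name from old_index to new_index in one request.
def alias_swap_body(old_index, new_index, alias_name)
  {
    actions: [
      { remove: { index: old_index, alias: alias_name } },
      { add:    { index: new_index, alias: alias_name } }
    ]
  }
end

body = alias_swap_body("products_20240101", "products_20240201", "products")

# With a real client this would be sent as:
#   client.indices.update_aliases body: body
```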