Class: Wukong::Load::ElasticsearchLoader

Inherits:
Loader
  • Object
Defined in:
lib/wukong-load/loaders/elasticsearch.rb

Overview

Loads data into Elasticsearch.

Uses Elasticsearch's HTTP API to communicate.

Allows loading records into a given index and type. Records can have fields _index and _es_type which override the given index and type on a per-record basis.

Records can have an _id field. A record with an _id is sent as an update (an HTTP PUT to a path ending in that ID) rather than as a create (an HTTP POST).

The names of these fields within each record (_index, _es_type, and _id) can be customized.
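The override rules above can be sketched in a few lines of standalone Ruby. This is only an illustration and not the loader's own code: `resolve_target`, `DEFAULT_INDEX`, and `DEFAULT_ES_TYPE` are hypothetical names, and the default values are made up. The keyword arguments stand in for the customizable field names.

```ruby
# Hypothetical defaults; in the real loader these would come from its settings.
DEFAULT_INDEX   = 'wukong'
DEFAULT_ES_TYPE = 'streamed'

# Work out where a record should go. A value stored in the record under
# the index or type field wins over the configured default; the ID has no
# default, so it is nil unless the record carries one.
def resolve_target(record, index_field: '_index', es_type_field: '_es_type', id_field: '_id')
  {
    index:   record[index_field]   || DEFAULT_INDEX,
    es_type: record[es_type_field] || DEFAULT_ES_TYPE,
    id:      record[id_field]
  }
end

# No special fields: defaults apply and there is no ID (a create).
plain = resolve_target({ 'text' => 'hi' })

# Per-record overrides: a different index and an explicit ID (an update).
override = resolve_target({ 'text' => 'hi', '_index' => 'logs', '_id' => 7 })

# Customized field name: the ID is read from 'uuid' instead of '_id'.
custom = resolve_target({ 'uuid' => 'abc' }, id_field: 'uuid')
```

Here `plain` resolves to the default index and type with a nil ID, `override` targets the `logs` index with ID 7, and `custom` picks up `'abc'` as its ID.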

Instance Attribute Summary

Instance Method Summary

Methods inherited from Loader

#process

Instance Attribute Details

#connection ⇒ Object

The Net::HTTP connection we'll use for talking to Elasticsearch.



# File 'lib/wukong-load/loaders/elasticsearch.rb', line 52

def connection
  @connection
end

Instance Method Details

#create_path(record) ⇒ Object

:nodoc:



# File 'lib/wukong-load/loaders/elasticsearch.rb', line 76

def create_path record
  File.join('/', index_for(record).to_s, es_type_for(record).to_s)
end

#es_type_for(record) ⇒ Object

:nodoc:



# File 'lib/wukong-load/loaders/elasticsearch.rb', line 91

def es_type_for record
  record[es_type_field] || self.es_type
end

#id_for(record) ⇒ Object

:nodoc:



# File 'lib/wukong-load/loaders/elasticsearch.rb', line 96

def id_for record
  record[id_field]
end

#index_for(record) ⇒ Object

:nodoc:



# File 'lib/wukong-load/loaders/elasticsearch.rb', line 86

def index_for record
  record[index_field] || self.index
end

#load(record) ⇒ Object

Load a single record into Elasticsearch.

If the record has an ID, we'll issue an update; otherwise, a create.

Parameters:

  • record (Hash)


# File 'lib/wukong-load/loaders/elasticsearch.rb', line 71

def load record
  id_for(record) ? request(Net::HTTP::Put, update_path(record), record) : request(Net::HTTP::Post, create_path(record), record)
end
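The choice between update and create can be tried out in isolation. The sketch below is a standalone approximation, not the loader itself: `route_for` is a hypothetical helper, the default index and type values are invented, and the ID field is assumed to be `_id`. It returns the request class and path without sending anything.

```ruby
require 'net/http'

# Pick the HTTP request class and path for a record.
# A record with an ID becomes a PUT to /index/type/id (update);
# a record without one becomes a POST to /index/type (create).
def route_for(record, index: 'wukong', es_type: 'streamed')
  base = File.join('/', index.to_s, es_type.to_s)
  id   = record['_id']
  if id
    [Net::HTTP::Put, File.join(base, id.to_s)]
  else
    [Net::HTTP::Post, base]
  end
end

update_route = route_for({ '_id' => 7, 'text' => 'hi' })
create_route = route_for({ 'text' => 'hi' })
```

`File.join` is used here as in the path helpers on this page: it joins the segments with `/` and avoids doubling the leading slash.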

#request(request_type, path, record) ⇒ Object

Make a request via the existing #connection. The record is serialized to JSON automatically.

Parameters:

  • request_type (Class<Net::HTTPRequest>)
  • path (String)
  • record (Hash)


# File 'lib/wukong-load/loaders/elasticsearch.rb', line 106

def request request_type, path, record
  perform_request(create_request(request_type, path, record))
end
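The `create_request` and `perform_request` helpers are not shown on this page. As a rough idea of what building such a request might involve, here is a minimal sketch using only the standard library. `build_request` is a hypothetical stand-in, and setting the `Content-Type` header is an assumption, not something the source confirms.

```ruby
require 'net/http'
require 'json'

# Hypothetical stand-in for the loader's create_request helper.
# request_type is a request class such as Net::HTTP::Post or Net::HTTP::Put.
def build_request(request_type, path, record)
  request_type.new(path).tap do |req|
    req['Content-Type'] = 'application/json' # assumed header, see lead-in
    req.body = JSON.generate(record)         # the record travels as a JSON body
  end
end

req = build_request(Net::HTTP::Post, '/wukong/streamed', { 'text' => 'hi' })
```

Sending it would then be a matter of passing `req` to an open connection, for example `connection.request(req)` on a `Net::HTTP` instance.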

#setup ⇒ Object

Creates the Net::HTTP connection used to talk to Elasticsearch.



# File 'lib/wukong-load/loaders/elasticsearch.rb', line 55

def setup
  # Strip either scheme; Net::HTTP.new expects a bare hostname.
  h = host.gsub(%r{^https?://},'')
  log.debug("Connecting to Elasticsearch cluster at #{h}:#{port}...")
  begin
    self.connection = Net::HTTP.new(h, port)
    self.connection.use_ssl = true if host =~ /^https/
  rescue => e
    raise Error.new(e.message)
  end
end
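To see how a host setting maps onto connection settings, the following standalone sketch mirrors the idea of `setup` outside the loader. `build_connection` is a hypothetical name and the hostnames and ports are examples only. `Net::HTTP.new` does not open a socket, so the snippet runs without a cluster.

```ruby
require 'net/http'

# Build a Net::HTTP object from a host that may carry a scheme prefix.
# The scheme is removed because Net::HTTP.new takes a bare hostname,
# and SSL is enabled only when the original host started with https://.
def build_connection(host, port)
  bare = host.sub(%r{\Ahttps?://}, '')
  conn = Net::HTTP.new(bare, port)
  conn.use_ssl = true if host.start_with?('https://')
  conn
end

plain  = build_connection('http://localhost', 9200)
secure = build_connection('https://es.example.com', 9243)
bare   = build_connection('localhost', 9200)
```

Nothing is connected until a request is made or `start` is called, so connection errors would surface later than this point.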

#update_path(record) ⇒ Object

:nodoc:



81
82
83
# File 'lib/wukong-load/loaders/elasticsearch.rb', line 81

def update_path record
  File.join('/', index_for(record).to_s, es_type_for(record).to_s, id_for(record).to_s)
end