Class: Syphon::Source

Inherits:
Object
  • Object
show all
Defined in:
lib/syphon/source.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(index, name, options = {}, &block) ⇒ Source

Returns a new instance of Source.



3
4
5
6
7
8
# File 'lib/syphon/source.rb', line 3

# Builds a source belonging to +index+.
#
# @param index [Object] owning index; stored unchanged
# @param name [#to_sym, nil] source name; converted with #to_sym when
#   truthy, otherwise stored as given (nil/false)
# @param options [Hash] optional settings
# @option options [Object] :type explicit type; when missing or falsy the
#   result of +default_type+ is used instead
# @yield block handed straight to +Schema.new+
def initialize(index, name, options = {}, &block)
  @index = index
  # A falsy name is kept as-is rather than being sent #to_sym.
  @name = name ? name.to_sym : name
  explicit_type = options[:type]
  @type = explicit_type || default_type
  @schema = Schema.new(&block)
end

Instance Attribute Details

#index ⇒ Object (readonly)

Returns the value of attribute index.



10
11
12
# File 'lib/syphon/source.rb', line 10

# Reader for the owning index.
#
# @return [Object] the +index+ argument given to #initialize, unchanged
def index
  @index
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



10
11
12
# File 'lib/syphon/source.rb', line 10

# Reader for the source name.
#
# @return [Symbol, nil, false] the +name+ given to #initialize converted with
#   #to_sym, or the original falsy value when no name was supplied
def name
  @name
end

#schema ⇒ Object (readonly)

Returns the value of attribute schema.



10
11
12
# File 'lib/syphon/source.rb', line 10

# Reader for the source schema.
#
# @return [Schema] the Schema created in #initialize from the block passed
#   to the constructor
def schema
  @schema
end

#type ⇒ Object (readonly)

Returns the value of attribute type.



10
11
12
# File 'lib/syphon/source.rb', line 10

# Reader for the source type.
#
# @return [Object] +options[:type]+ as given to #initialize when it was
#   truthy, otherwise the value +default_type+ returned at construction time
def type
  @type
end

Instance Method Details

#import(options = {}) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/syphon/source.rb', line 16

# Bulk-imports every document produced by this source's query.
#
# Steps, in order: run each statement in +index.pre_sql+ on the index's
# database connection, execute the import query, wrap the result in a
# Builder, send the built documents to +client.bulk+ in batches of 1000,
# and finally refresh the target index.
#
# @param options [Hash] import settings; also forwarded untouched to +meta+
#   for every document
# @option options [Object] :query query to run instead of the default
#   +schema.query+ ordered by the relation's id column
# @option options [Object] :index index to refresh instead of
#   +index.index_name+
# @return [Object] whatever +client.indices.refresh+ returns
def import(options = {})
  connection = index.database_connection
  sql = options[:query] || schema.query(order: "`#{schema.relation}`.id")
  index.pre_sql.each { |statement| connection.query(statement) }

  # NOTE(review): stream/cache_rows look like driver options meant to avoid
  # holding the whole result set in memory - confirm against the DB driver.
  result = connection.query(sql, as: :array, stream: true, cache_rows: false)

  Builder.new(result, schema).each_slice(1000) do |batch|
    # Each document contributes two consecutive entries: the action line
    # built by +meta+, then the document itself.
    actions = batch.flat_map do |document|
      [{index: meta(document[:id], options)}, document]
    end
    client.bulk body: actions
  end

  client.indices.refresh(index: options[:index] || index.index_name)
end

#mapping ⇒ Object



12
13
14
# File 'lib/syphon/source.rb', line 12

# Builds the mapping hash for this source.
#
# @return [Hash] a single-entry hash keyed by +type+ whose value is
#   +{properties: schema.properties}+
def mapping
  document_type = type
  field_properties = schema.properties
  {document_type => {properties: field_properties}}
end

#update_ids(ids) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
# File 'lib/syphon/source.rb', line 33

# Re-syncs the documents belonging to the given ids.
#
# Queries the rows whose id is in +ids+, builds documents from them and
# sends a single bulk request made of +bulk_indexes(docs)+ followed by
# +bulk_deletes(ids, docs)+, asking for a refresh.
#
# NOTE(review): +ids+ are interpolated verbatim into the SQL scope, so
# callers must only pass trusted values (e.g. integers).
# NOTE(review): presumably +bulk_deletes+ covers ids that no longer have a
# row - the helper is not visible here; confirm.
#
# @param ids [Array] ids to update; must respond to #empty? and #join
# @return [Object, nil] the +client.bulk+ result, or nil when +ids+ is empty
#   or there is nothing to send
def update_ids(ids)
  return if ids.empty?

  id_list = ids.join(', ')
  sql = schema.query(
    scope: "`#{schema.relation}`.id IN (#{id_list})",
    order: "`#{schema.relation}`.id"
  )
  result = Syphon.database_connection.query(sql, as: :array)
  documents = Builder.new(result, schema).to_a

  actions = bulk_indexes(documents) + bulk_deletes(ids, documents)
  return if actions.empty?

  client.bulk body: actions, refresh: true
end