Class: Syphon::Source
- Inherits: Object
- Inheritance hierarchy:
  - Object
  - Syphon::Source
- Defined in:
- lib/syphon/source.rb
Instance Attribute Summary
- #index ⇒ Object (readonly): Returns the value of attribute index.
- #name ⇒ Object (readonly): Returns the value of attribute name.
- #schema ⇒ Object (readonly): Returns the value of attribute schema.
- #type ⇒ Object (readonly): Returns the value of attribute type.
Instance Method Summary
- #import(options = {}) ⇒ Object
- #initialize(index, name, options = {}, &block) ⇒ Source (constructor): A new instance of Source.
- #mapping ⇒ Object
- #update_ids(ids) ⇒ Object
Constructor Details
Instance Attribute Details
#index ⇒ Object (readonly)
Returns the value of attribute index.
```ruby
# File 'lib/syphon/source.rb', line 10

def index
  @index
end
```
#name ⇒ Object (readonly)
Returns the value of attribute name.
```ruby
# File 'lib/syphon/source.rb', line 10

def name
  @name
end
```
#schema ⇒ Object (readonly)
Returns the value of attribute schema.
```ruby
# File 'lib/syphon/source.rb', line 10

def schema
  @schema
end
```
#type ⇒ Object (readonly)
Returns the value of attribute type.
```ruby
# File 'lib/syphon/source.rb', line 10

def type
  @type
end
```
Instance Method Details
#import(options = {}) ⇒ Object
```ruby
# File 'lib/syphon/source.rb', line 16

def import(options = {})
  db = index.database_connection
  query = options[:query] || schema.query(order: "`#{schema.relation}`.id")
  index.pre_sql.each { |sql| db.query(sql) }
  rows = db.query(query, as: :array, stream: true, cache_rows: false)
  builder = Builder.new(rows, schema)

  builder.each_slice(1000) do |slice|
    body = []
    slice.each do |document|
      # NOTE: the name of the helper called before "(document[:id], options)" was lost
      # when this page was extracted; restore it from lib/syphon/source.rb.
      body << {index: (document[:id], options)} << document
    end
    client.bulk body: body
  end
  client.indices.refresh index: options[:index] || index.index_name
end
```
#mapping ⇒ Object
```ruby
# File 'lib/syphon/source.rb', line 12

def mapping
  {type => {properties: schema.properties}}
end
```
#update_ids(ids) ⇒ Object
```ruby
# File 'lib/syphon/source.rb', line 33

def update_ids(ids)
  return if ids.empty?
  query = schema.query(
    scope: "`#{schema.relation}`.id IN (#{ids.join(', ')})",
    order: "`#{schema.relation}`.id",
  )
  rows = Syphon.database_connection.query(query, as: :array)
  docs = Builder.new(rows, schema).to_a
  body = bulk_indexes(docs) + bulk_deletes(ids, docs)
  client.bulk body: body, refresh: true unless body.empty?
end
```