Class: ElasticSearch::BulkStream

Inherits: Object

Defined in: lib/jruby-elasticsearch/bulkstream.rb

Constructor Details

#initialize(client, queue_size = 10, flush_interval = 1) ⇒ BulkStream

Create a new bulk stream. This allows you to send index and other bulk events asynchronously, using the ElasticSearch bulk API in a streaming way.

`queue_size` is the maximum number of unflushed requests. If the queue reaches this size, new requests will block until there is room in the queue. `flush_interval` controls how often, in seconds, queued requests are flushed.



# File 'lib/jruby-elasticsearch/bulkstream.rb', line 12

def initialize(client, queue_size=10, flush_interval=1)
  @client = client
  @queue_size = queue_size
  @queue = SizedQueue.new(@queue_size)
  @flush_interval = flush_interval

  # Start the background flusher last, so the thread never observes
  # partially-initialized instance variables.
  @bulkthread = Thread.new { run }
end
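The blocking behavior that `queue_size` provides comes from Ruby's `SizedQueue`: a push onto a full queue blocks until a consumer pops an entry. A minimal sketch of that backpressure, independent of ElasticSearch:

```ruby
# SizedQueue is available without an explicit require on modern Rubies.
q = SizedQueue.new(2)
q << [:index, "a"]
q << [:index, "b"]
puts q.size # => 2

# The queue is full; this third push blocks until a consumer pops.
consumer = Thread.new { q.pop }
q << [:index, "c"]
consumer.join
puts q.size # => 2
```

This is why a slow ElasticSearch cluster naturally throttles producers that call `#index` on the stream.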

Instance Method Details

#flush ⇒ Object



# File 'lib/jruby-elasticsearch/bulkstream.rb', line 61

def flush
  bulk = @client.bulk

  flush_one = proc do
    # Block until there is data.
    method, *args = @queue.pop
    # A nil entry is the stop sentinel pushed by #stop. Destructuring
    # nil leaves `args` as an empty array, so check `method` instead.
    return if method.nil?
    bulk.send(method, *args)
  end

  flush_one.call

  # Drain whatever else is already queued, up to queue_size - 1 more.
  1.upto([@queue.size, @queue_size - 1].min) do
    flush_one.call
  end

  # Block until this batch finishes.
  bulk.execute!
end
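The drain loop above pops one entry, then up to `queue_size - 1` more, and replays each queued `[method, *args]` tuple onto the bulk request via `send`. A runnable sketch of that logic, using a hypothetical `FakeBulk` stub in place of the real bulk request object:

```ruby
# Stub standing in for the bulk request; records each replayed call.
class FakeBulk
  attr_reader :actions
  def initialize; @actions = []; end
  def index(*args); @actions << [:index, *args]; end
  def execute!; @actions.size; end
end

queue_size = 10
queue = SizedQueue.new(queue_size)
queue << [:index, "doc1"]
queue << [:index, "doc2"]
queue << [:index, "doc3"]

bulk = FakeBulk.new
# Pop one entry, then drain up to queue_size - 1 more, as in #flush.
method, *args = queue.pop
bulk.send(method, *args)
1.upto([queue.size, queue_size - 1].min) do
  method, *args = queue.pop
  bulk.send(method, *args)
end

puts bulk.actions.size # => 3
```

Because each queued tuple carries its own method name, the same loop would replay deletes or any other bulk operation without special-casing.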

#index(*args) ⇒ Object



# File 'lib/jruby-elasticsearch/bulkstream.rb', line 22

def index(*args)
  # TODO(sissel): It's not clear I need to queue this up, I could just
  # call BulkRequest#index() and when we have 10 or whatnot, flush, but
  # Queue gives us a nice blocking mechanism anyway.
  @queue << [:index, *args]
end
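`#index` simply queues the method name together with its arguments; the actual arguments are whatever the underlying bulk request's `index` accepts. A sketch of the tuple format, with hypothetical sample arguments (index name, type, document):

```ruby
q = SizedQueue.new(10)

# Mirror of BulkStream#index: queue the method name plus its arguments.
def enqueue_index(q, *args)
  q << [:index, *args]
end

enqueue_index(q, "logstash-2014.01.01", "logs", {"message" => "hello"})

method, *args = q.pop
puts method      # => index
puts args.length # => 3
```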

#stop ⇒ Object



# File 'lib/jruby-elasticsearch/bulkstream.rb', line 53

def stop
  # Set the flag first so the run loop exits, then push the nil
  # sentinel to wake any consumer blocked on @queue.pop.
  @stop = true
  @queue << nil
end
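The nil push exists to unblock a consumer waiting on `@queue.pop`; without it, a stopped stream could hang forever on an empty queue. The sentinel pattern in isolation:

```ruby
q = SizedQueue.new(5)

consumer = Thread.new do
  loop do
    item = q.pop
    break if item.nil? # nil is the stop sentinel, as in BulkStream#stop
  end
  :stopped
end

q << [:index, "last-doc"]
q << nil # signal shutdown; wakes the consumer even if it was blocked
puts consumer.value # => stopped
```

`Thread#value` joins the thread and returns its result, so this also demonstrates a clean shutdown handshake.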