Class: Baza::QueryBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/baza/query_buffer.rb

Overview

This class buffers a lot of queries and flushes them out via transactions.

Constant Summary collapse

QUERIES_FLUSH_SIZE =
12 * 1024 * 1024
INITIALIZE_ARGS_ALLOWED =
[:db, :debug, :flush_async].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args) ⇒ QueryBuffer

Constructor. Takes arguments to be used and a block.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/baza/query_buffer.rb', line 9

def initialize(args)
  @args = args
  @db = args.fetch(:db)
  @queries = []
  @inserts = {}
  @queries_count = 0
  @queries_size = 0
  @debug = @args[:debug]
  @lock = Mutex.new

  STDOUT.puts "Query buffer started." if @debug

  return unless block_given?

  if @args[:flush_async]
    @db.clone_conn do |db_flush_async|
      @db_flush_async = db_flush_async

      begin
        yield(self)
      ensure
        flush
        thread_async_join
      end
    end
  else
    begin
      yield(self)
    ensure
      flush
      thread_async_join
    end
  end
end

Instance Attribute Details

#thread_asyncObject (readonly)

Returns the value of attribute thread_async.



3
4
5
# File 'lib/baza/query_buffer.rb', line 3

def thread_async
  @thread_async
end

Instance Method Details

#delete(table, where) ⇒ Object

Delete as on a normal Baza::Db.

Example

buffer.delete(:users, => 5)



60
61
62
63
64
# File 'lib/baza/query_buffer.rb', line 60

def delete(table, where)
  STDOUT.puts "Delete called on table #{table} with arguments: '#{where}'." if @debug
  query(@db.delete(table, where, return_sql: true))
  nil
end

#flushObject

Flushes all queries out in a transaction. This will automatically be called for every 1000 queries.



113
114
115
116
117
118
119
# File 'lib/baza/query_buffer.rb', line 113

def flush
  if @args[:flush_async]
    flush_async
  else
    flush_real
  end
end

#insert(table, data) ⇒ Object

Plans to inset a hash into a table. It will only be inserted when flush is called.

Examples

buffer.insert(:users, => “John Doe”)



86
87
88
89
# File 'lib/baza/query_buffer.rb', line 86

def insert(table, data)
  query(@db.insert(table, data, return_sql: true))
  nil
end

#insert_with_multi(table, data, sort: false) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/baza/query_buffer.rb', line 91

def insert_with_multi(table, data, sort: false)
  data_key = ""

  if sort
    data = data.sort_by { |element| element[0].to_s }
    data = Hash[data]
  end

  data.each do |key, value|
    data_key << "#{key}---"
    @queries_size += value.is_a?(String) ? value.bytesize * 1.5 : 8
  end

  @inserts[table] ||= {}
  @inserts[table][data_key] ||= []
  @inserts[table][data_key] << data

  @queries_count += 1
  flush if @queries_count >= 1000 || @queries_size >= QUERIES_FLUSH_SIZE
end

#query(str) ⇒ Object

Adds a query to the buffer.



45
46
47
48
49
50
51
52
53
54
55
# File 'lib/baza/query_buffer.rb', line 45

def query(str)
  @lock.synchronize do
    STDOUT.print "Adding to buffer: #{str}\n" if @debug
    @queries << str
    @queries_count += 1
    @queries_size += str.bytesize
  end

  flush if @queries_count >= 1000 || @queries_size >= QUERIES_FLUSH_SIZE
  nil
end

#update(table, update, terms) ⇒ Object

Update as on a normal Baza::Db.

Example

buffer.update(:users, => “Kasper”, => 5)



69
70
71
72
73
# File 'lib/baza/query_buffer.rb', line 69

def update(table, update, terms)
  STDOUT.puts "Update called on table #{table}." if @debug
  query(@db.update(table, update, terms, return_sql: true))
  nil
end

#upsert(table, data, terms) ⇒ Object

Shortcut to doing upsert through the buffer instead of through the db-object with the buffer as an argument.

Example

buffer.upsert(:users, => 5, => “Kasper”)



78
79
80
81
# File 'lib/baza/query_buffer.rb', line 78

def upsert(table, data, terms)
  @db.upsert(table, data, terms, buffer: self)
  nil
end