Module: Fluent::TransactionSupport

Defined in:
lib/flydata/fluent-plugins/flydata_plugin_ext/transaction_support.rb

Overview

Transaction support This module is expected to be prepended

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object



16
17
18
# File 'lib/flydata/fluent-plugins/flydata_plugin_ext/transaction_support.rb', line 16

def self.included(base)
  raise "This module is expected to be prepended"
end

Instance Method Details

#configure(conf) ⇒ Object

def self.prepended(base) end



23
24
25
26
# File 'lib/flydata/fluent-plugins/flydata_plugin_ext/transaction_support.rb', line 23

def configure(conf)
  @lock_file = Flydata::FLYDATA_LOCK
  super
end

#do_transactionObject



43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/flydata/fluent-plugins/flydata_plugin_ext/transaction_support.rb', line 43

def do_transaction
  transaction_context =TransactionContext.new
  File.open(@lock_file, "w") {|f| f.write(Process.pid)}
  begin
    yield(transaction_context)
  ensure
    # leave the lock file when a transaction is broken
    if !transaction_context.transaction_broken? && File.exists?(@lock_file) &&
        Process.pid == File.open(@lock_file, "r") {|f| f.read}.to_i
      File.delete(@lock_file)
    end
  end
end

#process_aborted?Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/flydata/fluent-plugins/flydata_plugin_ext/transaction_support.rb', line 39

def process_aborted?
  instance_variable_defined? :@abort
end

#startObject



28
29
30
31
32
33
34
35
36
37
# File 'lib/flydata/fluent-plugins/flydata_plugin_ext/transaction_support.rb', line 28

def start
  if File.exists?(@lock_file)
    $log.warn "Previous process was terminated abnormally.  To start, remove the lock file after checking data integrity."
    @abort = true
    Process.kill(:TERM, Process.ppid)
    return
  end

  super
end