Class: Droonga::DataAbsorber

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/droonga/data_absorber.rb

Constant Summary collapse

DEFAULT_MESSAGES_PER_SECOND =
100
TIME_UNKNOWN =
-1
PROGRESS_UNKNOWN =
-1
MESSAGES_PER_SECOND_MATCHER =
/(\d+(\.\d+)?) messages\/second/
ONE_MINUTE_IN_SECONDS =
60
ONE_HOUR_IN_SECONDS =
ONE_MINUTE_IN_SECONDS * 60

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(params) ⇒ DataAbsorber

Returns a new instance of DataAbsorber.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/droonga/data_absorber.rb', line 39

def initialize(params)
  @params = params

  @messages_per_second = @params[:messages_per_second] || DEFAULT_MESSAGES_PER_SECOND

  @drndump = @params[:drndump] || "drndump"
  # We should use droonga-send instead of droonga-request,
  # because droonga-request is too slow.
  @client = @params[:client] || "droonga-send"

  @dataset = @params[:dataset] || CatalogGenerator::DEFAULT_DATASET
  @port    = @params[:port]    || CatalogGenerator::DEFAULT_PORT
  @tag     = @params[:tag]     || CatalogGenerator::DEFAULT_TAG

  @source_host      = @params[:source_host]
  @destination_host = @params[:destination_host]
  @receiver_host    = @params[:receiver_host] || @destination_host

  @receiver_port = @params[:receiver_port]
end

Instance Attribute Details

#paramsObject (readonly)

Returns the value of attribute params.



38
39
40
# File 'lib/droonga/data_absorber.rb', line 38

def params
  @params
end

Class Method Details

.absorb(params) ⇒ Object



33
34
35
# File 'lib/droonga/data_absorber.rb', line 33

def absorb(params)
  new(params).absorb
end

Instance Method Details

#absorbObject



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/droonga/data_absorber.rb', line 62

def absorb
  drndump_command_line = [@drndump] + drndump_options
  client_command_line  = [@client] + client_options(@client)

  start_time_in_seconds = Time.new.to_i
  env = {}
  Open3.pipeline_r([env, *drndump_command_line],
                   [env, *client_command_line]) do |last_stdout, thread|
    last_stdout.each do |output|
      if block_given?
        messages_per_second = nil
        if output =~ MESSAGES_PER_SECOND_MATCHER
          messages_per_second = $1.to_f
        end
        yield(:progress => report_progress(start_time_in_seconds),
              :output   => output,
              :messages_per_second => messages_per_second)
      end
    end
  end
end

#can_report_remaining_time?Boolean

Returns:

  • (Boolean)


84
85
86
87
# File 'lib/droonga/data_absorber.rb', line 84

def can_report_remaining_time?
  required_time_in_seconds != Droonga::DataAbsorber::TIME_UNKNOWN and
    required_time_in_seconds > 0
end

#destination_clientObject



125
126
127
128
129
130
131
132
133
134
135
# File 'lib/droonga/data_absorber.rb', line 125

def destination_client
  options = {
    :host          => @destination_host,
    :port          => @port,
    :tag           => @tag,
    :progocol      => :droonga,
    :receiver_host => @receiver_host,
    :receiver_port => 0,
  }
  @destination_client ||= Droonga::Client.new(options)
end

#report_progress(start_time_in_seconds) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/droonga/data_absorber.rb', line 96

def report_progress(start_time_in_seconds)
  return nil unless can_report_remaining_time?

  elapsed_time = Time.new.to_i - start_time_in_seconds
  progress = elapsed_time.to_f / required_time_in_seconds
  progress = [(progress * 100).to_i, 100].min

  remaining_seconds  = [required_time_in_seconds - elapsed_time, 0].max
  remaining_hours    = (remaining_seconds / ONE_HOUR_IN_SECONDS).floor
  remaining_seconds -= remaining_hours * ONE_HOUR_IN_SECONDS
  remaining_minutes  = (remaining_seconds / ONE_MINUTE_IN_SECONDS).floor
  remaining_seconds -= remaining_minutes * ONE_MINUTE_IN_SECONDS
  remaining_time     = sprintf("%02i:%02i:%02i", remaining_hours, remaining_minutes, remaining_seconds)

  "#{progress}% done (maybe #{remaining_time} remaining)"
end

#required_time_in_secondsObject



89
90
91
# File 'lib/droonga/data_absorber.rb', line 89

def required_time_in_seconds
  @required_time_in_seconds ||= calculate_required_time_in_seconds
end

#source_clientObject



113
114
115
116
117
118
119
120
121
122
123
# File 'lib/droonga/data_absorber.rb', line 113

def source_client
  options = {
    :host          => @source_host,
    :port          => @port,
    :tag           => @tag,
    :progocol      => :droonga,
    :receiver_host => @receiver_host,
    :receiver_port => 0,
  }
  @source_client ||= Droonga::Client.new(options)
end

#source_node_suspendable?Boolean

Returns:

  • (Boolean)


137
138
139
# File 'lib/droonga/data_absorber.rb', line 137

def source_node_suspendable?
  (source_replica_hosts - [@source_host]).size > 1
end