Class: Droonga::DataAbsorber
- Inherits:
- Object
- Inheritance hierarchy: Object → Droonga::DataAbsorber
- Includes:
- Loggable
- Defined in:
- lib/droonga/data_absorber.rb
Constant Summary collapse
- DEFAULT_MESSAGES_PER_SECOND = 100
- TIME_UNKNOWN = -1
- PROGRESS_UNKNOWN = -1
- MESSAGES_PER_SECOND_MATCHER = /(\d+(\.\d+)?) messages\/second/
- ONE_MINUTE_IN_SECONDS = 60
- ONE_HOUR_IN_SECONDS = ONE_MINUTE_IN_SECONDS * 60
Instance Attribute Summary collapse
-
#params ⇒ Object
readonly
Returns the value of attribute params.
Class Method Summary collapse
- .absorb(params) ⇒ Object
Instance Method Summary collapse
- #absorb ⇒ Object
- #can_report_remaining_time? ⇒ Boolean
- #destination_client ⇒ Object
-
#initialize(params) ⇒ DataAbsorber
constructor
A new instance of DataAbsorber.
- #report_progress(start_time_in_seconds) ⇒ Object
- #required_time_in_seconds ⇒ Object
- #source_client ⇒ Object
- #source_node_suspendable? ⇒ Boolean
Constructor Details
#initialize(params) ⇒ DataAbsorber
Returns a new instance of DataAbsorber.
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/droonga/data_absorber.rb', line 39

# Keeps the given options and derives every setting of the absorber from
# them. An option that is missing (or nil/false) falls back to its default.
#
# @param params [Hash] options keyed by symbol. Keys read here:
#   :messages_per_second, :drndump, :client, :dataset, :port, :tag,
#   :source_host, :destination_host, :receiver_host, :receiver_port.
def initialize(params)
  @params = params

  # Hosts taking part in the transfer; no default is applied to them.
  @source_host      = @params[:source_host]
  @destination_host = @params[:destination_host]

  # The receiver falls back to the destination host when not given.
  @receiver_host = @params[:receiver_host] || @destination_host
  @receiver_port = @params[:receiver_port]

  # Connection settings shared by both nodes.
  @dataset = @params[:dataset] || CatalogGenerator::DEFAULT_DATASET
  @port    = @params[:port]    || CatalogGenerator::DEFAULT_PORT
  @tag     = @params[:tag]     || CatalogGenerator::DEFAULT_TAG

  # External commands. droonga-send is the default client because
  # droonga-request is too slow.
  @drndump = @params[:drndump] || "drndump"
  @client  = @params[:client]  || "droonga-send"

  @messages_per_second = @params[:messages_per_second] ||
                           DEFAULT_MESSAGES_PER_SECOND
end
Instance Attribute Details
#params ⇒ Object (readonly)
Returns the value of attribute params.
38 39 40 |
# File 'lib/droonga/data_absorber.rb', line 38

# Reader for the +params+ attribute.
#
# @return [Object] the current value of @params, returned as-is (not copied).
def params
  @params
end
Class Method Details
.absorb(params) ⇒ Object
33 34 35 |
# File 'lib/droonga/data_absorber.rb', line 33

# Convenience entry point: builds an absorber from +params+ and runs it.
#
# The caller's block is forwarded explicitly. Ruby does not pass a block on
# to a nested call by itself, so without the forwarding the instance-level
# #absorb would never see a block and could not report progress.
#
# @param params [Hash] options handed to the constructor unchanged.
# @yield whatever the instance-level #absorb yields.
# @return [Object] the return value of the instance-level #absorb.
def absorb(params, &block)
  new(params).absorb(&block)
end
Instance Method Details
#absorb ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/droonga/data_absorber.rb', line 62

# Runs the dump command piped into the client command and hands every line
# printed by the last command of the pipeline to the given block.
#
# NOTE(review): this listing had lost identifiers. The helper names
# +drndump_options+ and +client_options+ are restored from the upstream
# file's naming and are not visible here - confirm against
# lib/droonga/data_absorber.rb before relying on them.
#
# @yieldparam report [Hash] with the keys
#   :progress (result of #report_progress, may be nil),
#   :output (the raw line) and
#   :messages_per_second (Float parsed from the line, or nil when the line
#   does not match MESSAGES_PER_SECOND_MATCHER).
def absorb
  drndump_command_line = [@drndump] + drndump_options
  client_command_line  = [@client] + client_options(@client)

  start_time_in_seconds = Time.new.to_i
  env = {}
  Open3.pipeline_r([env, *drndump_command_line],
                   [env, *client_command_line]) do |last_stdout, _threads|
    # Every line is read even when no block was given; it is just dropped.
    last_stdout.each do |output|
      next unless block_given?

      matched = MESSAGES_PER_SECOND_MATCHER.match(output)
      messages_per_second = matched ? matched[1].to_f : nil
      yield(:progress            => report_progress(start_time_in_seconds),
            :output              => output,
            :messages_per_second => messages_per_second)
    end
  end
end
#can_report_remaining_time? ⇒ Boolean
84 85 86 87 |
# File 'lib/droonga/data_absorber.rb', line 84

# Tells whether an estimate of the remaining time can be given.
#
# @return [Boolean] false when the required time equals TIME_UNKNOWN,
#   otherwise whether the required time is greater than zero.
def can_report_remaining_time?
  return false if required_time_in_seconds == Droonga::DataAbsorber::TIME_UNKNOWN

  required_time_in_seconds > 0
end
#destination_client ⇒ Object
125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/droonga/data_absorber.rb', line 125

# Returns the client connected to the destination node, creating it on the
# first call and reusing it afterwards.
#
# The options hash is built only when the client is actually created, and
# the protocol is passed under the :protocol key (it used to be misspelled
# as :progocol).
#
# @return [Droonga::Client] the memoized client for @destination_host.
def destination_client
  @destination_client ||= begin
    options = {
      :host          => @destination_host,
      :port          => @port,
      :tag           => @tag,
      :protocol      => :droonga,
      :receiver_host => @receiver_host,
      # Always 0 here; @receiver_port is not consulted.
      # NOTE(review): presumably lets the client pick a port - confirm.
      :receiver_port => 0,
    }
    Droonga::Client.new(options)
  end
end
#report_progress(start_time_in_seconds) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/droonga/data_absorber.rb', line 96

# Builds a human readable progress message from the time elapsed since
# +start_time_in_seconds+ and the estimated total time.
#
# The percentage is clamped to 0..100, so a clock that is behind the start
# time can no longer produce a negative percentage. The remaining time is
# never negative.
#
# @param start_time_in_seconds [Integer] start time as seconds since epoch.
# @return [String, nil] e.g. "25% done (maybe 00:01:30 remaining)", or nil
#   when #can_report_remaining_time? is false.
def report_progress(start_time_in_seconds)
  return nil unless can_report_remaining_time?

  elapsed_time = Time.new.to_i - start_time_in_seconds

  percentage = (elapsed_time.to_f / required_time_in_seconds * 100).to_i
  percentage = [[percentage, 0].max, 100].min

  remaining_seconds = [required_time_in_seconds - elapsed_time, 0].max
  remaining_hours, remaining_seconds =
    remaining_seconds.divmod(ONE_HOUR_IN_SECONDS)
  remaining_minutes, remaining_seconds =
    remaining_seconds.divmod(ONE_MINUTE_IN_SECONDS)

  remaining_time = sprintf("%02i:%02i:%02i",
                           remaining_hours,
                           remaining_minutes,
                           remaining_seconds)
  "#{percentage}% done (maybe #{remaining_time} remaining)"
end
#required_time_in_seconds ⇒ Object
89 90 91 |
# File 'lib/droonga/data_absorber.rb', line 89

# Returns the estimated total time of the operation in seconds.
#
# The value comes from #calculate_required_time_in_seconds and is cached in
# @required_time_in_seconds; it is recomputed only while the cached value is
# nil or false.
#
# @return [Object] whatever #calculate_required_time_in_seconds returned.
def required_time_in_seconds
  unless @required_time_in_seconds
    @required_time_in_seconds = calculate_required_time_in_seconds
  end
  @required_time_in_seconds
end
#source_client ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/droonga/data_absorber.rb', line 113

# Returns the client connected to the source node, creating it on the first
# call and reusing it afterwards.
#
# The options hash is built only when the client is actually created, and
# the protocol is passed under the :protocol key (it used to be misspelled
# as :progocol).
#
# @return [Droonga::Client] the memoized client for @source_host.
def source_client
  @source_client ||= begin
    options = {
      :host          => @source_host,
      :port          => @port,
      :tag           => @tag,
      :protocol      => :droonga,
      :receiver_host => @receiver_host,
      # Always 0 here; @receiver_port is not consulted.
      # NOTE(review): presumably lets the client pick a port - confirm.
      :receiver_port => 0,
    }
    Droonga::Client.new(options)
  end
end
#source_node_suspendable? ⇒ Boolean
137 138 139 |
# File 'lib/droonga/data_absorber.rb', line 137

# Tells whether more than one replica host is left once the source host
# itself is taken out of the list returned by #source_replica_hosts.
#
# @return [Boolean] true when two or more other hosts remain.
def source_node_suspendable?
  other_hosts = source_replica_hosts - [@source_host]
  other_hosts.size > 1
end