Class: Droonga::DataAbsorberClient
- Inherits:
-
Object
- Object
- Droonga::DataAbsorberClient
- Includes:
- Loggable
- Defined in:
- lib/droonga/data_absorber_client.rb
Defined Under Namespace
Classes: DestinationEqualsToSource, EmptyBody, EmptyResponse
Constant Summary collapse
- DEFAULT_MESSAGES_PER_SECOND =
100
- DEFAULT_PROGRESS_INTERVAL_SECONDS =
3
- DEFAULT_HOST =
NodeName::DEFAULT_HOST
- DEFAULT_PORT =
NodeName::DEFAULT_PORT
- DEFAULT_TAG =
NodeName::DEFAULT_TAG
- DEFAULT_DATASET =
Catalog::Dataset::DEFAULT_NAME
Instance Attribute Summary collapse
-
#dataset ⇒ Object
readonly
Returns the value of attribute dataset.
-
#error_message ⇒ Object
readonly
Returns the value of attribute error_message.
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#messages_per_second ⇒ Object
readonly
Returns the value of attribute messages_per_second.
-
#params ⇒ Object
readonly
Returns the value of attribute params.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#progress_interval_seconds ⇒ Object
readonly
Returns the value of attribute progress_interval_seconds.
-
#source_dataset ⇒ Object
readonly
Returns the value of attribute source_dataset.
-
#source_host ⇒ Object
readonly
Returns the value of attribute source_host.
-
#source_port ⇒ Object
readonly
Returns the value of attribute source_port.
-
#source_tag ⇒ Object
readonly
Returns the value of attribute source_tag.
-
#tag ⇒ Object
readonly
Returns the value of attribute tag.
Instance Method Summary collapse
- #empty_destination? ⇒ Boolean
-
#initialize(params) ⇒ DataAbsorberClient
constructor
A new instance of DataAbsorberClient.
- #run ⇒ Object
- #source_node_suspendable? ⇒ Boolean
Constructor Details
#initialize(params) ⇒ DataAbsorberClient
Returns a new instance of DataAbsorberClient.
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/droonga/data_absorber_client.rb', line 53 def initialize(params) @params = params @messages_per_second = @params[:messages_per_second] || DEFAULT_MESSAGES_PER_SECOND @progress_interval_seconds = @params[:progress_interval_seconds] || DEFAULT_PROGRESS_INTERVAL_SECONDS @target_role = @params[:target_role] @host = @params[:host] || DEFAULT_HOST @port = @params[:port] || DEFAULT_PORT @tag = @params[:tag] || DEFAULT_TAG @dataset = @params[:dataset] || DEFAULT_DATASET @source_host = @params[:source_host] || @host || DEFAULT_HOST @source_port = @params[:source_port] || @port || DEFAULT_PORT @source_tag = @params[:source_tag] || @tag || DEFAULT_TAG @source_dataset = @params[:source_dataset] || @dataset || DEFAULT_DATASET @receiver_host = @params[:receiver_host] || @host @receiver_port = @params[:receiver_port] || 0 @client_options = @params[:client_options] || {} @error_message = nil validate_params end |
Instance Attribute Details
#dataset ⇒ Object (readonly)
Returns the value of attribute dataset.
48 49 50 |
# File 'lib/droonga/data_absorber_client.rb', line 48 def dataset @dataset end |
#error_message ⇒ Object (readonly)
Returns the value of attribute error_message.
51 52 53 |
# File 'lib/droonga/data_absorber_client.rb', line 51 def @error_message end |
#host ⇒ Object (readonly)
Returns the value of attribute host.
48 49 50 |
# File 'lib/droonga/data_absorber_client.rb', line 48 def host @host end |
#messages_per_second ⇒ Object (readonly)
Returns the value of attribute messages_per_second.
49 50 51 |
# File 'lib/droonga/data_absorber_client.rb', line 49 def @messages_per_second end |
#params ⇒ Object (readonly)
Returns the value of attribute params.
47 48 49 |
# File 'lib/droonga/data_absorber_client.rb', line 47 def params @params end |
#port ⇒ Object (readonly)
Returns the value of attribute port.
48 49 50 |
# File 'lib/droonga/data_absorber_client.rb', line 48 def port @port end |
#progress_interval_seconds ⇒ Object (readonly)
Returns the value of attribute progress_interval_seconds.
49 50 51 |
# File 'lib/droonga/data_absorber_client.rb', line 49 def progress_interval_seconds @progress_interval_seconds end |
#source_dataset ⇒ Object (readonly)
Returns the value of attribute source_dataset.
50 51 52 |
# File 'lib/droonga/data_absorber_client.rb', line 50 def source_dataset @source_dataset end |
#source_host ⇒ Object (readonly)
Returns the value of attribute source_host.
50 51 52 |
# File 'lib/droonga/data_absorber_client.rb', line 50 def source_host @source_host end |
#source_port ⇒ Object (readonly)
Returns the value of attribute source_port.
50 51 52 |
# File 'lib/droonga/data_absorber_client.rb', line 50 def source_port @source_port end |
#source_tag ⇒ Object (readonly)
Returns the value of attribute source_tag.
50 51 52 |
# File 'lib/droonga/data_absorber_client.rb', line 50 def source_tag @source_tag end |
#tag ⇒ Object (readonly)
Returns the value of attribute tag.
48 49 50 |
# File 'lib/droonga/data_absorber_client.rb', line 48 def tag @tag end |
Instance Method Details
#empty_destination? ⇒ Boolean
131 132 133 |
# File 'lib/droonga/data_absorber_client.rb', line 131 def empty_destination? table_names_in_destination_node.empty? end |
#run ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/droonga/data_absorber_client.rb', line 82 def run n_absorbers = 0 = { "type" => "system.absorb-data", "dataset" => @dataset, "body" => { "host" => @source_host, "port" => @source_port, "tag" => @source_tag, "dataset" => @source_dataset, "messagesPerSecond" => @messages_per_second, "progressIntervalSeconds" => @progress_interval_seconds, "targetRole" => @target_role, }, } client = Droonga::Client.new() client.subscribe() do || case when Droonga::Client::Error client.close @error_message = .to_s else case ["type"] when "system.absorb-data.result", "system.absorb-data.error" if ["statusCode"] != 200 client.close error = ["body"] @error_message = "#{error['name']}: #{error['message']}" end when "system.absorb-data.progress" body = ["body"] yield(:n_processed_messages => body["nProcessedMessages"], :percentage => body["percentage"], :message => body["message"]) when "system.absorb-data.start" n_absorbers += 1 when "system.absorb-data.end" n_absorbers -= 1 client.close if n_absorbers <= 0 end end end end |
#source_node_suspendable? ⇒ Boolean
127 128 129 |
# File 'lib/droonga/data_absorber_client.rb', line 127 def source_node_suspendable? (source_replica_hosts - [@source_host]).size >= 1 end |