Class: Droonga::DistributedCommandPlanner
- Inherits:
-
Object
- Object
- Droonga::DistributedCommandPlanner
show all
- Includes:
- Loggable
- Defined in:
- lib/droonga/distributed_command_planner.rb
Constant Summary
collapse
- REDUCE_SUM =
"sum"
- DEFAULT_LIMIT =
-1
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
Returns a new instance of DistributedCommandPlanner.
30
31
32
33
34
35
36
37
38
39
40
41
42
|
# File 'lib/droonga/distributed_command_planner.rb', line 30
# Sets up an empty plan for one incoming message.
#
# @param dataset [Object] stored as-is; other methods in this class
#   read its +name+.
# @param source_message [Hash] stored as-is; other methods in this class
#   read its "type" and "body" entries.
def initialize(dataset, source_message)
  @dataset, @source_message = dataset, source_message
  @key = nil
  # Each collection gets its own fresh array; they are filled by #reduce.
  @outputs, @reducers, @gatherers = [], [], []
  @processor = nil
  plan_errors_handling
end
|
Instance Attribute Details
#key ⇒ Object
Returns the value of attribute key.
24
25
26
|
# File 'lib/droonga/distributed_command_planner.rb', line 24
# Reader for the planner's key.
#
# @return [Object, nil] the current value of @key. The constructor sets
#   it to nil; no other assignment is visible in this section.
def key
@key
end
|
Instance Method Details
#broadcast(options = {}) ⇒ Object
78
79
80
81
82
83
84
85
86
87
88
89
|
# File 'lib/droonga/distributed_command_planner.rb', line 78
# Replaces the planned processor step with a "broadcast" step.
#
# @param options [Hash] optional overrides:
#   :body    - step body (falls back to the source message's "body")
#   :replica - value for "replica" (falls back to "random")
#   :slice   - value for "slice" (falls back to "all")
#   :write   - value for "post" (falls back to false)
# @return [Hash] the step hash that was stored in @processor.
def broadcast(options={})
  command      = @source_message["type"]
  dataset_name = @dataset.name
  # nil/false option values count as "not given", hence || rather than fetch.
  body         = options[:body] || @source_message["body"]
  replica      = options[:replica] || "random"
  slice        = options[:slice] || "all"
  post         = options[:write] || false

  @processor = {
    "command" => command,
    "dataset" => dataset_name,
    "body"    => body,
    "type"    => "broadcast",
    "outputs" => [],
    "replica" => replica,
    "slice"   => slice,
    "post"    => post,
  }
end
|
#plan ⇒ Object
44
45
46
47
48
|
# File 'lib/droonga/distributed_command_planner.rb', line 44
# Assembles the list of steps: the unified reducers, then the unified
# gatherers, then the fixed processor as the final element. The list is
# logged at debug level before being returned.
#
# @return [Array] the assembled steps.
def plan
  (unified_reducers + unified_gatherers + [fixed_processor]).tap do |planned_steps|
    logger.debug("distribution plan", :steps => planned_steps)
  end
end
|
#reduce(params = nil) ⇒ Object
50
51
52
53
54
55
56
57
58
59
60
61
62
|
# File 'lib/droonga/distributed_command_planner.rb', line 50
# Registers one reducer message, one gatherer message and one output name
# per entry of +params+.
#
# Each value is either a reducer definition used as-is (with no gatherer
# definition), or a Hash carrying a truthy :gather entry, in which case
# its :reduce entry is the reducer definition and its :gather entry is
# the gatherer definition.
#
# @param params [Hash, nil] output name => definition; nothing happens
#   when it is nil or false.
# @return [Object, nil] whatever +params.each+ returns, or nil when
#   +params+ is nil or false.
def reduce(params=nil)
  if params
    params.each do |output_name, definition|
      gather_definition, reduce_definition =
        if definition.is_a?(Hash) && definition[:gather]
          [definition[:gather], definition[:reduce]]
        else
          [nil, definition]
        end

      @reducers.push(reducer_message(reduce_command, output_name, reduce_definition))
      @gatherers.push(gatherer_message(gather_command, output_name, gather_definition))
      @outputs.push(output_name)
    end
  end
end
|
#scatter(options = {}) ⇒ Object
64
65
66
67
68
69
70
71
72
73
74
75
76
|
# File 'lib/droonga/distributed_command_planner.rb', line 64
# Replaces the planned processor step with a "scatter" step.
#
# @param options [Hash] optional overrides:
#   :body    - step body (falls back to the source message's "body")
#   :record  - value for "record" (nil when not given)
#   :replica - value for "replica" (falls back to "all")
#   :slice   - value for "slice" (falls back to "all")
#   :write   - value for "post" (falls back to false)
# @return [Hash] the step hash that was stored in @processor.
def scatter(options={})
  command      = @source_message["type"]
  dataset_name = @dataset.name
  # nil/false option values count as "not given", hence || rather than fetch.
  body         = options[:body] || @source_message["body"]
  record       = options[:record]
  replica      = options[:replica] || "all"
  slice        = options[:slice] || "all"
  post         = options[:write] || false

  @processor = {
    "command" => command,
    "dataset" => dataset_name,
    "body"    => body,
    "record"  => record,
    "type"    => "scatter",
    "outputs" => [],
    "replica" => replica,
    "slice"   => slice,
    "post"    => post,
  }
end
|