Class: Droonga::DistributedCommandPlanner

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/droonga/distributed_command_planner.rb

Constant Summary collapse

REDUCE_SUM =
"sum"
DEFAULT_LIMIT =
-1

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(dataset, source_message) ⇒ DistributedCommandPlanner

Returns a new instance of DistributedCommandPlanner.



30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/droonga/distributed_command_planner.rb', line 30

# Sets up an empty planner for one incoming message.
#
# @param dataset [Object] stored as-is in @dataset; other methods of this
#   class call +name+ on it.
# @param source_message [Hash] stored as-is in @source_message; other methods
#   of this class read its "type" and "body" entries.
def initialize(dataset, source_message)
  @dataset, @source_message = dataset, source_message

  # Nothing has been planned yet: no key, no processor step.
  @key = @processor = nil

  # Three separate arrays -- they are appended to independently later on.
  @outputs, @reducers, @gatherers = [], [], []

  # Must stay last so that it sees fully initialised state.
  plan_errors_handling
end

Instance Attribute Details

#key ⇒ Object

Returns the value of attribute key.



24
25
26
# File 'lib/droonga/distributed_command_planner.rb', line 24

# Reader for the planner's key.
#
# @return [Object, nil] the current value of @key; the constructor sets it
#   to nil.
def key
  @key
end

Instance Method Details

#broadcast(options = {}) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/droonga/distributed_command_planner.rb', line 78

# Records a "broadcast" processor step in @processor and returns it.
#
# The step's command comes from the source message's "type" and its dataset
# from @dataset.name. Any option that is missing (or nil/false) falls back to
# the default listed below.
#
# @param options [Hash] optional overrides, read by symbol key:
#   :body    -- step body (default: the source message's "body")
#   :replica -- value for "replica" (default: "random")
#   :slice   -- value for "slice" (default: "all")
#   :write   -- value for "post" (default: false)
# @return [Hash] the step hash that was stored in @processor.
def broadcast(options={})
  body    = options[:body] || @source_message["body"]
  replica = options[:replica] || "random"
  slice   = options[:slice] || "all"
  post    = options[:write] || false

  @processor = {
    "command" => @source_message["type"],
    "dataset" => @dataset.name,
    "body"    => body,
    "type"    => "broadcast",
    "outputs" => [],
    "replica" => replica,
    "slice"   => slice,
    "post"    => post,
  }
end

#plan ⇒ Object



44
45
46
47
48
# File 'lib/droonga/distributed_command_planner.rb', line 44

# Assembles the list of planned steps: the unified reducers first, then the
# unified gatherers, then the fixed processor as the final element. The list
# is logged at debug level before being returned.
#
# @return [Array] the assembled steps.
def plan
  (unified_reducers + unified_gatherers + [fixed_processor]).tap do |steps|
    logger.debug("distribution plan", :steps => steps)
  end
end

#reduce(params = nil) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/droonga/distributed_command_planner.rb', line 50

# Registers one reducer message, one gatherer message and one output name for
# every entry of +params+.
#
# An entry's value is normally passed straight to the reducer message. When
# the value is a Hash with a truthy :gather entry, its :gather part goes to
# the gatherer message and its :reduce part to the reducer message instead.
#
# @param params [Hash, nil] output name => reducer definition; nothing is
#   done when it is nil or false.
# @return [Object, nil] +params+ itself (the result of +each+), or nil when
#   +params+ is nil/false.
def reduce(params=nil)
  return unless params

  params.each do |name, definition|
    gather_part, reduce_part =
      if definition.is_a?(Hash) && definition[:gather]
        [definition[:gather], definition[:reduce]]
      else
        # Plain definition: there is no dedicated gatherer part.
        [nil, definition]
      end

    @reducers << reducer_message(reduce_command, name, reduce_part)
    @gatherers << gatherer_message(gather_command, name, gather_part)
    @outputs << name
  end
end

#scatter(options = {}) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/droonga/distributed_command_planner.rb', line 64

# Records a "scatter" processor step in @processor and returns it.
#
# The step's command comes from the source message's "type" and its dataset
# from @dataset.name. Any option that is missing (or nil/false) falls back to
# the default listed below.
#
# @param options [Hash] optional overrides, read by symbol key:
#   :body    -- step body (default: the source message's "body")
#   :record  -- value for "record" (nil when not given)
#   :replica -- value for "replica" (default: "all")
#   :slice   -- value for "slice" (default: "all")
#   :write   -- value for "post" (default: false)
# @return [Hash] the step hash that was stored in @processor.
def scatter(options={})
  body    = options[:body] || @source_message["body"]
  replica = options[:replica] || "all"
  slice   = options[:slice] || "all"
  post    = options[:write] || false

  @processor = {
    "command" => @source_message["type"],
    "dataset" => @dataset.name,
    "body"    => body,
    "record"  => options[:record],
    "type"    => "scatter",
    "outputs" => [],
    "replica" => replica,
    "slice"   => slice,
    "post"    => post,
  }
end