Class: Droonga::DistributedCommandPlanner

Inherits:
Object
  • Object
show all
Defined in:
lib/droonga/distributed_command_planner.rb

Constant Summary collapse

REDUCE_SUM =
"sum"
DEFAULT_LIMIT =
-1

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(dataset, source_message) ⇒ DistributedCommandPlanner

Returns a new instance of DistributedCommandPlanner.



26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/droonga/distributed_command_planner.rb', line 26

def initialize(dataset, source_message)
  @dataset = dataset
  @source_message = source_message

  @key = nil
  @outputs = []

  @reducers = []
  @gatherers = []
  @processor = nil

  plan_errors_handling
end

Instance Attribute Details

#keyObject

Returns the value of attribute key.



20
21
22
# File 'lib/droonga/distributed_command_planner.rb', line 20

def key
  @key
end

Instance Method Details

#broadcast(options = {}) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/droonga/distributed_command_planner.rb', line 71

def broadcast(options={})
  processor = {
    "command" => @source_message["type"],
    "dataset" => @dataset.name,
    "body"    => options[:body] || @source_message["body"],
    "type"    => "broadcast",
    "outputs" => [],
    "replica" => "random"
  }
  if options[:write]
    processor["replica"] = "all"
    processor["post"]    = true
  end
  @processor = processor
end

#planObject



40
41
42
# File 'lib/droonga/distributed_command_planner.rb', line 40

def plan
  unified_reducers + unified_gatherers + [fixed_processor]
end

#reduce(params = nil) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/droonga/distributed_command_planner.rb', line 44

def reduce(params=nil)
  return unless params
  params.each do |name, reducer|
    gatherer = nil
    if reducer.is_a?(Hash) and reducer[:gather]
      gatherer = reducer[:gather]
      reducer = reducer[:reduce]
    end
    @reducers << reducer_message(reduce_command, name, reducer)
    @gatherers << gatherer_message(gather_command, name, gatherer)
    @outputs << name
  end
end

#scatter(record, options = {}) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/droonga/distributed_command_planner.rb', line 58

def scatter(record, options={})
  @processor = {
    "command" => @source_message["type"],
    "dataset" => @dataset.name,
    "body"    => options[:body] || @source_message["body"],
    "record"  => record,
    "type"    => "scatter",
    "outputs" => [],
    "replica" => "all",
    "post"    => true
  }
end