Class: Droonga::Dispatcher

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/droonga/dispatcher.rb

Defined Under Namespace

Classes: MissingDatasetParameter, SessionPlanner, UnknownDataset, UnknownType

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(engine_state, catalog) ⇒ Dispatcher

Returns a new instance of Dispatcher.



51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/droonga/dispatcher.rb', line 51

# Sets up a dispatcher for the given engine state and catalog.
#
# The forwarder and replier are taken from the engine state. A Farm is
# built from the engine state's name and loop plus the catalog, and it
# receives this dispatcher, the engine state and the forwarder as options.
#
# @param engine_state [Object] must respond to forwarder, replier, name
#   and loop.
# @param catalog [Object] kept in @catalog and handed to the Farm.
def initialize(engine_state, catalog)
  @engine_state = engine_state
  @forwarder    = engine_state.forwarder
  @replier      = engine_state.replier
  @catalog      = catalog

  # The create_* helpers are defined elsewhere in the class; the creation
  # order relative to the Farm is kept as is because they may rely on the
  # state assigned so far.
  @adapter_runners = create_adapter_runners
  @farm = Farm.new(engine_state.name, catalog, engine_state.loop,
                   :engine_state => engine_state,
                   :dispatcher   => self,
                   :forwarder    => @forwarder)
  @collector_runners = create_collector_runners
  @step_runners      = create_step_runners
end

Instance Attribute Details

#engine_state ⇒ Object (readonly)

Returns the value of attribute engine_state.



49
50
51
# File 'lib/droonga/dispatcher.rb', line 49

# Reader for the engine state this dispatcher was constructed with.
#
# @return [Object] the value stored in @engine_state.
def engine_state
  @engine_state
end

Instance Method Details

#dispatch(message, destination) ⇒ Object



178
179
180
181
182
183
184
185
186
# File 'lib/droonga/dispatcher.rb', line 178

# Delivers an internal message to +destination+.
#
# A destination that local? accepts is handled in-process through
# process_internal_message. Any other destination gets a copy of the
# request currently held in @message, with +message+ as its "body", sent
# through the forwarder with type "dispatcher".
#
# @param message [Hash] internal message to deliver.
# @param destination [Object] route to deliver to; checked with local?.
# @return [Object] whatever the chosen handler returns.
def dispatch(message, destination)
  return process_internal_message(message) if local?(destination)

  envelope = @message.merge("body" => message)
  receiver = { "type" => "dispatcher", "to" => destination }
  @forwarder.forward(envelope, receiver)
end

#dispatch_steps(steps) ⇒ Object



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/droonga/dispatcher.rb', line 188

# Assigns routes to every step and dispatches the whole plan once to each
# distinct destination.
#
# For a step whose "dataset" is known to the catalog, "routes" is
# overwritten with the routes the dataset computes for the live nodes.
# Otherwise an existing "routes" value is kept, falling back to a
# one-element list holding the freshly generated id. The steps are
# modified in place.
#
# @param steps [Array<Hash>] steps to dispatch.
# @return [Array] the distinct destinations that were dispatched to.
def dispatch_steps(steps)
  id = @engine_state.generate_id

  destinations = steps.flat_map do |step|
    dataset = @catalog.dataset(step["dataset"])
    if dataset
      step["routes"] = dataset.compute_routes(step, @engine_state.live_nodes)
    else
      step["routes"] ||= [id]
    end
    step["routes"].map { |route| farm_path(route) }
  end

  # Every destination receives the same message, carrying all steps.
  dispatch_message = { "id" => id, "steps" => steps }
  destinations.uniq.each do |destination|
    dispatch(dispatch_message, destination)
  end
end

#forward(message, destination) ⇒ Object



116
117
118
119
120
# File 'lib/droonga/dispatcher.rb', line 116

# Hands a message to the forwarder, writing trace log entries before and
# after the call.
#
# @param message [Object] passed through to the forwarder unchanged.
# @param destination [Object] passed through to the forwarder unchanged.
def forward(message, destination)
  logger.trace("forward start")
  @forwarder.forward(message, destination)
  logger.trace("forward done")
end

#local?(route) ⇒ Boolean

Returns:

  • (Boolean)


228
229
230
# File 'lib/droonga/dispatcher.rb', line 228

# Asks the engine state whether +route+ is a local route.
#
# @param route [Object] route to check.
# @return [Boolean] the answer given by the engine state's local_route?.
def local?(route)
  @engine_state.local_route?(route)
end

#process_internal_message(message) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/droonga/dispatcher.rb', line 155

# Feeds an internal message into the session identified by its "id".
#
# If the engine state already knows the session, the message's "input"
# and "value" are passed to it. Otherwise the message must carry "steps":
# a new session is planned, registered and started, using the collector
# runner for the message's "dataset" (or, when absent, the "dataset" of
# the request held in @message). A message with neither a known session
# nor steps is logged as an error and dropped.
#
# The session is unregistered as soon as it reports done?.
#
# @param message [Hash] internal message; reads "id", "input", "value",
#   "steps" and "dataset".
def process_internal_message(message)
  id = message["id"]
  session = @engine_state.find_session(id)

  if session
    session.receive(message["input"], message["value"])
  else
    steps = message["steps"]
    unless steps
      logger.error("no steps error: id=#{id}, message=#{message}")
      # TODO: take cases receiving result before its query into account
      return
    end

    planner = SessionPlanner.new(@engine_state, steps)
    dataset_name = message["dataset"] || @message["dataset"]
    session = planner.create_session(id, self, @collector_runners[dataset_name])
    @engine_state.register_session(id, session)
    session.start
  end

  return unless session.done?
  @engine_state.unregister_session(id)
end

#process_local_message(local_message) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/droonga/dispatcher.rb', line 211

# Passes a locally addressed task to the farm.
#
# The task's step lists its descendants as name => routes; each route is
# converted with farm_path and the result is stored in
# +local_message+["descendants"] (the argument is modified in place).
# The farm then receives a copy of the request held in @message whose
# "body" is +local_message+ and whose "type" is the step's command.
#
# @param local_message [Hash] must contain "task" with "route" and "step".
# @return [Object] whatever the farm's process returns.
def process_local_message(local_message)
  task = local_message["task"]
  step = task["step"]

  local_message["descendants"] =
    step["descendants"].each_with_object({}) do |(name, routes), paths|
      paths[name] = routes.map { |route| farm_path(route) }
    end

  farm_message = @message.merge("body" => local_message,
                                "type" => step["command"])
  @farm.process(task["route"], farm_message)
end

#process_message(message) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/droonga/dispatcher.rb', line 96

# Entry point for a message arriving at the dispatcher.
#
# The message is remembered in @message, which reply, dispatch and the
# other processing methods read. Messages of type "dispatcher" carry an
# internal message in "body" and go straight to
# process_internal_message. Everything else is validated and processed
# as an input message; failures are turned into replies instead of
# propagating:
#
# * ErrorMessage: replied with its own status code and response body.
# * StandardError, LoadError, SyntaxError: logged, then replied as an
#   InternalServerError with the text "Unknown internal error".
#
# @param message [Hash] incoming message; reads "type" and "body".
def process_message(message)
  @message = message
  return process_internal_message(message["body"]) if message["type"] == "dispatcher"

  begin
    assert_valid_message(message)
    process_input_message(message)
  rescue ErrorMessage => error
    reply("statusCode" => error.status_code,
          "body"       => error.response_body)
  rescue StandardError, LoadError, SyntaxError => error
    logger.exception("failed to process input message", error)
    fallback = ErrorMessages::InternalServerError.new("Unknown internal error")
    reply("statusCode" => fallback.status_code,
          "body"       => fallback.response_body)
  end
end

#reply(message) ⇒ void

This method returns an undefined value.

Replies response to replyTo.

Parameters:

  • message (Hash)

    The message to be replied. See Replier#reply for available keys.

    The key-value pairs in the request message are used as defaults. For example, if the passed message doesn't include an id key, the id value from the request message is used.

See Also:



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/droonga/dispatcher.rb', line 134

# Sends a response for the request currently held in @message.
#
# The response starts as the request merged with +message+, so keys
# missing from +message+ keep the request's values. If an adapter runner
# is registered for the response's "dataset", its adapt_output result
# replaces the response.
#
# A response with a "replyTo" is handed to the replier. Without one there
# is nobody to send it to: a status code of 200 (also the default when
# "statusCode" is absent) is dropped silently, anything else is logged as
# an "orphan error".
#
# @param message [Hash] key-value pairs overriding those of the request.
def reply(message)
  response = @message.merge(message)
  adapter_runner = @adapter_runners[response["dataset"]]
  response = adapter_runner.adapt_output(response) if adapter_runner

  return @replier.reply(response) unless response["replyTo"].nil?

  status_code = response["statusCode"] || 200
  return if status_code == 200

  dataset = response["dataset"]
  body = response["body"] || {}
  error_name = body["name"] || "Unknown"
  error_text = body["message"] || "unknown error"
  logger.error("orphan error: <#{dataset}>[#{error_name}](#{status_code}): #{error_text}")
end

#start ⇒ Object



65
66
67
68
69
70
# File 'lib/droonga/dispatcher.rb', line 65

# Starts the farm.
#
# Before starting, the farm's on_ready callback is set to a lambda that
# calls the engine state's on_ready.
#
# @return [Object] whatever the farm's start returns.
def start
  @farm.on_ready = -> { @engine_state.on_ready }
  @farm.start
end

#stop_gracefully(&on_stop) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
# File 'lib/droonga/dispatcher.rb', line 72

# Shuts down every collector runner, then every adapter runner, and
# finally asks the farm to stop gracefully, passing +on_stop+ on to it.
# Trace log entries mark the start and the end of the call.
#
# @yield forwarded as the block of the farm's stop_gracefully.
def stop_gracefully(&on_stop)
  logger.trace("stop_gracefully: start")
  [@collector_runners, @adapter_runners].each do |runners|
    runners.each_value(&:shutdown)
  end
  @farm.stop_gracefully(&on_stop)
  logger.trace("stop_gracefully: done")
end

#stop_immediately ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
# File 'lib/droonga/dispatcher.rb', line 84

# Shuts down every collector runner, then every adapter runner, and
# finally shuts the farm down. Trace log entries mark the start and the
# end of the call.
def stop_immediately
  logger.trace("stop_immediately: start")
  [@collector_runners, @adapter_runners].each do |runners|
    runners.each_value(&:shutdown)
  end
  @farm.shutdown
  logger.trace("stop_immediately: done")
end

#write_step?(step) ⇒ Boolean

Returns:

  • (Boolean)


232
233
234
235
236
237
238
239
240
241
242
# File 'lib/droonga/dispatcher.rb', line 232

# Tells whether +step+ resolves to a step definition that reports write?.
#
# The answer is false whenever the lookup chain breaks: the step has no
# "dataset", no step runner is registered for that dataset, or the runner
# finds no definition for the step's "command".
#
# @param step [Hash] reads "dataset" and "command".
# @return [Boolean] false on a broken lookup, otherwise the definition's
#   write? result.
def write_step?(step)
  dataset_name = step["dataset"]
  step_runner = dataset_name && @step_runners[dataset_name]
  step_definition = step_runner && step_runner.find(step["command"])

  step_definition ? step_definition.write? : false
end