Class: Droonga::Session

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/droonga/session.rb

Instance Method Summary collapse

Constructor Details

#initialize(id, dispatcher, collector_runner, tasks, inputs) ⇒ Session

Returns a new instance of Session.



24
25
26
27
28
29
30
31
32
# File 'lib/droonga/session.rb', line 24

# Sets up the session state and logs the tasks and inputs it was given.
#
# @param id               stored as the session identifier
# @param dispatcher       stored; used later to dispatch and reply
# @param collector_runner stored; used later to collect incoming values
# @param tasks            the tasks whose completion this session counts
# @param inputs           lookup of tasks by input name
def initialize(id, dispatcher, collector_runner, tasks, inputs)
  @id, @dispatcher, @collector_runner = id, dispatcher, collector_runner
  @tasks, @inputs = tasks, inputs
  # Number of tasks finished so far; compared against @tasks.size by #done?.
  @n_dones = 0
  logger.trace("initialized", :tasks => tasks, :inputs => inputs)
end

Instance Method Details

#done? ⇒ Boolean

Returns:

  • (Boolean)


34
35
36
# File 'lib/droonga/session.rb', line 34

# Tells whether the count of finished tasks equals the number of tasks.
#
# @return [Boolean]
def done?
  @tasks.size == @n_dones
end

#finish ⇒ Object



73
74
75
76
# File 'lib/droonga/session.rb', line 73

# Detaches the pending timeout timer, if one is set, and forgets it.
#
# @return [nil]
def finish
  if @timeout_timer
    @timeout_timer.detach
  end
  @timeout_timer = nil
end

#need_result? ⇒ Boolean

TODO: We don't have to wait results if no collection step is in the list of tasks, because:

  * Currently the "super step" mechanism is not
    implemented yet.
  * So, results won't be used by other handlers directly.
    Results will be used only for the "response" for the
    sender.
  * So, if there is no collection step, no-one requires
    results and there is no need to wait for results.

 However, in the future after the "super step" mechanism
 is introduced, results can be used by other handlers
 even if there is no collection step.
 Then we must update this logic.

Returns:

  • (Boolean)


53
54
55
56
57
# File 'lib/droonga/session.rb', line 53

# Tells whether at least one task is reported as collectable by the
# collector runner.
#
# @return [Boolean]
def need_result?
  @tasks.any? { |candidate| @collector_runner.collectable?("task" => candidate) }
end

#receive(name, value) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/droonga/session.rb', line 78

# Feeds an incoming value to every task registered for the input +name+.
#
# For each such task the input counter is incremented and the value is
# passed to the collector runner. Once a task has received at least as
# many inputs as its step expects, it is treated as done: a reply is sent
# through the dispatcher when the step has a "post" entry, the collected
# values are forwarded to the step's descendants and the session's done
# counter is incremented.
#
# A task that is still waiting for inputs is skipped individually; the
# remaining tasks registered for the same input are still processed.
#
# @param name  the input name used to look up the waiting tasks
# @param value the value that arrived for that input
# @return [Object] nil when no task is registered for +name+
def receive(name, value)
  tasks = @inputs[name]
  logger.trace("receive: process response",
               :name => name, :value => value, :task => tasks)
  unless tasks
    #TODO: result arrived before its query
    return
  end
  tasks.each do |task|
    task["n_of_inputs"] += 1
    step = task["step"]
    command = step["type"]
    message = {
      "task"  => task,
      "name"  => name,
      "value" => value
    }
    @collector_runner.collect(message)
    # Only this task has to wait for more inputs. Using `return` here
    # would abort the whole loop and starve the tasks that follow.
    next if task["n_of_inputs"] < step["n_of_expects"]

    # The task is done.
    result = task["values"]
    if step["post"]
      # XXX: It is just a workaround.
      # Remove me when super step is introduced.
      # Errors and "search_gather" results are replied as a whole;
      # every other command replies only its "result" entry.
      if result["errors"] || command == "search_gather"
        reply_body = result
      else
        reply_body = result["result"]
      end
      @dispatcher.reply("body" => reply_body)
    end
    # NOTE(review): the helper name is spelled as in the original call;
    # its definition is not visible here -- confirm the spelling.
    send_to_descendantas(step["descendants"], result)
    @n_dones += 1
  end
end

#set_timeout(loop, timeout, &block) ⇒ Object



118
119
120
121
122
123
124
125
126
127
# File 'lib/droonga/session.rb', line 118

# Arms a timeout timer for this session and attaches it to +loop+.
#
# When the timer fires it is detached and forgotten, the timeout error is
# reported, and then the given block (if any) is called.
#
# @param loop    the event loop the timer is attached to
# @param timeout passed to Coolio::TimerWatcher.new
#                (presumably seconds -- confirm against Cool.io)
# @param block   optional callback run after the timeout was reported
# @return [Object] whatever +loop.attach+ returns
def set_timeout(loop, timeout, &block)
  @timeout_timer = Coolio::TimerWatcher.new(timeout)
  @timeout_timer.on_timer do
    @timeout_timer.detach
    @timeout_timer = nil
    report_timeout_error
    # Call the captured block explicitly. A bare `yield` here would raise
    # LocalJumpError from inside the event loop when no block was given.
    block.call if block
  end
  loop.attach(@timeout_timer)
end

#start ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/droonga/session.rb', line 59

# Dispatches every task registered under the +nil+ input as a local
# message, counting each of them as done.
#
# A trace entry is logged when no such task exists.
#
# @return [Array] the tasks that were dispatched (empty when there are none)
def start
  local_tasks = @inputs[nil]
  local_tasks = [] unless local_tasks
  if local_tasks.empty?
    logger.trace("start: no task!")
  end
  local_tasks.each do |task|
    message = { "id" => @id, "task" => task }
    logger.trace("start: dispatching local message", :message => message)
    @dispatcher.process_local_message(message)
    @n_dones += 1
  end
end