Class: Droonga::Engine

Inherits:
Object
  • Object
show all
Includes:
Deferrable, Loggable
Defined in:
lib/droonga/engine.rb,
lib/droonga/engine/version.rb

Constant Summary collapse

VERSION =
"1.1.1"

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Deferrable

#wait_until_ready

Constructor Details

#initialize(params = {}) ⇒ Engine

Returns a new instance of Engine.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/droonga/engine.rb', line 39

# Builds an engine from an options hash.
#
# Reads +:name+, +:internal_name+, +:loop+ and
# +:internal_connection_lifetime+ from +params+, loads the catalog via
# +load_catalog+, then creates the engine state, the cluster and the
# dispatcher. Finally it registers a change callback on the cluster
# that makes the dispatcher refresh its node reference.
#
# @param params [Hash] construction options (defaults to an empty hash)
def initialize(params={})
  @name          = params[:name]
  @internal_name = params[:internal_name]
  @loop          = params[:loop]
  @catalog       = load_catalog

  # The same lifetime value is handed to both the state and the cluster.
  connection_lifetime = params[:internal_connection_lifetime]

  @state = EngineState.new(
    :loop                         => @loop,
    :name                         => @name,
    :internal_name                => @internal_name,
    :catalog                      => @catalog,
    :internal_connection_lifetime => connection_lifetime
  )
  @cluster = Cluster.new(
    :loop                         => @loop,
    :catalog                      => @catalog,
    :internal_connection_lifetime => connection_lifetime
  )

  @dispatcher = create_dispatcher
  @cluster.on_change = lambda { @dispatcher.refresh_node_reference }
end

Instance Attribute Details

#cluster ⇒ Object (readonly)

Returns the value of attribute cluster.



37
38
39
# File 'lib/droonga/engine.rb', line 37

# Read-only accessor for the cluster attribute.
#
# @return [Object] the value currently held in @cluster
def cluster
  @cluster
end

Instance Method Details

#process(message) ⇒ Object



121
122
123
124
125
126
127
128
129
130
# File 'lib/droonga/engine.rb', line 121

# Hands a message to the dispatcher, remembering its timestamp first.
#
# When the message has a "date" entry, it is parsed with Time.parse and
# stored in @last_message_timestamp, but only if no timestamp is stored
# yet or the stored one is older than the parsed one.
#
# @param message [Hash] the incoming message; must respond to
#   +include?+ and +[]+
# @return [Object] whatever @dispatcher.process_message returns
def process(message)
  if message.include?("date")
    message_time = Time.parse(message["date"])
    is_newer = @last_message_timestamp.nil? ||
                 @last_message_timestamp < message_time
    @last_message_timestamp = message_time if is_newer
  end
  @dispatcher.process_message(message)
end

#refresh_self_reference ⇒ Object



116
117
118
119
# File 'lib/droonga/engine.rb', line 116

# Asks the cluster, and then the state's forwarder, to refresh the
# connection each of them holds for this engine's own @name.
#
# @return [Object] the result of the forwarder's refresh_connection_for
def refresh_self_reference
  own_name = @name
  @cluster.refresh_connection_for(own_name)
  forwarder = @state.forwarder
  forwarder.refresh_connection_for(own_name)
end

#start ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/droonga/engine.rb', line 61

# Starts the engine's components.
#
# Registers two callbacks on the state before anything is started:
# * on_ready: calls this engine's own +on_ready+, then sets the Serf tag
#   named by Serf::Tag.internal_node_name to @internal_name on a Serf
#   instance created for @name.
# * on_failure: calls this engine's own +on_failure+.
#
# The state, the cluster and the dispatcher are then started in that
# order, and the last-message-timestamp observer is launched and kept
# in @last_message_timestamp_observer.
def start
  logger.trace("start: start")

  @state.on_ready = lambda {
    on_ready
    Serf.new(@name.to_s).set_tag(Serf::Tag.internal_node_name,
                                 @internal_name)
  }
  @state.on_failure = lambda { on_failure }

  @state.start
  @cluster.start
  @dispatcher.start
  @last_message_timestamp_observer = run_last_message_timestamp_observer

  logger.trace("start: done")
end

#stop_gracefully ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/droonga/engine.rb', line 78

# Stops the engine, letting in-flight sessions finish first.
#
# The timestamp observer is stopped, the shared
# Timestamp.last_message_timestamp is cleared and the cluster is shut
# down right away. The rest of the teardown (graceful dispatcher stop,
# saving the last message timestamp, state shutdown) runs immediately
# when the state has no session, or is registered as the state's
# on_finish callback when it still has one.
#
# @yield called once the dispatcher has stopped, the last message
#   timestamp has been saved and the state has been shut down. The
#   block is optional; without one the teardown simply completes.
def stop_gracefully
  logger.trace("stop_gracefully: start")
  @last_message_timestamp_observer.stop
  Timestamp.last_message_timestamp = nil # to avoid old timestamp is used
  @cluster.shutdown
  on_finish = lambda do
    logger.trace("stop_gracefully: middle")
    @dispatcher.stop_gracefully do
      #XXX We must save last processed message timestamp
      #    based on forwarded/dispatched messages while
      #    "graceful stop" operations.
      save_last_message_timestamp
      @state.shutdown
      # A bare yield would raise LocalJumpError here, after the state is
      # already shut down, when the caller passed no block.
      yield if block_given?
      logger.trace("stop_gracefully: done")
    end
  end
  if @state.have_session?
    logger.trace("stop_gracefully: having sessions")
    # Deferred: the state is given the callback instead of it being run now.
    @state.on_finish = on_finish
  else
    logger.trace("stop_gracefully: no session")
    on_finish.call
  end
end

#stop_immediately ⇒ Object

It may be called after stop_gracefully.



105
106
107
108
109
110
111
112
113
114
# File 'lib/droonga/engine.rb', line 105

# Stops the engine without waiting for sessions to finish.
#
# In order: stops the timestamp observer, clears the shared
# Timestamp.last_message_timestamp, stops the dispatcher immediately,
# saves the last message timestamp, then shuts down the cluster and
# the state.
def stop_immediately
  logger.trace("stop_immediately: start")

  @last_message_timestamp_observer.stop
  # Clear the shared value so that an old timestamp is not used.
  Timestamp.last_message_timestamp = nil

  @dispatcher.stop_immediately
  save_last_message_timestamp

  # Cluster first, then state.
  [@cluster, @state].each do |component|
    component.shutdown
  end

  logger.trace("stop_immediately: done")
end