Class: ZooKeeper::RubyIO::Binding

Inherits:
Object
  • Object
show all
Includes:
Slf4r::Logger
Defined in:
lib/zkruby/rubyio.rb

Overview

Class connection

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeBinding

Returns a new instance of Binding.



156
157
158
# File 'lib/zkruby/rubyio.rb', line 156

def initialize()
    @event_queue = Queue.new()
end

Instance Attribute Details

#sessionObject (readonly)

Returns the value of attribute session.



147
148
149
# File 'lib/zkruby/rubyio.rb', line 147

def session
  @session
end

Class Method Details

.available?Boolean

Returns:

  • (Boolean)


148
149
150
# File 'lib/zkruby/rubyio.rb', line 148

def self.available?
    true
end

.context {|Thread| ... } ⇒ Object

Yields:

  • (Thread)


152
153
154
# File 'lib/zkruby/rubyio.rb', line 152

def self.context(&context_block)
   yield Thread 
end

Instance Method Details

#close(&callback) ⇒ Object



215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/zkruby/rubyio.rb', line 215

def close(&callback)
    op = AsyncOp.new(self,&callback)
    
    session.synchronize do
            session.close() do |error,response|
                op.resume(error,response)
            end
    end

    op
    
end

#connect(host, port, delay, timeout) ⇒ Object

session callback, IO thread



208
209
210
211
212
# File 'lib/zkruby/rubyio.rb', line 208

def connect(host,port,delay,timeout)
    sleep(delay)
    conn = Connection.new(host,port,timeout,session)
    session.synchronize() { session.prime_connection(conn) }
end

#event_thread?Boolean

Returns:

  • (Boolean)


241
242
243
# File 'lib/zkruby/rubyio.rb', line 241

def event_thread?
    Thread.current.equal?(@event_thread)
end

#invoke(*args) ⇒ Object



245
246
247
# File 'lib/zkruby/rubyio.rb', line 245

def invoke(*args)
    @event_queue.push(args)
end

#pop_event_queueObject



160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/zkruby/rubyio.rb', line 160

def pop_event_queue()
    queued = @event_queue.pop()
    return false  unless queued
    logger.debug { "Invoking #{queued[0]}" }
    callback,*args = queued
    callback.call(*args)
    logger.debug { "Completed #{queued[0]}" }
    return true
rescue Exception => ex
    logger.warn( "Exception in event thread", ex )
    return true
end

#queue_request(*args, &callback) ⇒ Object



228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/zkruby/rubyio.rb', line 228

def queue_request(*args,&callback)

    op = AsyncOp.new(self,&callback)
    
    session.synchronize do 
        session.queue_request(*args) do |error,response|
            op.resume(error,response)                    
        end
    end

    op
end

#start(client, session) ⇒ Object



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/zkruby/rubyio.rb', line 173

def start(client,session)
    @session = session
    @session.extend(MonitorMixin)

    # start the event thread
    @event_thread = Thread.new() do
        Thread.current[:name] = "ZooKeeper::RubyIO::EventLoop"

        # In this thread, the current client is always this client!
        Thread.current[ZooKeeper::CURRENT] = [client]
        loop do
            break unless pop_event_queue() 
        end
    end

    # and the read thread
    Thread.new() do
        begin
            Thread.current[:name] = "ZooKeeper::RubyIO::ReadLoop"
            conn = session.synchronize { session.start(); session.conn() } # will invoke connect 
            loop do
                break unless conn
                conn.read_loop()
                conn =  session.synchronize { session.disconnected(); session.conn() }
            end
            #event of death
            logger.debug("Pushing nil (event of death) to event queue")
            @event_queue.push(nil)
        rescue Exception => ex
            logger.error( "Exception in session thread", ex )
        end
    end
end