Class: CouchChanges

Inherits:
Object
  • Object
show all
Defined in:
lib/couchchanges.rb

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ CouchChanges

Returns a new instance of CouchChanges.



6
7
8
9
10
# File 'lib/couchchanges.rb', line 6

def initialize options={}
  @options = options.dup
  @uri = URI.parse(@options.delete(:url) + "/_changes")
  @last_seq = 0
end

Instance Method Details

#callbacks(hash) ⇒ Object



79
80
81
82
83
84
85
86
# File 'lib/couchchanges.rb', line 79

def callbacks hash
  @change.call hash if @change
  if hash["deleted"]
    @delete.call hash if @delete
  else
    @update.call hash if @update
  end
end

#change(&block) ⇒ Object



12
13
14
# File 'lib/couchchanges.rb', line 12

def change &block
  block ? @change = block : @change
end

#delete(&block) ⇒ Object



20
21
22
# File 'lib/couchchanges.rb', line 20

def delete &block
  block ? @delete = block : @delete
end

#disconnect(&block) ⇒ Object



24
25
26
# File 'lib/couchchanges.rb', line 24

def disconnect &block
  block ? @disconnect = block : @disconnect
end

#disconnectedObject



68
69
70
71
72
73
74
75
76
77
# File 'lib/couchchanges.rb', line 68

def disconnected
  if @disconnect
    @disconnect.call @last_seq
  else
    EM.add_timer(@options[:reconnect]) {
      @options[:since] = @last_seq
      listen
    }
  end
end

#handle(line) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/couchchanges.rb', line 54

def handle line
  return if line.chomp.empty?

  hash = JSON.parse(line)
  if hash["last_seq"]
    disconnected
  else
    hash["rev"] = hash.delete("changes")[0]["rev"]
    @last_seq = hash["seq"]

    callbacks hash
  end
end

#http!Object

REFACTOR!



42
43
44
45
46
47
48
49
50
51
52
# File 'lib/couchchanges.rb', line 42

def http!
  options = {
    :timeout => 0,
    :query   => @options.merge({:feed => "continuous"})
  }
  if @uri.user
    options[:head] = {'authorization' => [@uri.user, @uri.password]}
  end
  
  EM::HttpRequest.new(@uri.to_s).get(options)
end

#listenObject



28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/couchchanges.rb', line 28

def listen
  @http = http!
  buffer = ""
  @http.stream  {|chunk|
    buffer += chunk
    while line = buffer.slice!(/.+\r?\n/)
      handle line
    end
  }
  @http.errback { disconnected }
  @http
end

#update(&block) ⇒ Object



16
17
18
# File 'lib/couchchanges.rb', line 16

def update &block
  block ? @update = block : @update
end