Class: Jp::Server::Handler

Inherits:
Object
  • Object
show all
Includes:
MongoConnection, Pools
Defined in:
lib/rb/jp/server/handler.rb

Overview

Implementation class.

Does not include Thrift code, etc.

Direct Known Subclasses

Server

Instance Attribute Summary collapse

Attributes included from Pools

#pools

Attributes included from MongoConnection

#database

Instance Method Summary collapse

Methods included from MongoConnection

#connect_to_mongo

Constructor Details

#initialize(options = {}) ⇒ Handler

Returns a new instance of Handler.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/rb/jp/server/handler.rb', line 14

def initialize options = {}
  # Option defaults
  defaults = {
    :mongo_retry_attempts => 10,
    :mongo_retry_delay    => 1,
  }
  options = defaults.merge(options)
  # Copy with/deal with options
  @retry_attempts = options[:mongo_retry_attempts]
  @retry_delay    = options[:mongo_retry_delay]

  load_pools(options)
  connect_to_mongo(options)
end

Instance Attribute Details

#retry_attemptsObject (readonly)

Returns the value of attribute retry_attempts.



13
14
15
# File 'lib/rb/jp/server/handler.rb', line 13

def retry_attempts
  @retry_attempts
end

#retry_delayObject (readonly)

Returns the value of attribute retry_delay.



13
14
15
# File 'lib/rb/jp/server/handler.rb', line 13

def retry_delay
  @retry_delay
end

Instance Method Details

#acquire(pool) ⇒ Object

Raises:



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/rb/jp/server/handler.rb', line 42

def acquire pool
  raise NoSuchPool.new unless pools.member? pool
  now = Time.new.to_i
  doc = {}
  begin
    rescue_connection_failure do
      doc = database[pool].find_and_modify(
        query: {
          'locked' => false
        },
        update: {
          '$set' => {
            'locked'       => now,
            'locked_until' => now + pools[pool][:timeout],
          },
        }
      )
    end
  rescue Mongo::OperationFailure => e
    raise EmptyPool
  end
  job = Job.new
  job.message = doc['message']
  job.id = doc['_id'].to_s
  job
end

#add(pool, message) ⇒ Object

Raises:



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/rb/jp/server/handler.rb', line 29

def add pool, message
  raise NoSuchPool.new unless pools.member? pool

  doc = {
    'message'     => message,
    'locked'       => false,
  }

  rescue_connection_failure do
    database[pool].insert doc
  end
end

#purge(pool, id) ⇒ Object

Raises:



69
70
71
72
73
74
# File 'lib/rb/jp/server/handler.rb', line 69

def purge pool, id
  raise NoSuchPool.new unless pools.member? pool
  rescue_connection_failure do
    database[pool].remove _id: BSON::ObjectId(id)
  end
end

#rescue_connection_failureObject

Ensure retry upon failure Based on code from www.mongodb.org/display/DOCS/Replica+Pairs+in+Ruby



78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/rb/jp/server/handler.rb', line 78

def rescue_connection_failure
  success = false
  retries = 0
  while !success
    begin
      yield
      success = true
    rescue Mongo::ConnectionFailure => ex
      retries += 1
      raise ex if retries >= @retry_attempts
      sleep(@retry_delay)
    end
  end
end