Module: Cyclop

Extended by:
Cyclop
Included in:
Cyclop
Defined in:
lib/cyclop.rb,
lib/cyclop/job.rb,
lib/cyclop/action.rb,
lib/cyclop/worker.rb,
lib/cyclop/version.rb

Defined Under Namespace

Classes: Action, ActionQueueClash, DatabaseNotAvailable, Job, NoActionFound, Worker

Constant Summary

VERSION = "0.1.8"

Instance Method Summary

Instance Method Details

#db ⇒ Object

Get the `Mongo::DB` to use.



# File 'lib/cyclop.rb', line 26

def db
  @db
end

#db=(db) ⇒ Object

Set which `Mongo::DB` to use.



# File 'lib/cyclop.rb', line 21

def db=(db)
  @db = db
end
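
The `db` and `db=` accessors are plain module-level state, so the database handle is set once and read back anywhere. A minimal sketch, assuming a stand-in module `CyclopSketch` that only mirrors the two accessors documented here; `FakeDB` is a hypothetical placeholder for a real `Mongo::DB` so the snippet runs without MongoDB:

```ruby
# Stand-in for the Cyclop module; it only mirrors the db accessors above.
module CyclopSketch
  extend self # the module extends itself, so methods are callable on it

  def db=(db)
    @db = db
  end

  def db
    @db
  end
end

# Hypothetical placeholder for a Mongo::DB handle (assumption, not Cyclop API).
FakeDB = Struct.new(:name)

CyclopSketch.db = FakeDB.new("cyclop_test")
CyclopSketch.db.name # reads back the handle that was just assigned
```

Because nothing is assigned by default, `db` returns `nil` until `db=` has been called.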

#failed(*args) ⇒ Object

Get failed `Cyclop::Job`s.

Parameters:

* (Symbol, String) queues - list of queues to get a `Cyclop::Job` from. Defaults to all.
* (Hash) opts (defaults to: {}) - a customizable set of options.

Options Hash (opts):

* (String) :host - limit to `Cyclop::Job`s queued by this host.
* (Integer) :skip (0) - number of `Cyclop::Job`s to skip.
* (Integer) :limit (nil) - maximum number of `Cyclop::Job`s to return.

Returns an `Array` of failed `Cyclop::Job`s.



# File 'lib/cyclop.rb', line 122

def failed(*args)
  opts = extract_opts! args
  Cyclop::Job.failed({queues: args}.merge opts)
end

#host ⇒ Object

Get the memoized host name.



# File 'lib/cyclop.rb', line 31

def host
  @host ||= Socket.gethostname
end

#master_id ⇒ Object

Get a unique identifier for the current process.



# File 'lib/cyclop.rb', line 36

def master_id
  @master_id ||= "#{host}-#{Process.pid}-#{Thread.current}"
end
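
Both `host` and `master_id` rely on `||=` memoization, which means the identifier is built once and then reused. A minimal sketch, assuming a stand-in module `MasterIdSketch` that copies the two method bodies shown above:

```ruby
require "socket"

# Stand-in module (assumption); method bodies are copied from the source above.
module MasterIdSketch
  extend self

  def host
    @host ||= Socket.gethostname
  end

  def master_id
    @master_id ||= "#{host}-#{Process.pid}-#{Thread.current}"
  end
end

# First call builds the string from the calling (main) thread.
main_id = MasterIdSketch.master_id

# A later call from another thread returns the already memoized string,
# so the thread part still refers to the thread that called first.
other_id = Thread.new { MasterIdSketch.master_id }.value
```

One consequence worth noting: since the value is cached on the module, the `Thread.current` part identifies whichever thread asked first, not every thread that asks later.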

#next(*args) ⇒ Object

Get a `Cyclop::Job` to process.

Parameters:

* (Symbol, String) queues - list of queues to get a `Cyclop::Job` from. Defaults to all.
* (Hash) opts (defaults to: {}) - a customizable set of options.

Options Hash (opts):

* (String) :host - limit to `Cyclop::Job`s queued by this host.

Returns a `Cyclop::Job`, or `nil` if there is nothing to process.



# File 'lib/cyclop.rb', line 102

def next(*args)
  opts = extract_opts! args
  Cyclop::Job.next({queues: args, locked_by: master_id}.merge opts)
end
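
The `*args` signature accepts queue names followed by an optional trailing options hash. The `extract_opts!` helper is not shown on this page, so the version below is a hypothetical reconstruction; the sketch only illustrates how the arguments could be split and merged, with a fixed string standing in for `master_id`:

```ruby
# Stand-in module (assumption); it does not talk to Cyclop::Job.
module NextSketch
  extend self

  # Hypothetical helper: pops a trailing Hash off args, or returns {}.
  def extract_opts!(args)
    args.last.is_a?(Hash) ? args.pop : {}
  end

  # Builds the hash that #next would hand to Cyclop::Job.next.
  def selector(*args)
    opts = extract_opts! args
    {queues: args, locked_by: "example-master-id"}.merge opts
  end
end

# Two queue names plus a :host filter.
NextSketch.selector(:email, :cache, host: "app1.example.com")

# No arguments: the queue list is empty, which the docs describe as "all".
NextSketch.selector
```

Since the caller's options are merged last, a key passed in `opts` takes precedence over the defaults built by the method.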

#push(opts = {}) ⇒ Object

Queues a new job

Minimum usage:

  Cyclop.push queue: "refresh_cache"

With `:job_params`:

  # with an `Array`
  Cyclop.push queue: "email", job_params: ["1", :welcome]

  # with a `Hash`
  Cyclop.push queue: "email", job_params: {user_id: "1",
                                           type: "welcome"}

With `:delay`:

  # Will not perform the task before a delay of 60 seconds
  Cyclop.push queue: "email", delay: 60

With `:retries` and `:splay`:

  # Will mark the task as failed only after 3 retries
  Cyclop.push queue: "email", retries: 3

  # Will mark the task as failed only after 2 retries
  # and 30 seconds between each retry
  Cyclop.push queue: "email", retries: 2, splay: 30

Parameters:

* (Hash) opts (defaults to: {}) - a customizable set of options. The minimum required is :queue.

Options Hash (opts):

* (Symbol, String) :queue - name of the queue (required).
* (Array, Hash) :job_params (nil) - parameters to send to the `Cyclop::Job#perform` method.
* (Integer) :delay (0) - time to wait in `seconds` before the task should be performed.
* (Integer) :retries (0) - number of retries before the `Cyclop::Job` is marked as failed.
* (Integer) :splay (60) - time to wait in `seconds` between retries.
* (String) :host (Cyclop.host) - host under which the `Cyclop::Job` should be added.

Returns a `Cyclop::Job`.



# File 'lib/cyclop.rb', line 84

def push(opts={})
  opts["created_by"] = opts.delete :host
  Cyclop::Job.create opts
end
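
The first line of `#push` renames the `:host` option to a `"created_by"` string key before the hash reaches `Cyclop::Job.create`. A minimal sketch of that step alone, assuming a stand-in `PushSketch.normalize` method (hypothetical name) and copying the hash first so the caller's hash is left untouched, which the original method does not do:

```ruby
# Stand-in module (assumption); it never calls Cyclop::Job.create.
module PushSketch
  extend self

  # Mirrors the first line of #push: move :host to the "created_by" key.
  def normalize(opts = {})
    opts = opts.dup # sketch-only: avoid mutating the caller's hash
    opts["created_by"] = opts.delete :host
    opts
  end
end

# With an explicit host, the value moves to "created_by".
PushSketch.normalize(queue: "email", host: "app1.example.com")

# Without :host, "created_by" is present but nil in this step.
PushSketch.normalize(queue: "email")
```

In the method body shown above, `"created_by"` is `nil` when `:host` is omitted, so the documented `Cyclop.host` default would have to be applied elsewhere, presumably inside `Cyclop::Job`, which is not visible on this page.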