Module: Faktory::Job::ClassMethods

Defined in:
lib/faktory/job.rb

Instance Method Summary collapse

Instance Method Details

#client_push(item) ⇒ Object

:nodoc:



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/faktory/job.rb', line 105

def client_push(item) # :nodoc:
  pool = Thread.current[:faktory_via_pool] || get_faktory_options['pool'.freeze] || Faktory.server_pool
  item = get_faktory_options.merge(item)
  # stringify
  item.keys.each do |key|
    item[key.to_s] = item.delete(key)
  end
  item["jid"] ||= SecureRandom.hex(12)
  item["queue"] ||= "default"

  Faktory.client_middleware.invoke(item, pool) do
    pool.with do |c|
      c.push(item)
    end
  end
end

#faktory_class_attribute(*attrs) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/faktory/job.rb', line 122

def faktory_class_attribute(*attrs)
  instance_reader = true
  instance_writer = true

  attrs.each do |name|
    singleton_class.instance_eval do
      undef_method(name) if method_defined?(name) || private_method_defined?(name)
    end
    define_singleton_method(name) { nil }

    ivar = "@#{name}"

    singleton_class.instance_eval do
      m = "#{name}="
      undef_method(m) if method_defined?(m) || private_method_defined?(m)
    end
    define_singleton_method("#{name}=") do |val|
      singleton_class.class_eval do
        undef_method(name) if method_defined?(name) || private_method_defined?(name)
        define_method(name) { val }
      end

      if singleton_class?
        class_eval do
          undef_method(name) if method_defined?(name) || private_method_defined?(name)
          define_method(name) do
            if instance_variable_defined? ivar
              instance_variable_get ivar
            else
              singleton_class.send name
            end
          end
        end
      end
      val
    end

    if instance_reader
      undef_method(name) if method_defined?(name) || private_method_defined?(name)
      define_method(name) do
        if instance_variable_defined?(ivar)
          instance_variable_get ivar
        else
          self.class.public_send name
        end
      end
    end

    if instance_writer
      m = "#{name}="
      undef_method(m) if method_defined?(m) || private_method_defined?(m)
      attr_writer name
    end
  end
end

#faktory_options(opts = {}) ⇒ Object

Allows customization of Faktory features for this type of Job. Legal options:

queue - use a named queue for this Job, default 'default'
retry - enable automatic retry for this Job, *Integer* count, default 25
backtrace - whether to save the error backtrace in the job payload to display in web UI,
   an integer number of lines to save, default *0*


96
97
98
99
# File 'lib/faktory/job.rb', line 96

def faktory_options(opts={})
  # stringify
  self.faktory_options_hash = get_faktory_options.merge(Hash[opts.map{|k, v| [k.to_s, v]}])
end

#get_faktory_optionsObject

:nodoc:



101
102
103
# File 'lib/faktory/job.rb', line 101

def get_faktory_options # :nodoc:
  self.faktory_options_hash ||= Faktory.default_job_options
end

#perform_async(*args) ⇒ Object



70
71
72
# File 'lib/faktory/job.rb', line 70

def perform_async(*args)
  client_push('jobtype'.freeze => self, 'args'.freeze => args)
end

#perform_in(interval, *args) ⇒ Object Also known as: perform_at

interval must be a timestamp, numeric or something that acts

numeric (like an activesupport time interval).


76
77
78
79
80
81
82
83
84
# File 'lib/faktory/job.rb', line 76

def perform_in(interval, *args)
  int = interval.to_f
  now = Time.now.to_f
  ts = (int < 1_000_000_000 ? now + int : int)
  item = { 'jobtype'.freeze => self, 'args'.freeze => args }

  item['at'] = Time.at(ts).utc.to_datetime.rfc3339(9) if ts > now
  client_push(item)
end

#set(options) ⇒ Object



66
67
68
# File 'lib/faktory/job.rb', line 66

def set(options)
  Setter.new(options.merge!('jobtype'.freeze => self))
end