Class: SidekiqUniqueJobs::UniqueArgs

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/sidekiq_unique_jobs/unique_args.rb

Overview

This class exists to be testable and the entire api should be considered private rubocop:disable ClassLength

Constant Summary collapse

CLASS_NAME =
'SidekiqUniqueJobs::UniqueArgs'

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(item) ⇒ UniqueArgs

Returns a new instance of UniqueArgs.


20
21
22
23
24
25
26
27
28
# File 'lib/sidekiq_unique_jobs/unique_args.rb', line 20

def initialize(item)
  Sidekiq::Logging.with_context(CLASS_NAME) do
    @item = item
    @worker_class            ||= worker_class_constantize(@item[CLASS_KEY])
    @item[UNIQUE_PREFIX_KEY] ||= unique_prefix
    @item[UNIQUE_ARGS_KEY]     = unique_args(@item[ARGS_KEY])
    @item[UNIQUE_DIGEST_KEY]   = unique_digest
  end
end

Class Method Details

.digest(item) ⇒ Object


16
17
18
# File 'lib/sidekiq_unique_jobs/unique_args.rb', line 16

def self.digest(item)
  new(item).unique_digest
end

Instance Method Details

#digestable_hashObject


44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/sidekiq_unique_jobs/unique_args.rb', line 44

def digestable_hash
  @item.slice(CLASS_KEY, QUEUE_KEY, UNIQUE_ARGS_KEY).tap do |hash|
    if unique_on_all_queues?
      logger.debug { "#{__method__} deleting queue: #{@item[QUEUE_KEY]}" }
      hash.delete(QUEUE_KEY)
    end
    if unique_across_workers?
      logger.debug { "#{__method__} deleting class: #{@item[CLASS_KEY]}" }
      hash.delete(CLASS_KEY)
    end
  end
end

#filter_by_proc(args) ⇒ Object


115
116
117
118
119
120
121
122
123
124
# File 'lib/sidekiq_unique_jobs/unique_args.rb', line 115

def filter_by_proc(args)
  if unique_args_method.nil?
    logger.warn { "#{__method__} : unique_args_method is nil. Returning (#{args})" }
    return args
  end

  filter_args = unique_args_method.call(args)
  logger.debug { "#{__method__} : #{args} -> #{filter_args}" }
  filter_args
end

#filter_by_symbol(args) ⇒ Object


126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/sidekiq_unique_jobs/unique_args.rb', line 126

def filter_by_symbol(args)
  unless @worker_class.respond_to?(unique_args_method)
    logger.warn do
      "#{__method__} : #{@worker_class} does not respond to #{unique_args_method}). Returning (#{args})"
    end
    return args
  end

  filter_args = @worker_class.send(unique_args_method, args)
  logger.debug { "#{__method__} : #{unique_args_method}(#{args}) => #{filter_args}" }
  filter_args
rescue ArgumentError => ex
  logger.fatal "#{__method__} : #{@worker_class}'s #{unique_args_method} needs at least one argument"
  logger.fatal ex
  args
end

#filtered_args(args) ⇒ Object

Filters unique arguments by proc or symbol returns provided arguments for other configurations


99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/sidekiq_unique_jobs/unique_args.rb', line 99

def filtered_args(args)
  return args if args.empty?
  json_args = Normalizer.jsonify(args)
  logger.debug { "#filtered_args #{args} => #{json_args}" }

  case unique_args_method
  when Proc
    filter_by_proc(json_args)
  when Symbol
    filter_by_symbol(json_args)
  else
    logger.debug { "#{__method__} arguments not filtered (using all arguments for uniqueness)" }
    json_args
  end
end

#sidekiq_worker_class?Boolean

Returns:

  • (Boolean)

88
89
90
91
92
93
94
95
# File 'lib/sidekiq_unique_jobs/unique_args.rb', line 88

def sidekiq_worker_class?
  if @worker_class.respond_to?(:get_sidekiq_options)
    true
  else
    logger.debug { "#{__method__} #{@worker_class} does not respond to :get_sidekiq_options" }
    nil
  end
end

#unique_across_workers?Boolean

Returns:

  • (Boolean)

78
79
80
81
# File 'lib/sidekiq_unique_jobs/unique_args.rb', line 78

def unique_across_workers?
  return unless sidekiq_worker_class?
  @item[UNIQUE_ACROSS_WORKERS_KEY] || @worker_class.get_sidekiq_options[UNIQUE_ACROSS_WORKERS_KEY]
end

#unique_args(args) ⇒ Object


57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/sidekiq_unique_jobs/unique_args.rb', line 57

def unique_args(args)
  if unique_args_enabled?
    filtered_args(args)
  else
    logger.debug { "#{__method__} : unique arguments disabled" }
    args
  end
rescue NameError => ex
  logger.error "#{__method__}(#{args}) : failed with (#{ex.message})"
  logger.error ex

  raise if config.raise_unique_args_errors

  args
end

#unique_args_enabled?Boolean

Returns:

  • (Boolean)

83
84
85
86
# File 'lib/sidekiq_unique_jobs/unique_args.rb', line 83

def unique_args_enabled?
  return unless sidekiq_worker_class?
  unique_args_method # && !unique_args_method.is_a?(Boolean)
end

#unique_args_methodObject


143
144
145
146
147
# File 'lib/sidekiq_unique_jobs/unique_args.rb', line 143

def unique_args_method
  @unique_args_method ||= @worker_class.get_sidekiq_options[UNIQUE_ARGS_KEY] if sidekiq_worker_class?
  @unique_args_method ||= :unique_args if @worker_class.respond_to?(:unique_args)
  @unique_args_method ||= Sidekiq.default_worker_options.stringify_keys[UNIQUE_ARGS_KEY]
end

#unique_digestObject


30
31
32
33
34
35
36
37
# File 'lib/sidekiq_unique_jobs/unique_args.rb', line 30

def unique_digest
  @unique_digest ||= begin
    digest = Digest::MD5.hexdigest(Sidekiq.dump_json(digestable_hash))
    digest = "#{unique_prefix}:#{digest}"
    logger.debug { "#{__method__} : #{digestable_hash} into #{digest}" }
    digest
  end
end

#unique_on_all_queues?Boolean

Returns:

  • (Boolean)

73
74
75
76
# File 'lib/sidekiq_unique_jobs/unique_args.rb', line 73

def unique_on_all_queues?
  return unless sidekiq_worker_class?
  @item[UNIQUE_ON_ALL_QUEUES_KEY] || @worker_class.get_sidekiq_options[UNIQUE_ON_ALL_QUEUES_KEY]
end

#unique_prefixObject


39
40
41
42
# File 'lib/sidekiq_unique_jobs/unique_args.rb', line 39

def unique_prefix
  return config.unique_prefix unless sidekiq_worker_class?
  @worker_class.get_sidekiq_options[UNIQUE_PREFIX_KEY] || config.unique_prefix
end