Class: ThinkingSphinx::Deltas::ResqueDelta::DeltaJob
- Inherits: Object
- Extended by: Resque::Plugins::LockTimeout
- Defined in: lib/thinking_sphinx/deltas/resque_delta/delta_job.rb
Overview
A simple job class that processes a given index.
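For context, a minimal sketch of enqueuing this job with Resque. The index name 'foo_delta' is purely illustrative, and the require path is assumed from the file layout shown above:

    require 'resque'
    require 'thinking_sphinx/deltas/resque_delta'  # assumed require path

    # Queue a delta rebuild for an illustrative index name.
    Resque.enqueue(ThinkingSphinx::Deltas::ResqueDelta::DeltaJob, 'foo_delta')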
Direct Known Subclasses
Class Method Summary
- .around_perform_lock1(*args) ⇒ Object
  This allows us to have a concurrency-safe version of ts-delayed-delta's duplicates_exist.
- .lock_failed(*args) ⇒ Object
  Try again later if the lock is in use.
- .perform(index) ⇒ Object
  Runs Sphinx's indexer tool to process the index.
Class Method Details
.around_perform_lock1(*args) ⇒ Object
This allows us to have a concurrency-safe version of ts-delayed-delta's duplicates_exist:
github.com/freelancing-god/ts-delayed-delta/blob/master/lib/thinking_sphinx/deltas/delayed_delta/job.rb#L47
The name of this method ensures that it runs within around_perform_lock.
We've leveraged resque-lock-timeout to ensure that only one DeltaJob is running at a time. This around filter then ensures that only one DeltaJob of each index type can sit in the queue at once: if the queue holds more than one, LREM clears the rest off (see the sketch after the listing below).
# File 'lib/thinking_sphinx/deltas/resque_delta/delta_job.rb', line 69

def self.around_perform_lock1(*args)
  # Remove all other instances of this job (with the same args) from the
  # queue. Uses LREM (http://code.google.com/p/redis/wiki/LremCommand) which
  # takes the form: "LREM key count value" and if count == 0 removes all
  # instances of value from the list.
  redis_job_value = Resque.encode(:class => self.to_s, :args => args)
  Resque.redis.lrem("queue:#{@queue}", 0, redis_job_value)

  # Grab the subset of flag as deleted document ids to work on
  core_index = ThinkingSphinx::Deltas::ResqueDelta::IndexUtils.delta_to_core(*args)
  ThinkingSphinx::Deltas::ResqueDelta::FlagAsDeletedSet.get_subset_for_processing(core_index)

  yield

  # Clear processing set
  ThinkingSphinx::Deltas::ResqueDelta::FlagAsDeletedSet.clear_processing(core_index)
end
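A hedged sketch of the deduplication step in isolation. The payload shape is taken from the call above; the queue name 'ts_delta' and the index argument are assumptions for illustration only:

    # Illustrative only: the payload Resque stores for this job, and the LREM
    # call that strips duplicate queue entries (count 0 removes every match).
    payload = Resque.encode(:class => 'ThinkingSphinx::Deltas::ResqueDelta::DeltaJob',
                            :args  => ['foo_delta'])          # illustrative index name
    Resque.redis.lrem('queue:ts_delta', 0, payload)           # queue name assumed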
.lock_failed(*args) ⇒ Object
Try again later if the lock is in use.
# File 'lib/thinking_sphinx/deltas/resque_delta/delta_job.rb', line 47

def self.lock_failed(*args)
  Resque.enqueue(self, *args)
end
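As a usage note, resque-lock-timeout invokes this callback when a worker cannot acquire the job's lock; the effect is simply a re-enqueue with the same arguments. A minimal sketch, with an illustrative index name:

    # Pushes the job back onto its queue so it is retried later.
    ThinkingSphinx::Deltas::ResqueDelta::DeltaJob.lock_failed('foo_delta')
    # equivalent to:
    # Resque.enqueue(ThinkingSphinx::Deltas::ResqueDelta::DeltaJob, 'foo_delta')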
.perform(index) ⇒ Object
Runs Sphinx’s indexer tool to process the index. Currently assumes Sphinx is running.
# File 'lib/thinking_sphinx/deltas/resque_delta/delta_job.rb', line 16

def self.perform(index)
  return if skip?(index)

  config = ThinkingSphinx::Configuration.instance

  # Delta Index
  output = `#{config.bin_path}#{config.indexer_binary_name} --config #{config.config_file} --rotate #{index}`
  puts output unless ThinkingSphinx.suppress_delta_output?

  # Flag As Deleted
  return unless ThinkingSphinx.sphinx_running?

  index = ThinkingSphinx::Deltas::ResqueDelta::IndexUtils.delta_to_core(index)

  # Get the document ids we've saved
  flag_as_deleted_ids = ThinkingSphinx::Deltas::ResqueDelta::FlagAsDeletedSet.processing_members(index)

  unless flag_as_deleted_ids.empty?
    # Filter out the ids that aren't present in sphinx
    flag_as_deleted_ids = filter_flag_as_deleted_ids(flag_as_deleted_ids, index)

    unless flag_as_deleted_ids.empty?
      # Each hash element should be of the form { id => [1] }
      flag_hash = Hash[*flag_as_deleted_ids.collect {|id| [id, [1]] }.flatten(1)]

      config.client.update(index, ['sphinx_deleted'], flag_hash)
    end
  end
end
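A hedged sketch of a direct call. The index name is illustrative, and the indexer command shown in the comment depends entirely on the local Thinking Sphinx configuration (binary path, config file), so treat it as an example rather than the exact output:

    # Rebuilds the delta index, then flags superseded core documents as deleted.
    ThinkingSphinx::Deltas::ResqueDelta::DeltaJob.perform('foo_delta')
    # roughly shells out to something like (paths illustrative):
    #   indexer --config config/development.sphinx.conf --rotate foo_delta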