Module: Esse::Plugins::AsyncIndexing::RepositoryClassMethods

Defined in:
lib/esse/plugins/async_indexing.rb

Defined Under Namespace

Classes: AsyncIndexingJobValidator

Constant Summary collapse

DEFAULT_ASYNC_INDEXING_JOBS =
{
  import: ->(service:, repo:, operation:, ids:, **kwargs) {
    unless (ids = Esse::ArrayUtils.wrap(ids)).empty?
      batch_id = Esse::RedisStorage::Queue.for(repo: repo).enqueue(values: ids)
      Esse::AsyncIndexing.worker("Esse::AsyncIndexing::Jobs::ImportBatchIdJob", service: service)
        .with_args(repo.index.name, repo.repo_name, batch_id, Esse::HashUtils.deep_transform_keys(kwargs, &:to_s))
        .push
    end
  },
  index: ->(service:, repo:, operation:, id:, **kwargs) {
    if id
      Esse::AsyncIndexing.worker("Esse::AsyncIndexing::Jobs::DocumentIndexByIdJob", service: service)
        .with_args(repo.index.name, repo.repo_name, id, Esse::HashUtils.deep_transform_keys(kwargs, &:to_s))
        .push
    end
  },
  update: ->(service:, repo:, operation:, id:, **kwargs) {
    if id
      Esse::AsyncIndexing.worker("Esse::AsyncIndexing::Jobs::DocumentUpdateByIdJob", service: service)
        .with_args(repo.index.name, repo.repo_name, id, Esse::HashUtils.deep_transform_keys(kwargs, &:to_s))
        .push
    end
  },
  delete: ->(service:, repo:, operation:, id:, **kwargs) {
    if id
      Esse::AsyncIndexing.worker("Esse::AsyncIndexing::Jobs::DocumentDeleteByIdJob", service: service)
        .with_args(repo.index.name, repo.repo_name, id, Esse::HashUtils.deep_transform_keys(kwargs, &:to_s))
        .push
    end
  }
}.freeze

Instance Method Summary collapse

Instance Method Details

#async_indexing_job(*operations, &block) ⇒ Object

DSL to define custom job enqueueing

async_indexing_job(:import) do |service:, repo:, operation:, ids:, **kwargs|

MyCustomJob.perform_later(repo.index.name, ids, **kwargs)

end async_indexing_job(:index, :update, :delete) do |service:, repo:, operation:, id, **kwargs|

MyCustomJob.perform_later(repo.index.name, [id], **kwargs)

end



72
73
74
75
76
77
78
79
# File 'lib/esse/plugins/async_indexing.rb', line 72

def async_indexing_job(*operations, &block)
  operations = AsyncIndexingJobValidator::OPERATIONS if operations.empty?
  AsyncIndexingJobValidator.call(operations, block)
  hash = operations.each_with_object({}) { |operation, h| h[operation] = block }
  @async_indexing_jobs = async_indexing_jobs.dup.merge(hash)
ensure
  @async_indexing_jobs.freeze
end

#async_indexing_job_for(operation) ⇒ Object



85
86
87
# File 'lib/esse/plugins/async_indexing.rb', line 85

def async_indexing_job_for(operation)
  async_indexing_jobs[operation] || DEFAULT_ASYNC_INDEXING_JOBS[operation] || raise(ArgumentError, "The #{operation} operation is not implemented")
end

#async_indexing_jobsObject



81
82
83
# File 'lib/esse/plugins/async_indexing.rb', line 81

def async_indexing_jobs
  @async_indexing_jobs || {}.freeze
end

#batch_ids(*args, **kwargs, &block) ⇒ Enumerator

This method is used to retrieve only the ids of the documents in the collection. It’s used to asynchronously index the documents. The #each_batch_ids method is optional and should be implemented by the collection class.

Returns:

  • (Enumerator)

    The enumerator



44
45
46
47
48
49
50
51
52
53
54
# File 'lib/esse/plugins/async_indexing.rb', line 44

def batch_ids(*args, **kwargs, &block)
  if implement_batch_ids?
    Enumerator.new do |yielder|
      @collection_proc.new(*args, **kwargs).each_batch_ids do |batch|
        yielder.yield(batch)
      end
    end
  else
    raise NotImplementedError, format("the %<t>p collection does not implement the #each_batch_ids method", t: @collection_proc)
  end
end

#implement_batch_ids?Boolean

Check if the collection class implements the each_batch_ids method

Returns:

  • (Boolean)

    True if the collection class implements the each_batch_ids method

See Also:

  • #each_batch_ids


60
61
62
# File 'lib/esse/plugins/async_indexing.rb', line 60

def implement_batch_ids?
  @collection_proc.is_a?(Class) && @collection_proc.instance_methods.include?(:each_batch_ids)
end