Class: Esse::AsyncIndexing::CLI::AsyncImport

Inherits:
CLI::Index::BaseOperation
  • Object
show all
Defined in:
lib/esse/async_indexing/cli/async_import.rb

Constant Summary collapse

WORKER_NAME =
"Esse::AsyncIndexing::Jobs::ImportBatchIdJob"

Instance Method Summary collapse

Instance Method Details

#run ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/esse/async_indexing/cli/async_import.rb', line 8

# Enqueues one async import job per batch of ids, for every targeted repository.
#
# Options are checked first through `validate_options!`. For each index, the
# repository named by `@options[:repo]` is targeted when that option is set;
# otherwise every repository in `index.repo_hash` is. Each batch yielded by
# `repo.batch_ids` is handed either to the callable registered under
# `repo.async_indexing_jobs[:import]` or, when none is registered, stored in an
# `Esse::RedisStorage::Queue` and pushed to the `WORKER_NAME` worker.
#
# @raise [Esse::CLI::InvalidOption] if `Esse::AsyncIndexing.async_indexing_repo?`
#   is falsy for a targeted repository
# @return [Object] the value returned by `indices.each`
def run
  validate_options!

  indices.each do |index|
    selected = @options[:repo]
    repos = selected ? [index.repo(selected)] : index.repo_hash.values

    repos.each do |repo|
      unless Esse::AsyncIndexing.async_indexing_repo?(repo)
        message = <<~MSG
          The #{repo} repository does not support async indexing. Make sure you have the `plugin :async_indexing` in your `#{index}` class and the :#{repo.repo_name} collection implements the `#each_batch_ids` method.
        MSG
        raise Esse::CLI::InvalidOption, message
      end

      custom_job = repo.async_indexing_jobs[:import]
      enqueue =
        if custom_job
          # A job callable registered on the repository takes precedence.
          lambda { |ids| custom_job.call(service_name, repo, :import, ids, **bulk_options) }
        else
          # The ids go through the queue; the worker receives the returned
          # batch id instead of the ids themselves.
          queue = Esse::RedisStorage::Queue.for(repo: repo)
          lambda do |ids|
            batch_id = queue.enqueue(values: ids)
            job = Esse::AsyncIndexing.worker(WORKER_NAME, service: service_name)
            job = job.with_args(
              repo.index.name,
              repo.repo_name,
              batch_id,
              Esse::HashUtils.deep_transform_keys(bulk_options, &:to_s)
            )
            job.push
          end
        end

      repo.batch_ids.each(&enqueue)
    end
  end
end