8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
# File 'lib/esse/async_indexing/cli/async_import.rb', line 8
# Enqueues async import jobs for every selected repository.
#
# Repositories are taken from each entry of +indices+: only the one named by
# +@options[:repo]+ when that option is set, otherwise every value of
# +index.repo_hash+. Every selected repository is checked with
# +Esse::AsyncIndexing.async_indexing_repo?+ before the first job is
# enqueued, so an unsupported repository aborts the run without leaving
# batches already enqueued for the repositories that precede it.
#
# Each batch yielded by +repo.batch_ids+ is dispatched either through the
# callable registered under +repo.async_indexing_jobs[:import]+ or, when none
# is registered, by storing the ids in the repository's
# +Esse::RedisStorage::Queue+ and pushing a +WORKER_NAME+ worker.
#
# @raise [Esse::CLI::InvalidOption] when a selected repository does not
#   support async indexing
def run
  validate_options!

  # First pass: resolve and validate only. Nothing is enqueued until every
  # repository of every index has been accepted.
  repos = indices.flat_map do |index|
    selected = if (repo_name = @options[:repo])
      [index.repo(repo_name)]
    else
      index.repo_hash.values
    end

    selected.each do |repo|
      next if Esse::AsyncIndexing.async_indexing_repo?(repo)

      raise Esse::CLI::InvalidOption, <<~MSG
        The #{repo} repository does not support async indexing. Make sure you have the `plugin :async_indexing` in your `#{index}` class and the :#{repo.repo_name} collection implements the `#each_batch_ids` method.
      MSG
    end

    selected
  end

  # Second pass: enqueue one job per batch of ids, in the same order the
  # repositories were resolved.
  repos.each do |repo|
    enqueuer = if (custom_job = repo.async_indexing_jobs[:import])
      # A job registered for :import takes over the dispatch entirely.
      ->(ids) { custom_job.call(service_name, repo, :import, ids, **bulk_options) }
    else
      queue = Esse::RedisStorage::Queue.for(repo: repo)
      lambda do |ids|
        # The ids are stored in the queue; the worker only receives the batch id.
        batch_id = queue.enqueue(values: ids)
        Esse::AsyncIndexing.worker(WORKER_NAME, service: service_name)
          .with_args(
            repo.index.name,
            repo.repo_name,
            batch_id,
            # Option keys are converted to strings before being passed as worker args.
            Esse::HashUtils.deep_transform_keys(bulk_options, &:to_s)
          )
          .push
      end
    end

    repo.batch_ids.each(&enqueuer)
  end
end
|