5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
# File 'lib/meeseeker/block_follower_job.rb', line 5
def perform(options = {})
chain = (options[:chain] || 'hive').to_sym
url = Meeseeker.default_url(chain)
block_api = Meeseeker.block_api_class(chain).new(url: url)
redis = Meeseeker.redis
last_key_prefix = nil
trx_index = 0
current_block_num = nil
block_transactions = []
chain_key_prefix = chain.to_s if !!options[:chain]
chain_key_prefix ||= Meeseeker.default_chain_key_prefix
stream_operations(options) do |op, trx_id, block_num|
begin
current_key_prefix = "#{chain_key_prefix}:#{block_num}:#{trx_id}"
if current_key_prefix == last_key_prefix
trx_index += 1
else
if !!last_key_prefix
_, b, t = last_key_prefix.split(':')
transaction_payload = {
block_num: b.to_i,
transaction_id: t,
transaction_num: block_transactions.size
}
block_transactions << trx_id unless trx_id == VIRTUAL_TRX_ID
redis.publish("#{chain_key_prefix}:transaction", transaction_payload.to_json)
end
last_key_prefix = "#{chain_key_prefix}:#{block_num}:#{trx_id}"
trx_index = 0
end
op_type = if op.type.end_with? '_operation'
op.type.split('_')[0..-2].join('_')
else
op.type
end
key = "#{current_key_prefix}:#{trx_index}:#{op_type}"
puts key
end
unless Meeseeker.max_keys == -1
while redis.keys("#{chain_key_prefix}:*").size > Meeseeker.max_keys
sleep Meeseeker::BLOCK_INTERVAL
end
end
redis.set(key, op.to_json)
redis.expire(key, Meeseeker.expire_keys) unless Meeseeker.expire_keys == -1
if current_block_num != block_num
block_transactions = []
block_payload = {
block_num: block_num
}
if Meeseeker.
catch :block_header do
block_api.(block_num: block_num) do |result|
if result.nil? || result..nil?
puts "Node returned empty result for block_header on block_num: #{block_num} (rate limiting?). Retrying ..."
sleep Meeseeker::BLOCK_INTERVAL
throw :block_header
end
block_payload.merge!(result..to_h)
end
end
end
redis.set(chain_key_prefix + LAST_BLOCK_NUM_KEY_SUFFIX, block_num)
redis.publish("#{chain_key_prefix}:block", block_payload.to_json)
current_block_num = block_num
end
redis.publish("#{chain_key_prefix}:op:#{op_type}", {key: key}.to_json)
if Meeseeker.publish_op_custom_id
if %w(custom custom_binary custom_json).include? op_type
id = (op["value"]["id"] rescue nil).to_s
if id.size > 0
redis.publish("#{chain_key_prefix}:op:#{op_type}:#{id}", {key: key}.to_json)
end
end
end
end
end
|