13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
# File 'lib/meeseeker/steem_engine/follower_job.rb', line 13
def perform(options = {})
redis = Meeseeker.redis
last_key_prefix = nil
trx_index = 0
current_block_num = nil
block_transactions = []
stream_transactions(options) do |data, block|
transaction = data[:transaction]
virtual = !!data[:virtual]
begin
trx_id = transaction['transactionId'].to_s.split('-').first
block_num = block['blockNumber']
current_key_prefix = "#{@chain_key_prefix}:#{block_num}:#{trx_id}"
contract = transaction['contract']
action = transaction['action']
if current_key_prefix == last_key_prefix
trx_index += 1
else
if !!last_key_prefix
_, b, t = last_key_prefix.split(':')
transaction_payload = {
block_num: b.to_i,
transaction_id: t,
transaction_num: block_transactions.size
}
block_transactions << trx_id
trx_pub_key = if !!virtual
"#{@chain_key_prefix}:virtual_transaction"
else
"#{@chain_key_prefix}:transaction"
end
redis.publish(trx_pub_key, transaction_payload.to_json)
end
last_key_prefix = "#{@chain_key_prefix}:#{block_num}:#{trx_id}"
trx_index = 0
end
key = "#{current_key_prefix}:#{trx_index}:#{contract}:#{action}"
puts key
end
unless Meeseeker.max_keys == -1
while redis.keys("#{@chain_key_prefix}:*").size > Meeseeker.max_keys
sleep Meeseeker::BLOCK_INTERVAL
end
end
redis.set(key, transaction.to_json)
redis.expire(key, Meeseeker.expire_keys) unless Meeseeker.expire_keys == -1
if current_block_num != block_num
block_transactions = []
block_payload = {
block_num: block_num
}
redis.set(@chain_key_prefix + Meeseeker::LAST_STEEM_ENGINE_BLOCK_NUM_KEY_SUFFIX, block_num)
redis.publish("#{@chain_key_prefix}:block", block_payload.to_json)
current_block_num = block_num
end
redis.publish("#{@chain_key_prefix}:#{contract}", {key: key}.to_json)
redis.publish("#{@chain_key_prefix}:#{contract}:#{action}", {key: key}.to_json)
end
end
|