Module: Octo::Message::MessageParser

Included in:
Message
Defined in:
lib/octocore-mongo/message_parser.rb

Overview

Parses Kafka messages for the Octo consumer.

Instance Method Summary collapse

Instance Method Details

#parse(msg) ⇒ Hash

Parses a raw message into an Octo-compatible hash.

Parameters:

  • msg (String)

    JSON-encoded message

Returns:

  • (Hash)

    Hash in Octo form

Raises:

  • (StandardError)

    when the message has no 'enterprise' entry


13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/octocore-mongo/message_parser.rb', line 13

# Converts a raw JSON message into the symbol-keyed hash used by Octo.
#
# Every result carries :event_name plus the common fields (:id,
# :enterpriseId, :enterpriseName, :phone, :browser, :userId, :created_at).
# Recognised event names additionally contribute their own fields; any
# other event name yields only the common fields.
#
# @param msg [String] JSON document describing a single event
# @return [Hash] the message with symbol keys
# @raise [StandardError] with message 'Parse Error' when the payload is not
#   a JSON object, or when its 'enterprise' entry is missing or is not a
#   JSON object
# @raise [JSON::ParserError] when msg is not valid JSON
def parse(msg)
  msg = JSON.parse(msg)
  # JSON.parse may return an Array or a scalar for a valid document; only
  # an object can be indexed by the string keys used below.
  raise StandardError, 'Parse Error' unless msg.is_a?(Hash)

  enterprise = msg['enterprise']
  # Rejects nil as well as non-object values such as a bare string, which
  # would otherwise fail later with an unrelated NoMethodError.
  raise StandardError, 'Parse Error' unless enterprise.is_a?(Hash)

  m = { event_name: msg['event_name'] }
  case msg['event_name']
  when 'funnel_update'
    m.merge!(rediskey: msg['rediskey'])
  when 'update.profile'
    m.merge!(profileDetails: msg['profileDetails'])
  when 'page.view'
    m.merge!(
      routeUrl:   msg['routeUrl'],
      categories: msg.fetch('categories', []),
      tags:       msg.fetch('tags', [])
    )
  when 'productpage.view'
    m.merge!(
      routeUrl:    msg['routeUrl'],
      categories:  msg.fetch('categories', []),
      tags:        msg.fetch('tags', []),
      productId:   msg['productId'],
      productName: msg['productName'],
      price:       msg['price']
    )
  when 'update.push_token'
    m.merge!(
      pushType:  msg['notificationType'],
      pushKey:   msg['pushKey'],
      pushToken: msg['pushToken']
    )
  end

  # Both snake_case and camelCase spellings are accepted. Selection is by
  # key presence, so an explicit null under the snake_case key still wins
  # over a value under the camelCase key.
  id_key   = %w[custom_id customId].find { |key| enterprise.key?(key) }
  name_key = %w[user_name userName].find { |key| enterprise.key?(key) }

  m.merge!(
    id:             msg['uuid'],
    enterpriseId:   id_key && enterprise[id_key],
    enterpriseName: name_key && enterprise[name_key],
    phone:          msg['phoneDetails'],
    browser:        msg['browserDetails'],
    userId:         msg.fetch('userId', -1),
    created_at:     Time.now
  )
end