Class: GoodDataMarketo::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/gooddata_marketo/models/streams.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(web_method, request, config = {}) ⇒ Stream

Returns a new instance of Stream.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/gooddata_marketo/models/streams.rb', line 12

# Opens a Marketo stream and eagerly downloads every page of it.
#
# Performs the initial call through start_stream, keeps calling
# next_stream_request while @count is above zero, and finally replaces each
# JSON string collected in @storage with its parsed form (symbolized keys).
#
# @param web_method [Object] passed unchanged to config[:client].call
# @param request [Hash] request parameters; copied before every follow-up call
# @param config [Hash] must provide :client, an object responding to
#   call(web_method, request)
# @raise [RuntimeError] when the client returns nothing for a call
# @raise [StandardError] when a follow-up call still fails on its third attempt
def initialize(web_method, request, config = {})
  @timeouts = 0
  start_stream(web_method, request, config)

  # Iterate by offset or stream id depending on the stream response from Marketo.
  # to_i: @count is still nil if start_stream rescued before assigning it.
  next_stream_request(web_method, request) while @count.to_i > 0

  @timer_end = Time.now
  puts "#{@timer_end} => End Streaming: #{web_method}" if GoodDataMarketo.logging
  puts "#{Time.now} => Stream duration: #{((@timer_end - @timer_start)/60).round} minutes." if GoodDataMarketo.logging

  @storage.map!.with_index(1) do |raw_chunk, position|
    puts "#{Time.now} => Hashing streams in storage: #{position}" if GoodDataMarketo.logging
    JSON.parse(raw_chunk, :symbolize_names => true)
  end
end

# Makes the first call of the stream and records how to page through the rest.
#
# Defined next to initialize rather than nested inside it, so the method is
# not re-defined on every instantiation.
#
# @param web_method [Object] passed unchanged to the client
# @param request [Hash] request parameters for the first call
# @param config [Hash] :client is stored in @client
# @raise [RuntimeError] when the client returns nil/false
def start_stream(web_method, request, config)
  @timer_start = Time.now
  puts "#{@timer_start} => Start Streaming:#{web_method}" if GoodDataMarketo.logging

  @client = config[:client]
  @storage = []

  # Make an initial call to determine what type of stream Marketo will respond with.
  # This is based on the configurations in the initial request.
  first_page = @client.call(web_method, request)
  raise "ERROR: No response from Marketo based on query: #{request}" unless first_page

  begin
    @storage << first_page.to_json

    stream_position = first_page[:new_stream_position]
    if stream_position.is_a?(String)
      # Stream-id paging: the next call must send this position back.
      @stream_id = stream_position
      @stream_id_monitor = true
      @offset_monitor = false
    else
      # Offset paging: remember where the next page starts.
      first_page.delete(:start_position)
      @offset_monitor = true
      @stream_id_monitor = false
      @offset = first_page[:new_start_position][:offset]
    end

    @count = first_page[:remaining_count].to_i

    puts "#{Time.now} => Stream:#{web_method}:#{@count} remain." if GoodDataMarketo.logging
  rescue Timeout::Error => e
    client.load.log('TIMEOUT') if client.load
    puts e if GoodDataMarketo.logging
  end
end

# Fetches the next page of the stream and updates the paging state.
#
# @param web_method [Object] passed unchanged to the client
# @param request [Hash] base request; never mutated, a copy is sent
# @raise [RuntimeError] when the client returns nil/false
# @raise [StandardError] the client's own error once three attempts failed
def next_stream_request(web_method, request)
  # Work on a copy so the paging keys never leak into the caller's hash.
  request = {}.merge(request)

  request[:stream_position] = @stream_id if @stream_id
  # Offset paging: resume from the offset Marketo reported for the previous
  # page. Setting it on the copy *after* the call (as before) was lost, so
  # every follow-up call re-requested the first page.
  request[:start_position] = { :offset => @offset } if @offset_monitor && @offset
  request[:batch_size] = @count.to_s if @count < 1000

  # Each request gets its own budget of three attempts.
  @timeouts = 0
  begin
    chunk_from_stream = @client.call(web_method, request)
  rescue StandardError => e
    # Only the remote call is retried; nothing has been stored yet, so a
    # retry cannot duplicate a chunk or double-count @count.
    puts e if GoodDataMarketo.logging
    @timeouts += 1
    retry if @timeouts < 3
    raise
  end
  raise "ERROR: No response from Marketo based on query: #{request}" unless chunk_from_stream

  @storage << chunk_from_stream.to_json

  # FOR GET LEAD ACTIVITIES. The return count and remaining count are not stacked.
  # Request 100 and it says 8 remaining, request a batch of 8 and it says 100 remaining. No stream id.
  if chunk_from_stream[:return_count]
    @count = @count - chunk_from_stream[:return_count].to_i
  else
    @count = chunk_from_stream[:remaining_count].to_i
  end

  puts "#{Time.now} => Stream:#{web_method}:#{@count} remain." if GoodDataMarketo.logging

  stream_position = chunk_from_stream[:new_stream_position]
  if stream_position.is_a?(String)
    @stream_id = stream_position
    @stream_id_monitor = true
    @offset_monitor = false
  else
    @offset_monitor = true
    @stream_id_monitor = false
    @offset = chunk_from_stream[:new_start_position][:offset]
  end
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



10
11
12
# File 'lib/gooddata_marketo/models/streams.rb', line 10

# Reader for the client this stream sends its calls through.
#
# @return [Object] the value of @client, which start_stream assigns from
#   config[:client]
def client
  @client
end

Instance Method Details

#next_stream_request(web_method, request) ⇒ Object

Begin a loop to iterate by offset or stream id depending on the stream response from Marketo.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/gooddata_marketo/models/streams.rb', line 58

# Fetches the next page of the stream and updates the paging state
# (@count, @stream_id / @offset and the two *_monitor flags).
#
# @param web_method [Object] passed unchanged to @client.call
# @param request [Hash] base request; never mutated, a copy is sent
# @raise [RuntimeError] when the client returns nil/false
# @raise [StandardError] the client's own error once three attempts failed
def next_stream_request(web_method, request)
  # Work on a copy so the paging keys never leak into the caller's hash.
  request = {}.merge(request)

  request[:stream_position] = @stream_id if @stream_id
  # Offset paging: resume from the offset Marketo reported for the previous
  # page. Setting it on the copy *after* the call (as before) was lost, so
  # every follow-up call re-requested the same page.
  request[:start_position] = { :offset => @offset } if @offset_monitor && @offset
  request[:batch_size] = @count.to_s if @count < 1000

  # Each request gets its own budget of three attempts; previously the
  # counter was never incremented, so a failing call retried forever.
  @timeouts = 0
  begin
    chunk_from_stream = @client.call(web_method, request)
  rescue StandardError => e
    # Only the remote call is retried; nothing has been stored yet, so a
    # retry cannot duplicate a chunk or double-count @count.
    puts e if GoodDataMarketo.logging
    @timeouts += 1
    retry if @timeouts < 3
    raise
  end
  raise "ERROR: No response from Marketo based on query: #{request}" unless chunk_from_stream

  @storage << chunk_from_stream.to_json

  # FOR GET LEAD ACTIVITIES. The return count and remaining count are not stacked.
  # Request 100 and it says 8 remaining, request a batch of 8 and it says 100 remaining. No stream id.
  if chunk_from_stream[:return_count]
    @count = @count - chunk_from_stream[:return_count].to_i
  else
    @count = chunk_from_stream[:remaining_count].to_i
  end

  puts "#{Time.now} => Stream:#{web_method}:#{@count} remain." if GoodDataMarketo.logging

  stream_position = chunk_from_stream[:new_stream_position]
  if stream_position.is_a?(String)
    @stream_id = stream_position
    @stream_id_monitor = true
    @offset_monitor = false
  else
    @offset_monitor = true
    @stream_id_monitor = false
    @offset = chunk_from_stream[:new_start_position][:offset]
  end
end

#start_stream(web_method, request, config) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/gooddata_marketo/models/streams.rb', line 14

# Makes the first call of the stream and records how the remaining pages
# have to be requested.
#
# Stores the serialized first response in @storage, the remaining record
# count in @count, and either @stream_id (when the response carries a String
# :new_stream_position) or @offset (taken from :new_start_position).
#
# @param web_method [Object] passed unchanged to the client
# @param request [Hash] request parameters for the first call
# @param config [Hash] :client is stored in @client
# @raise [RuntimeError] when the client returns nil/false
def start_stream(web_method, request, config)
  @timer_start = Time.now
  if GoodDataMarketo.logging
    puts "#{@timer_start} => Start Streaming:#{web_method}"
  end

  @client = config[:client]
  @storage = []

  # The shape of this first answer decides between stream-id and offset paging.
  first_page = @client.call(web_method, request)
  unless first_page
    raise "ERROR: No response from Marketo based on query: #{request}"
  end

  begin
    @storage.push(first_page.to_json)

    if first_page[:new_stream_position].is_a?(String)
      @stream_id = first_page[:new_stream_position]
      @stream_id_monitor, @offset_monitor = true, false
    else
      first_page.delete(:start_position)
      @offset_monitor, @stream_id_monitor = true, false
      @offset = first_page[:new_start_position][:offset]
    end

    @count = first_page[:remaining_count].to_i

    if GoodDataMarketo.logging
      puts "#{Time.now} => Stream:#{web_method}:#{@count} remain."
    end
  rescue Timeout::Error => timeout
    client.load.log('TIMEOUT') if client.load
    puts timeout if GoodDataMarketo.logging
  end
end

#storage ⇒ Object



127
128
129
# File 'lib/gooddata_marketo/models/streams.rb', line 127

# Reader for the chunks collected while streaming.
#
# @return [Array] the value of @storage. start_stream resets it to an empty
#   array, every call appends the response serialized with to_json, and
#   initialize finally replaces each entry with
#   JSON.parse(entry, :symbolize_names => true).
def storage
  @storage
end