Class: Concord::ComputationContext

Inherits:
Object
  • Object
show all
Defined in:
lib/concord.rb

Overview

Transactional wrapper for proxy <=> client interactions

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(computation) ⇒ ComputationContext

Initialize a new ‘ComputationContext`

Parameters:

  • computation (Object)

    The user-defined computation



35
36
37
38
39
40
# File 'lib/concord.rb', line 35

def initialize(computation)
  self.computation = computation
  self.transaction = ::Concord::Thrift::ComputationTx.new
  self.transaction.records = []
  self.transaction.timers = {}
end

Instance Attribute Details

#computationObject

Returns the value of attribute computation.



31
32
33
# File 'lib/concord.rb', line 31

def computation
  @computation
end

#transactionObject

Returns the value of attribute transaction.



31
32
33
# File 'lib/concord.rb', line 31

def transaction
  @transaction
end

Instance Method Details

#get_state(key) ⇒ String

Retrieve a binary blob stored in the proxy state

Parameters:

  • key (String)

    Key to fetch from data store

Returns:

  • (String)

    Binary blob of data



67
68
69
# File 'lib/concord.rb', line 67

def get_state(key)
  computation.get_state(key)
end

#produce_record(stream, key, value) ⇒ Object

Produce a record on ‘stream` with `key` and `value`

Parameters:

  • stream (String)

    Globally unique name of string

  • key (String)

    Key to group by on stream (pending grouping)

  • value (String)

    Binary blob to pass downstream with tuple



46
47
48
49
50
51
52
53
# File 'lib/concord.rb', line 46

def produce_record(stream, key, value)
  r = ::Concord::Thrift::Record.new
  r.meta = ::Concord::Thrift::RecordMetadata.new
  r.key = key
  r.data = value
  r.userStream = stream
  transaction.records.push(r)
end

#set_state(key, value) ⇒ Object

Store a binary blob, identified by a key, in the proxy state

Parameters:

  • key (String)

    Key to set in data store

  • value (String)

    Binary blob



74
75
76
# File 'lib/concord.rb', line 74

def set_state(key, value)
  computation.set_state(key, value)
end

#set_timer(key, time) ⇒ Object

Set a timer to trigger a callback in the future the ‘process_timer` callback to identify the specific callback.

Parameters:

  • key (String)

    Name of the timer. This parameter will be passed to

  • time (FixNum)

    Integer representing the time (in milliseconds) at which the callback should be triggered.



60
61
62
# File 'lib/concord.rb', line 60

def set_timer(key, time)
  transaction.timers[key] = time
end