Class: RFlow::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/rflow/message.rb

Defined Under Namespace

Classes: Data, ProcessingEvent

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(data_type_name, provenance = [], serialization_type = 'avro', schema = nil, serialized_data = nil) ⇒ Message

Returns a new instance of Message.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/rflow/message.rb', line 39

# Builds a message for a registered data type.
#
# data_type_name     - name of the data type; stored in its String form.
# provenance         - Array whose entries are ProcessingEvent objects or
#                      Hash-like records with the keys
#                      'component_instance_uuid', 'started_at',
#                      'completed_at' and 'context'; nil is treated as empty.
# serialization_type - serialization name; its String form selects the
#                      registered schema for the data type.
# schema             - optional schema; when given it must equal the
#                      registered schema.
# serialized_data    - handed unchanged to Data.new.
#
# Raises ArgumentError when no schema is registered for the data type and
# serialization type, or when the passed schema differs from the registered
# one.
def initialize(data_type_name, provenance = [], serialization_type = 'avro', schema = nil, serialized_data = nil)
  @data_type_name = data_type_name.to_s
  serialization_name = serialization_type.to_s

  # TODO: Make this better.  Strictly speaking the lookup is not needed,
  # because the message can be fully deserialized without consulting the
  # registered schema.
  registered_schema = RFlow::Configuration.available_data_types[@data_type_name][serialization_name]
  raise ArgumentError, "Data type '#{@data_type_name}' with serialization_type '#{serialization_type}' not found" unless registered_schema

  # TODO: consider registering unknown schemas automatically instead of
  # rejecting them.
  raise ArgumentError, "Passed schema ('#{schema}') does not equal registered schema ('#{registered_schema}') for data type '#{@data_type_name}' with serialization_type '#{serialization_type}'" if schema && registered_schema != schema

  # Normalize every provenance entry to a ProcessingEvent; entries that
  # already are ProcessingEvents are kept as they are.
  # TODO: do this lazily to avoid building objects that are never used.
  @provenance = (provenance || []).map do |entry|
    next entry if entry.is_a? ProcessingEvent

    ProcessingEvent.new(entry['component_instance_uuid'],
                        entry['started_at'], entry['completed_at'],
                        entry['context'])
  end

  @data = Data.new(registered_schema, serialization_name, serialized_data)

  # Mix each extension registered for this data type into the data object.
  RFlow::Configuration.available_data_extensions[@data_type_name].each do |extension|
    RFlow.logger.debug "Extending '#{data_type_name}' with extension '#{extension}'"
    @data.extend extension
  end
end

Instance Attribute Details

#data ⇒ Object (readonly)

Returns the value of attribute data.



36
37
38
# File 'lib/rflow/message.rb', line 36

# Reader for the data object held in @data.
def data; @data; end

#data_type_name ⇒ Object (readonly)

Returns the value of attribute data_type_name.



36
37
38
# File 'lib/rflow/message.rb', line 36

# Reader for the data type name held in @data_type_name.
def data_type_name; @data_type_name; end

#provenance ⇒ Object

Returns the value of attribute provenance.



37
38
39
# File 'lib/rflow/message.rb', line 37

# Reader for the provenance list held in @provenance.
def provenance; @provenance; end

Class Method Details

.encode(message) ⇒ Object



24
# File 'lib/rflow/message.rb', line 24

# Encodes +message+ through RFlow::Avro.encode using the memoized
# message writer, and returns whatever that call returns.
def encode(message)
  RFlow::Avro.encode(message_writer, message)
end

.from_avro(bytes) ⇒ Object

Take in an Avro serialization of a message and return a new Message object. Assumes the org.rflow.Message Avro schema.



28
29
30
31
32
33
# File 'lib/rflow/message.rb', line 28

# Decodes +bytes+ with the memoized message reader and builds a new Message
# from the decoded record's fields.
def from_avro(bytes)
  decoded = RFlow::Avro.decode(message_reader, bytes)

  # Field names in the same order as Message#initialize's positional arguments.
  field_names = %w[data_type_name provenance data_serialization_type data_schema data]
  Message.new(*field_names.map { |field| decoded[field] })
end

.message_reader ⇒ Object



22
# File 'lib/rflow/message.rb', line 22

# Memoized Avro datum reader; the message schema is passed for both of the
# reader's schema arguments.
def message_reader
  @message_reader ||= ::Avro::IO::DatumReader.new(schema, schema)
end

.message_writer ⇒ Object



23
# File 'lib/rflow/message.rb', line 23

# Memoized Avro datum writer built from the message schema.
def message_writer
  @message_writer ||= ::Avro::IO::DatumWriter.new(schema)
end

.schema ⇒ Object



21
# File 'lib/rflow/message.rb', line 21

# Memoized Avro schema parsed from schema/message.avsc, located two
# directories above this file's directory.
def schema
  @schema ||= begin
    schema_path = File.join(File.dirname(__FILE__), '..', '..', 'schema', 'message.avsc')
    ::Avro::Schema.parse(File.read(schema_path))
  end
end

Instance Method Details

#to_avro ⇒ Object

Serialize the current message object to Avro using the org.rflow.Message Avro schema. Note that we have to manually set the encoding for Ruby 1.9; otherwise the StringIO would use UTF-8 by default, which would not work correctly, as a serialized Avro string is BINARY, not UTF-8.



84
85
86
87
88
89
90
# File 'lib/rflow/message.rb', line 84

# Serializes this message by assembling its fields into a record Hash and
# passing that record to Message.encode.
def to_avro
  record = {
    'data_type_name' => data_type_name.to_s,
    'provenance' => provenance.map(&:to_hash),
    'data_serialization_type' => data.serialization_type.to_s,
    'data_schema' => data.schema_string,
    'data' => data.to_avro
  }
  Message.encode(record)
end