Class: Hermann::Provider::RDKafka::Consumer

Inherits:
Object
  • Object
show all
Defined in:
ext/hermann/hermann_rdkafka.c

Instance Method Summary collapse

Constructor Details

#initialize(topic, brokers, partition, offset) ⇒ Object

consumer_initialize

TODO: the brokers are currently configured through the passed parameter; later they should be discovered through ZooKeeper.

Set up the Consumer’s HermannInstanceConfig context.

Parameters:

  • self

    VALUE the Ruby instance of the Consumer

  • topic

    VALUE a Ruby string

  • brokers

    VALUE a Ruby string containing list of host:port

  • partition

    VALUE a Ruby number

  • offset

    VALUE a Ruby number



1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
# File 'ext/hermann/hermann_rdkafka.c', line 1021

/**
 * consumer_initialize
 *
 * Fill in the HermannInstanceConfig wrapped by this Consumer instance.
 *
 * The topic and broker strings are duplicated with strdup(), so the config
 * holds its own copies rather than pointers into the Ruby strings.
 *
 * @param   self        VALUE   the Ruby instance of the Consumer
 * @param   topic       VALUE   a Ruby string
 * @param   brokers     VALUE   a Ruby string containing list of host:port
 * @param   partition   VALUE   a Ruby number (read with FIX2INT)
 * @param   offset      VALUE   a Ruby Integer, or the symbol :start / :end;
 *                              anything else selects RD_KAFKA_OFFSET_END
 *
 * @return  VALUE   self
 */
static VALUE consumer_initialize(VALUE self,
								 VALUE topic,
								 VALUE brokers,
								 VALUE partition,
								 VALUE offset) {

	HermannInstanceConfig* consumerConfig;
	char* topicPtr;
	char* brokersPtr;
	int partitionNo;

	TRACER("initing consumer ruby object\n");

	topicPtr = StringValuePtr(topic);
	brokersPtr = StringValuePtr(brokers);
	partitionNo = FIX2INT(partition);
	Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);

	consumerConfig->topic = strdup(topicPtr);
	consumerConfig->brokers = strdup(brokersPtr);
	consumerConfig->partition = partitionNo;
	consumerConfig->run = 1;
	consumerConfig->exit_eof = 0;
	consumerConfig->quiet = 0;

	/* Assign the default first so start_offset is always set: previously a
	 * Symbol other than :start or :end matched no branch and left the field
	 * holding whatever value it had before this call. */
	consumerConfig->start_offset = RD_KAFKA_OFFSET_END;

	if ( FIXNUM_P(offset) ) {
		consumerConfig->start_offset = FIX2LONG(offset);
	} else if ( SYMBOL_P(offset) &&
				offset == ID2SYM(rb_intern("start")) ) {
		consumerConfig->start_offset = RD_KAFKA_OFFSET_BEGINNING;
	}
	/* :end, unrecognised symbols and every other type keep the default. */

	return self;
}

Instance Method Details

#consume(topic) ⇒ Object

Hermann::Provider::RDKafka::Consumer.consume

Parameters:

  • self

    VALUE the Ruby object for this consumer

  • topic

    VALUE the Ruby string representing a topic to consume



473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
# File 'ext/hermann/hermann_rdkafka.c', line 473

/**
 * Hermann::Provider::RDKafka::Consumer.consume
 *
 * Start consuming from the topic/partition/offset stored in this consumer's
 * HermannInstanceConfig and run the consume loop. The loop is wrapped in
 * rb_ensure() so consumer_consume_loop_stop is invoked when it finishes,
 * including when it is left through a Ruby exception.
 *
 * @param   self    VALUE   the Ruby object for this consumer
 * @param   topic   VALUE   the Ruby string representing a topic to consume;
 *                          not read here, the topic stored in the config is
 *                          the one that is used
 *
 * @return  VALUE   the result of rb_ensure() around consumer_consume_loop
 *
 * Raises RuntimeError when the configured topic is NULL or empty, or when
 * rd_kafka_consume_start() fails.
 */
static VALUE consumer_consume(VALUE self, VALUE topic) {

	HermannInstanceConfig* consumerConfig;
	const char* startError;

	(void)topic; /* unused, see above */

	TRACER("starting consume\n");

	Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);

	if ((NULL == consumerConfig->topic) ||
		(0 == strlen(consumerConfig->topic))) {
		fprintf(stderr, "Topic is null!\n");
		/* rb_raise() does not return */
		rb_raise(rb_eRuntimeError, "Topic cannot be empty");
	}

	if (!consumerConfig->isInitialized) {
		consumer_init_kafka(consumerConfig);
	}

	/* Start consuming */
	if (rd_kafka_consume_start(consumerConfig->rkt, consumerConfig->partition, consumerConfig->start_offset) == -1) {
		/* Translate errno once, right after the failing call: fprintf() may
		 * itself modify errno, which previously could make the raised
		 * message differ from the logged one. */
		startError = rd_kafka_err2str(rd_kafka_errno2err(errno));
		fprintf(stderr, "%% Failed to start consuming: %s\n", startError);
		rb_raise(rb_eRuntimeError, "%s", startError);
	}

	return rb_ensure(consumer_consume_loop, self, consumer_consume_loop_stop, self);
}

#initialize_copy(orig) ⇒ Object

consumer_init_copy

When copying into a new instance of a Consumer, reproduce the configuration info.

Parameters:

  • copy

    VALUE the Ruby Consumer instance (with configuration) as destination

  • orig

    VALUE the Ruby Consumer instance (with configuration) as source



1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
# File 'ext/hermann/hermann_rdkafka.c', line 1069

/**
 * consumer_init_copy
 *
 * When copying into a new instance of a Consumer, reproduce the
 * configuration info.
 *
 * The struct is copied as a whole, after which the copy is given its own
 * duplicates of the topic and brokers strings so that the two instances do
 * not share those heap allocations.
 *
 * @param   copy    VALUE   the Ruby Consumer instance (with configuration) as destination
 * @param   orig    VALUE   the Ruby Consumer instance (with configuration) as source
 *
 * @return  VALUE   copy
 *
 * Raises TypeError when orig is not a Consumer data object, and NoMemoryError
 * when the strings cannot be duplicated.
 */
static VALUE consumer_init_copy(VALUE copy,
								VALUE orig) {
	HermannInstanceConfig* orig_config;
	HermannInstanceConfig* copy_config;
	char* topic_dup = NULL;
	char* brokers_dup = NULL;

	if (copy == orig) {
		return copy;
	}

	if (TYPE(orig) != T_DATA || RDATA(orig)->dfree != (RUBY_DATA_FUNC)consumer_free) {
		rb_raise(rb_eTypeError, "wrong argument type");
	}

	Data_Get_Struct(orig, HermannInstanceConfig, orig_config);
	Data_Get_Struct(copy, HermannInstanceConfig, copy_config);

	/* Duplicate the owned strings before touching the destination, so a
	 * failed allocation leaves copy_config unmodified.
	 * NOTE(review): assumes topic/brokers are NULL (not garbage) on an
	 * instance that was allocated but never initialized -- confirm against
	 * the allocator. */
	if (orig_config->topic != NULL) {
		topic_dup = strdup(orig_config->topic);
	}
	if (orig_config->brokers != NULL) {
		brokers_dup = strdup(orig_config->brokers);
	}
	if ((orig_config->topic != NULL && topic_dup == NULL) ||
		(orig_config->brokers != NULL && brokers_dup == NULL)) {
		free(topic_dup);
		free(brokers_dup);
		rb_raise(rb_eNoMemError, "failed to duplicate consumer configuration");
	}

	// Copy over the data from one struct to the other
	MEMCPY(copy_config, orig_config, HermannInstanceConfig, 1);

	// Replace the aliased string pointers with the copy's own duplicates
	copy_config->topic = topic_dup;
	copy_config->brokers = brokers_dup;

	/* NOTE(review): any librdkafka handles held in the struct (e.g. rkt) are
	 * still shared with orig after the MEMCPY; whether that is safe depends
	 * on consumer_free and consumer_init_kafka -- verify. */

	return copy;
}