Class: Hermann::Provider::RDKafka::Consumer
- Inherits:
-
Object
- Object
- Hermann::Provider::RDKafka::Consumer
- Defined in:
- ext/hermann/hermann_rdkafka.c
Instance Method Summary collapse
-
#consume(topic) ⇒ Object
Hermann::Provider::RDKafka::Consumer.consume.
-
#initialize(topic, brokers, partition, offset) ⇒ Object
constructor
consumer_initialize.
-
#initialize_copy(orig) ⇒ Object
consumer_init_copy.
Constructor Details
#initialize(topic, brokers, partition, offset) ⇒ Object
consumer_initialize
todo: configure the brokers through passed parameter, later through zk
Set up the Consumer’s HermannInstanceConfig context.
1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 |
# File 'ext/hermann/hermann_rdkafka.c', line 1021
static VALUE consumer_initialize(VALUE self,
VALUE topic,
VALUE brokers,
VALUE partition,
VALUE offset) {
HermannInstanceConfig* consumerConfig;
char* topicPtr;
char* brokersPtr;
int partitionNo;
TRACER("initing consumer ruby object\n");
topicPtr = StringValuePtr(topic);
brokersPtr = StringValuePtr(brokers);
partitionNo = FIX2INT(partition);
Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);
consumerConfig->topic = strdup(topicPtr);
consumerConfig->brokers = strdup(brokersPtr);
consumerConfig->partition = partitionNo;
consumerConfig->run = 1;
consumerConfig->exit_eof = 0;
consumerConfig->quiet = 0;
if ( FIXNUM_P(offset) ) {
consumerConfig->start_offset = FIX2LONG(offset);
} else if ( SYMBOL_P(offset) ) {
if ( offset == ID2SYM(rb_intern("start")) )
consumerConfig->start_offset = RD_KAFKA_OFFSET_BEGINNING;
else if ( offset == ID2SYM(rb_intern("end")) )
consumerConfig->start_offset = RD_KAFKA_OFFSET_END;
} else {
consumerConfig->start_offset = RD_KAFKA_OFFSET_END;
}
return self;
}
|
Instance Method Details
#consume(topic) ⇒ Object
Hermann::Provider::RDKafka::Consumer.consume
473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 |
# File 'ext/hermann/hermann_rdkafka.c', line 473
static VALUE consumer_consume(VALUE self, VALUE topic) {
HermannInstanceConfig* consumerConfig;
TRACER("starting consume\n");
Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);
if ((NULL == consumerConfig->topic) ||
(0 == strlen(consumerConfig->topic))) {
fprintf(stderr, "Topic is null!\n");
rb_raise(rb_eRuntimeError, "Topic cannot be empty");
return self;
}
if (!consumerConfig->isInitialized) {
consumer_init_kafka(consumerConfig);
}
/* Start consuming */
if (rd_kafka_consume_start(consumerConfig->rkt, consumerConfig->partition, consumerConfig->start_offset) == -1) {
fprintf(stderr, "%% Failed to start consuming: %s\n",
rd_kafka_err2str(rd_kafka_errno2err(errno)));
rb_raise(rb_eRuntimeError, "%s",
rd_kafka_err2str(rd_kafka_errno2err(errno)));
return Qnil;
}
return rb_ensure(consumer_consume_loop, self, consumer_consume_loop_stop, self);
}
|
#initialize_copy(orig) ⇒ Object
consumer_init_copy
When copying into a new instance of a Consumer, reproduce the configuration info.
1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 |
# File 'ext/hermann/hermann_rdkafka.c', line 1069
static VALUE consumer_init_copy(VALUE copy,
VALUE orig) {
HermannInstanceConfig* orig_config;
HermannInstanceConfig* copy_config;
if (copy == orig) {
return copy;
}
if (TYPE(orig) != T_DATA || RDATA(orig)->dfree != (RUBY_DATA_FUNC)consumer_free) {
rb_raise(rb_eTypeError, "wrong argument type");
}
Data_Get_Struct(orig, HermannInstanceConfig, orig_config);
Data_Get_Struct(copy, HermannInstanceConfig, copy_config);
// Copy over the data from one struct to the other
MEMCPY(copy_config, orig_config, HermannInstanceConfig, 1);
return copy;
}
|