Class: Hermann::Provider::RDKafka::Producer
- Inherits:
-
Object
- Object
- Hermann::Provider::RDKafka::Producer
- Defined in:
- ext/hermann/hermann_rdkafka.c
Instance Method Summary collapse
-
#connect(timeout) ⇒ Object
Producer.connect.
-
#connected? ⇒ Boolean
Producer.connected?.
-
#errored? ⇒ Boolean
Producer.errored?.
-
#initialize(brokers) ⇒ Object
constructor
producer_initialize.
-
#initialize_copy(orig) ⇒ Object
producer_init_copy.
-
#metadata(topicStr, timeout) ⇒ Object
producer_metadata.
-
#push_single(message, topic, partition_key, result) ⇒ Object
producer_push_single.
-
#tick(timeout) ⇒ Object
producer_tick.
Constructor Details
#initialize(brokers) ⇒ Object
producer_initialize
Set up the configuration context for the Producer instance
1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 |
# File 'ext/hermann/hermann_rdkafka.c', line 1167
/*
 * producer_initialize
 *
 * Set up the configuration context for a newly allocated Producer
 * instance.  Stores the broker list and resets the run-state flags;
 * the actual librdkafka handle is created lazily by
 * producer_init_kafka on first use.
 *
 * @param self    VALUE the Producer instance (wraps HermannInstanceConfig)
 * @param brokers VALUE a Ruby String of comma-separated broker host:port pairs
 * @return self
 */
static VALUE producer_initialize(VALUE self,
								 VALUE brokers) {

	HermannInstanceConfig *producerConfig;

	TRACER("initialize Producer ruby object\n");

	Data_Get_Struct(self, HermannInstanceConfig, producerConfig);

	/* NOTE(review): this keeps a pointer into the Ruby String's internal
	 * buffer without copying it; the pointer dangles if the String is
	 * freed or relocated by the GC -- TODO confirm the brokers object
	 * outlives this config (or strdup here and free in producer_free). */
	producerConfig->brokers = StringValuePtr(brokers);

	/* RD_KAFKA_PARTITION_UA asks librdkafka to invoke the partitioner
	 * callback to determine the target partition for each message. */
	producerConfig->partition = RD_KAFKA_PARTITION_UA;
	producerConfig->run = 1;
	producerConfig->exit_eof = 0;
	producerConfig->quiet = 0;

	return self;
}
|
Instance Method Details
#connect(timeout) ⇒ Object
Producer.connect
764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 |
# File 'ext/hermann/hermann_rdkafka.c', line 764
/*
 * Producer.connect
 *
 * Issue a broker-only metadata request to confirm the cluster is
 * reachable.  On success marks the config connected and returns Qtrue;
 * on failure records the librdkafka error code and returns Qfalse.
 *
 * @param self    VALUE the Producer instance
 * @param timeout VALUE Ruby Fixnum, metadata request timeout in ms
 */
static VALUE producer_connect(VALUE self, VALUE timeout) {
	HermannInstanceConfig *conf;
	hermann_metadata_ctx_t ctx;
	rd_kafka_resp_err_t err;
	VALUE connected = Qfalse;

	Data_Get_Struct(self, HermannInstanceConfig, conf);

	if (!conf->isInitialized) {
		producer_init_kafka(self, conf);
	}

	/* topic == NULL: brokers-only metadata probe */
	ctx.rk = conf->rk;
	ctx.topic = NULL;
	ctx.data = NULL;
	ctx.timeout_ms = rb_num2int(timeout);

	err = producer_metadata_request(&ctx);
	TRACER("err: %s (%i)\n", rd_kafka_err2str(err), err);

	if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
		TRACER("brokers: %i, topics: %i\n",
			ctx.data->broker_cnt,
			ctx.data->topic_cnt);
		conf->isConnected = 1;
		connected = Qtrue;
	}
	else {
		conf->isErrored = err;
	}

	if (ctx.data) {
		rd_kafka_metadata_destroy(ctx.data);
	}

	return connected;
}
|
#connected? ⇒ Boolean
Producer.connected?
908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 |
# File 'ext/hermann/hermann_rdkafka.c', line 908
/*
 * Producer.connected?
 *
 * True only when both lazy initialization and the connect-time
 * metadata handshake have completed.
 */
static VALUE producer_is_connected(VALUE self) {
	HermannInstanceConfig *conf;

	Data_Get_Struct(self, HermannInstanceConfig, conf);

	return (conf->isInitialized && conf->isConnected) ? Qtrue : Qfalse;
}
|
#errored? ⇒ Boolean
Producer.errored?
924 925 926 927 928 929 930 931 932 933 934 |
# File 'ext/hermann/hermann_rdkafka.c', line 924
/*
 * Producer.errored?
 *
 * Reports whether a librdkafka error code has been recorded on this
 * producer's config (isErrored is set from failed metadata requests
 * and delivery callbacks).
 */
static VALUE producer_is_errored(VALUE self) {
	HermannInstanceConfig *conf;

	Data_Get_Struct(self, HermannInstanceConfig, conf);

	return conf->isErrored ? Qtrue : Qfalse;
}
|
#initialize_copy(orig) ⇒ Object
producer_init_copy
Copy the configuration information from orig into copy for the given Producer instances.
1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 |
# File 'ext/hermann/hermann_rdkafka.c', line 1199
/*
 * producer_init_copy
 *
 * Ruby initialize_copy hook: copy the configuration struct from orig
 * into copy so dup/clone of a Producer works.
 *
 * @param copy VALUE destination Producer instance
 * @param orig VALUE source Producer instance
 * @raise TypeError if orig does not wrap a Producer struct
 * @return copy
 */
static VALUE producer_init_copy(VALUE copy,
								VALUE orig) {
	HermannInstanceConfig *src;
	HermannInstanceConfig *dst;

	/* Copying onto itself is a no-op. */
	if (orig == copy) {
		return copy;
	}

	/* Reject anything that is not a wrapped Producer config. */
	if (TYPE(orig) != T_DATA || RDATA(orig)->dfree != (RUBY_DATA_FUNC)producer_free) {
		rb_raise(rb_eTypeError, "wrong argument type");
	}

	Data_Get_Struct(orig, HermannInstanceConfig, src);
	Data_Get_Struct(copy, HermannInstanceConfig, dst);

	/* Shallow-copy the entire configuration struct in one shot. */
	MEMCPY(dst, src, HermannInstanceConfig, 1);

	return copy;
}
|
#metadata(topicStr, timeout) ⇒ Object
producer_metadata
make a metadata request to the kafka server, returning a hash containing a list of brokers and topics.
873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 |
# File 'ext/hermann/hermann_rdkafka.c', line 873
/*
 * producer_metadata
 *
 * Make a metadata request to the Kafka cluster and return a Ruby Hash
 * describing brokers and topics.
 *
 * @param self     VALUE the Producer instance
 * @param topicStr VALUE Ruby String naming a single topic to query, or
 *                 Qnil to request metadata for all topics
 * @param timeout  VALUE Ruby Fixnum, request timeout in ms
 * @raise RuntimeError when the metadata request fails
 * @return Hash of brokers/topics built by producer_metadata_make_hash
 */
static VALUE producer_metadata(VALUE self, VALUE topicStr, VALUE timeout) {
	HermannInstanceConfig *producerConfig;
	rd_kafka_resp_err_t err;
	hermann_metadata_ctx_t md_context;
	VALUE result;

	Data_Get_Struct(self, HermannInstanceConfig, producerConfig);

	if (!producerConfig->isInitialized) {
		producer_init_kafka(self, producerConfig);
	}

	md_context.rk = producerConfig->rk;
	md_context.timeout_ms = rb_num2int(timeout);

	if ( !NIL_P(topicStr) ) {
		Check_Type(topicStr, T_STRING);
		md_context.topic = rd_kafka_topic_new(producerConfig->rk, StringValuePtr(topicStr), NULL);
	} else {
		md_context.topic = NULL;
	}

	err = producer_metadata_request(&md_context);

	/* The topic handle is only needed for the request itself; release it
	 * on every path (the original leaked it).  Assumes
	 * producer_metadata_request does not take ownership -- TODO confirm. */
	if ( NULL != md_context.topic ) {
		rd_kafka_topic_destroy(md_context.topic);
	}

	if ( err != RD_KAFKA_RESP_ERR_NO_ERROR ) {
		// annoyingly, this is always a timeout error -- the rest rdkafka just jams onto STDERR
		rb_raise( rb_eRuntimeError, "%s", rd_kafka_err2str(err) );
	}

	result = producer_metadata_make_hash(md_context.data);
	rd_kafka_metadata_destroy(md_context.data);
	return result;
}
|
#push_single(message, topic, partition_key, result) ⇒ Object
producer_push_single
Push a single message onto the Kafka topic; the optional `result` object is updated when the push completes.
607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 |
# File 'ext/hermann/hermann_rdkafka.c', line 607
/*
 * producer_push_single
 *
 * Enqueue a single message for asynchronous delivery to the given topic.
 * The optional `result` object is handed to librdkafka as the delivery
 * opaque and surfaces back through the delivery-report callback.
 *
 * @param self          VALUE the Producer instance
 * @param message       VALUE Ruby String payload
 * @param topic         VALUE Ruby String topic name (must be non-empty)
 * @param partition_key VALUE key passed to the partitioner callback
 *                      (NOTE(review): assumed to be a Ruby String --
 *                      RSTRING_PTR on anything else is invalid; confirm
 *                      callers always pass a String)
 * @param result        VALUE optional result object, or Qnil
 * @raise ArgumentError when topic is nil or empty
 * @raise RuntimeError when the topic handle cannot be created
 * @return self
 */
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE partition_key, VALUE result) {

	HermannInstanceConfig *producerConfig;
	/* Context pointer, pointing to `result`, for the librdkafka delivery
	 * callback
	 */
	hermann_push_ctx_t *delivery_ctx;
	rd_kafka_topic_t *rkt = NULL;
	rd_kafka_topic_conf_t *rkt_conf = NULL;

	TRACER("self: %p, message: %p, result: %p)\n", self, message, result);

	Data_Get_Struct(self, HermannInstanceConfig, producerConfig);

	TRACER("producerConfig: %p\n", producerConfig);

	/* Validate before allocating anything so rb_raise cannot leak. */
	if ((Qnil == topic) ||
		(0 == RSTRING_LEN(topic))) {
		rb_raise(rb_eArgError, "Topic cannot be empty");
	}

	if (!producerConfig->isInitialized) {
		producer_init_kafka(self, producerConfig);
	}

	TRACER("kafka initialized\n");

	delivery_ctx = malloc(sizeof *delivery_ctx);
	if (NULL == delivery_ctx) {
		rb_raise(rb_eNoMemError, "Could not allocate delivery context");
	}
	delivery_ctx->producer = producerConfig;
	delivery_ctx->result = (VALUE) NULL;

	/* Topic configuration */
	rkt_conf = rd_kafka_topic_conf_new();

	/* Set the partitioner callback */
	rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, producer_partitioner_callback);

	rkt = rd_kafka_topic_new(producerConfig->rk,
								RSTRING_PTR(topic),
								rkt_conf);

	if (NULL == rkt) {
		/* librdkafka only takes ownership of rkt_conf on success, so
		 * clean up both allocations before raising. */
		rd_kafka_topic_conf_destroy(rkt_conf);
		free(delivery_ctx);
		rb_raise(rb_eRuntimeError, "Could not construct a topic structure");
	}

	/* Only pass result through if it's non-nil */
	if (Qnil != result) {
		delivery_ctx->result = result;
		TRACER("setting result: %p\n", result);
	}

	TRACER("rd_kafka_produce() message of %i bytes\n", RSTRING_LEN(message));

	/* Send/Produce message. */
	if (-1 == rd_kafka_produce(rkt,
						producerConfig->partition,
						RD_KAFKA_MSG_F_COPY,
						RSTRING_PTR(message),
						RSTRING_LEN(message),
						RSTRING_PTR(partition_key),
						RSTRING_LEN(partition_key),
						delivery_ctx)) {
		/* Report the topic we actually produced to (the original printed
		 * producerConfig->rkt, an unrelated handle). */
		fprintf(stderr, "%% Failed to produce to topic %s partition %i: %s\n",
				rd_kafka_topic_name(rkt), producerConfig->partition,
					rd_kafka_err2str(rd_kafka_errno2err(errno)));
		/* The delivery callback never fires for a failed enqueue, so the
		 * context must be freed here or it leaks. */
		free(delivery_ctx);
		/* TODO: raise a Ruby exception here, requires a test though */
	}

	rd_kafka_topic_destroy(rkt);

	TRACER("returning\n");

	return self;
}
|
#tick(timeout) ⇒ Object
producer_tick
This function is responsible for ticking the librdkafka reactor so we can get feedback from the librdkafka threads back into the Ruby environment
@param self VALUE the Ruby producer instance
@param timeout VALUE A Ruby FixNum of how many ms we should wait on librdkafka
694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 |
# File 'ext/hermann/hermann_rdkafka.c', line 694
/*
 * producer_tick
 *
 * Tick the librdkafka reactor so feedback from its background threads
 * (delivery reports, errors) is pumped back into the Ruby environment.
 *
 * @param self    VALUE the Ruby producer instance
 * @param timeout VALUE Ruby Fixnum, how many ms to wait on librdkafka
 * @raise ArgumentError when timeout is nil
 * @raise RuntimeError when called before any message was ever sent
 * @raise StandardError when the config has recorded an error
 * @return Fixnum count of events served by rd_kafka_poll
 */
static VALUE producer_tick(VALUE self, VALUE timeout) {
	hermann_conf_t *conf = NULL;
	long poll_ms = 0;
	int served = 0;

	if (Qnil == timeout) {
		rb_raise(rb_eArgError, "Cannot call `tick` with a nil timeout!\n");
	}
	poll_ms = rb_num2int(timeout);

	Data_Get_Struct(self, hermann_conf_t, conf);

	/*
	 * An uninitialized config means producer_push_single was never
	 * properly called, so there is nothing to tick.
	 */
	if (!conf->isInitialized) {
		rb_raise(rb_eRuntimeError, "Cannot call `tick` without having ever sent a message\n");
	}

	served = rd_kafka_poll(conf->rk, poll_ms);

	if (conf->isErrored) {
		rb_raise(rb_eStandardError, "%s", conf->error);
	}

	return rb_int_new(served);
}
|