Nginx Kafka Data Producer Interface
Introduction

When producing data to Kafka, I want any arbitrary server to be able to send data to the Kafka cluster, but having to install a language-specific client library on each machine first is tedious. My requirement is only to push large volumes of data into the Kafka cluster; the data is consumed by a separate system. Is there anything that works out of the box, such as an HTTP interface? There are indeed a few projects on GitHub, including the one covered in an earlier post (that one is implemented as a full Kafka REST interface: feature-complete, but complex). The earliest modules had been tested long ago and, for various project reasons, did not fully meet the requirements. Later a project by a Chinese developer appeared, which I tweaked a little to make it fit.
Open-source projects

1. ngx_kafka_module (thanks to brg_liuwei)
2. The variant of the above described in this post
Limitations

ngx_kafka_module can only forward the body of POST requests to the Kafka cluster. Capturing variable values the way nginx does for its access log is not implemented yet; the author is still improving it.
Improvements

For this project, the module first needs to capture the values of the following variables (because of the processing phase at which the kafka module runs inside nginx, not every built-in nginx variable is available):

$remote_addr|$time_local|$request|$http_user_agent|$request_body
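For illustration, a record produced by the variant below would look roughly like this (all values are made up; the exact layout follows the snprintf format strings in ngx_construct_log_prefix):

192.168.1.100|17/Mar/2017:10:23:45 +0800|POST /kafka_topic HTTP/1.1|curl/7.29.0|guol123456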
Variant code

ngx_http_kafka_module.c:
/*
 * nginx kafka module
 *
 * using librdkafka: https://github.com/edenhill/librdkafka
 */
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <ngx_string.h>
#include <stdint.h>
#include <alloca.h>
#include <librdkafka/rdkafka.h>
#include <errno.h>

#define KAFKA_TOPIC_MAXLEN   256
#define KAFKA_BROKER_MAXLEN  512

#define KAFKA_ERR_NO_DATA        "no_message\n"
#define KAFKA_ERR_BODY_TO_LARGE  "body_too_large\n"
#define KAFKA_ERR_PRODUCER       "kafka_producer_error\n"

#define KAFKA_PARTITION_UNSET  0xFFFFFFFF

static ngx_int_t ngx_http_kafka_init_worker(ngx_cycle_t *cycle);
static void ngx_http_kafka_exit_worker(ngx_cycle_t *cycle);

static void *ngx_http_kafka_create_main_conf(ngx_conf_t *cf);
static void *ngx_http_kafka_create_loc_conf(ngx_conf_t *cf);
static char *ngx_http_kafka_merge_loc_conf(ngx_conf_t *cf,
        void *parent, void *child);
static char *ngx_http_set_kafka(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static char *ngx_http_set_kafka_broker_list(ngx_conf_t *cf,
        ngx_command_t *cmd, void *conf);
static char *ngx_http_set_kafka_topic(ngx_conf_t *cf,
        ngx_command_t *cmd, void *conf);
static char *ngx_http_set_kafka_partition(ngx_conf_t *cf,
        ngx_command_t *cmd, void *conf);
static char *ngx_http_set_kafka_broker(ngx_conf_t *cf,
        ngx_command_t *cmd, void *conf);

static ngx_int_t ngx_http_kafka_handler(ngx_http_request_t *r);
static void ngx_http_kafka_post_callback_handler(ngx_http_request_t *r);

typedef enum {
    ngx_str_push = 0,
    ngx_str_pop = 1
} ngx_str_op;

static void ngx_str_helper(ngx_str_t *str, ngx_str_op op);

typedef struct {
    rd_kafka_t       *rk;
    rd_kafka_conf_t  *rkc;
    ngx_array_t      *broker_list;
} ngx_http_kafka_main_conf_t;

static char *ngx_http_kafka_main_conf_broker_add(ngx_http_kafka_main_conf_t *cf,
        ngx_str_t *broker);

typedef struct {
    ngx_str_t   topic;   /* kafka topic */
    ngx_str_t   broker;  /* broker addr (eg: localhost:9092) */

    /* kafka partition(0...N), default value: RD_KAFKA_PARTITION_UA */
    ngx_int_t   partition;

    rd_kafka_topic_t       *rkt;
    rd_kafka_topic_conf_t  *rktc;
} ngx_http_kafka_loc_conf_t;

static ngx_command_t ngx_http_kafka_commands[] = {
    {
        ngx_string("kafka"),
        NGX_HTTP_MAIN_CONF|NGX_CONF_NOARGS,
        ngx_http_set_kafka,
        NGX_HTTP_MAIN_CONF_OFFSET,
        0,
        NULL },
    {
        ngx_string("kafka_broker_list"),
        NGX_HTTP_MAIN_CONF|NGX_CONF_1MORE,
        ngx_http_set_kafka_broker_list,
        NGX_HTTP_MAIN_CONF_OFFSET,
        0,
        NULL },
    {
        ngx_string("kafka_topic"),
        NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
        ngx_http_set_kafka_topic,
        NGX_HTTP_LOC_CONF_OFFSET,
        offsetof(ngx_http_kafka_loc_conf_t, topic),
        NULL },
    {
        ngx_string("kafka_partition"),
        NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
        ngx_http_set_kafka_partition,
        NGX_HTTP_LOC_CONF_OFFSET,
        offsetof(ngx_http_kafka_loc_conf_t, partition),
        NULL },
    {
        ngx_string("kafka_broker"),
        NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
        ngx_http_set_kafka_broker,
        NGX_HTTP_LOC_CONF_OFFSET,
        offsetof(ngx_http_kafka_loc_conf_t, broker),
        NULL },

    ngx_null_command
};

static ngx_http_module_t ngx_http_kafka_module_ctx = {
    NULL,                              /* pre conf */
    NULL,                              /* post conf */

    ngx_http_kafka_create_main_conf,   /* create main conf */
    NULL,                              /* init main conf */

    NULL,                              /* create server conf */
    NULL,                              /* merge server conf */

    ngx_http_kafka_create_loc_conf,    /* create local conf */
    ngx_http_kafka_merge_loc_conf,     /* merge location conf */
};

ngx_module_t ngx_http_kafka_module = {
    NGX_MODULE_V1,
    &ngx_http_kafka_module_ctx,   /* module context */
    ngx_http_kafka_commands,      /* module directives */
    NGX_HTTP_MODULE,              /* module type */
    NULL,                         /* init master */
    NULL,                         /* init module */
    ngx_http_kafka_init_worker,   /* init process */
    NULL,                         /* init thread */
    NULL,                         /* exit thread */
    ngx_http_kafka_exit_worker,   /* exit process */
    NULL,                         /* exit master */
    NGX_MODULE_V1_PADDING
};

char *ngx_http_kafka_main_conf_broker_add(ngx_http_kafka_main_conf_t *cf,
        ngx_str_t *broker)
{
    ngx_str_t *new_broker;

    new_broker = ngx_array_push(cf->broker_list);
    if (new_broker == NULL) {
        return NGX_CONF_ERROR;
    }
    *new_broker = *broker;

    return NGX_OK;
}

void *ngx_http_kafka_create_main_conf(ngx_conf_t *cf)
{
    ngx_http_kafka_main_conf_t *conf;

    conf = ngx_pcalloc(cf->pool, sizeof(ngx_http_kafka_main_conf_t));
    if (conf == NULL) {
        return NGX_CONF_ERROR;
    }

    conf->rk = NULL;
    conf->rkc = NULL;

    conf->broker_list = ngx_array_create(cf->pool, 4, sizeof(ngx_str_t));
    if (conf->broker_list == NULL) {
        return NULL;
    }

    return conf;
}

void *ngx_http_kafka_create_loc_conf(ngx_conf_t *cf)
{
    ngx_http_kafka_loc_conf_t *conf;

    conf = ngx_pcalloc(cf->pool, sizeof(ngx_http_kafka_loc_conf_t));
    if (conf == NULL) {
        return NGX_CONF_ERROR;
    }

    ngx_str_null(&conf->topic);
    ngx_str_null(&conf->broker);

    /*
     * Could not set conf->partition to RD_KAFKA_PARTITION_UA,
     * because both RD_KAFKA_PARTITION_UA and NGX_CONF_UNSET are -1
     */
    conf->partition = KAFKA_PARTITION_UNSET;

    return conf;
}

char *ngx_http_kafka_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
    ngx_http_kafka_loc_conf_t *prev = parent;
    ngx_http_kafka_loc_conf_t *conf = child;

#define ngx_conf_merge_kafka_partition_conf(conf, prev, def)      \
    if (conf == KAFKA_PARTITION_UNSET) {                          \
        conf = (prev == KAFKA_PARTITION_UNSET) ? def : prev;      \
    }

    ngx_conf_merge_kafka_partition_conf(conf->partition, prev->partition,
            RD_KAFKA_PARTITION_UA);

#undef ngx_conf_merge_kafka_partition_conf

    return NGX_CONF_OK;
}

void kafka_callback_handler(rd_kafka_t *rk,
        void *msg, size_t len, int err, void *opaque, void *msg_opaque)
{
    if (err != 0) {
        ngx_log_error(NGX_LOG_ERR,
                (ngx_log_t *)msg_opaque, 0, rd_kafka_err2str(err));
    }
}

char *ngx_http_set_kafka(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    /* we can add more code here to config ngx_http_kafka_main_conf_t */
    return NGX_CONF_OK;
}

char *ngx_http_set_kafka_broker_list(ngx_conf_t *cf,
        ngx_command_t *cmd, void *conf)
{
    char                        *cf_result;
    ngx_uint_t                   i;
    ngx_str_t                   *value;
    ngx_http_kafka_main_conf_t  *main_conf;

    main_conf = conf;
    value = cf->args->elts;

    for (i = 1; i < cf->args->nelts; ++i) {
        cf_result = ngx_http_kafka_main_conf_broker_add(main_conf, &value[i]);
        if (cf_result != NGX_OK) {
            return cf_result;
        }
    }

    return NGX_OK;
}

char *ngx_http_set_kafka_topic(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    char                       *cf_result;
    ngx_http_core_loc_conf_t   *clcf;
    ngx_http_kafka_loc_conf_t  *local_conf;

    /* install ngx_http_kafka_handler */
    clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
    if (clcf == NULL) {
        return NGX_CONF_ERROR;
    }
    clcf->handler = ngx_http_kafka_handler;

    /* ngx_http_kafka_loc_conf_t::topic assignment */
    cf_result = ngx_conf_set_str_slot(cf, cmd, conf);
    if (cf_result != NGX_CONF_OK) {
        return cf_result;
    }

    local_conf = conf;
    local_conf->rktc = rd_kafka_topic_conf_new();

    return NGX_CONF_OK;
}

char *ngx_http_set_kafka_partition(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    char       *p = conf;
    ngx_int_t  *np;
    ngx_str_t  *value;

    np = (ngx_int_t *)(p + cmd->offset);
    if (*np != KAFKA_PARTITION_UNSET) {
        return "is duplicate";
    }

    value = cf->args->elts;
    if (ngx_strncmp("auto", (const char *)value[1].data, value[1].len) == 0) {
        *np = RD_KAFKA_PARTITION_UA;
    } else {
        *np = ngx_atoi(value[1].data, value[1].len);
        if (*np == NGX_ERROR) {
            return "invalid number";
        }
    }

    return NGX_CONF_OK;
}

char *ngx_http_set_kafka_broker(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    char                        *cf_result;
    ngx_http_kafka_loc_conf_t   *local_conf;
    ngx_http_kafka_main_conf_t  *main_conf;

    /* ngx_http_kafka_loc_conf_t::broker assignment */
    cf_result = ngx_conf_set_str_slot(cf, cmd, conf);
    if (cf_result != NGX_CONF_OK) {
        return cf_result;
    }

    local_conf = conf;
    main_conf = ngx_http_conf_get_module_main_conf(cf, ngx_http_kafka_module);
    if (main_conf == NULL) {
        return NGX_CONF_ERROR;
    }

    return ngx_http_kafka_main_conf_broker_add(main_conf, &local_conf->broker);
}

static ngx_int_t ngx_http_kafka_handler(ngx_http_request_t *r)
{
    ngx_int_t  rv;

    if (!(r->method & NGX_HTTP_POST)) {
        return NGX_HTTP_NOT_ALLOWED;
    }

    rv = ngx_http_read_client_request_body(r, ngx_http_kafka_post_callback_handler);
    if (rv >= NGX_HTTP_SPECIAL_RESPONSE) {
        return rv;
    }

    return NGX_DONE;
}

// get remote_addr ip
static int ngx_http_variable_remote_addr(ngx_http_request_t *r, char *ip_addr, uint32_t *ip_len)
{
    if (!r || !ip_addr || !ip_len)
        return -1;

    *ip_len = r->connection->addr_text.len;
    strncpy(ip_addr, (const char *)r->connection->addr_text.data, *ip_len);

    return 0;
}

static int32_t ngx_construct_log_prefix(ngx_http_request_t *r, u_char *in_buf,
        size_t in_buf_len, u_char **out_buf, size_t *p_out_buf_len)
{
    if (r == NULL || in_buf == NULL || in_buf_len == 0)
        return -1;

    // remote_addr
    uint32_t ip_len = 0;
    if (ngx_http_variable_remote_addr(r, (char*)in_buf, &ip_len) == 0) {
        in_buf[ip_len] = '|';
    } else
        return -2;
    if ((ip_len + 1) > in_buf_len)
        return -999;
    *p_out_buf_len += (ip_len + 1);

    // time_local
    u_char buf[1000]; // for hash_key buffer
    int keylen = snprintf((char*)buf, sizeof(buf), "%s", "time_local");
    ngx_uint_t ikey = ngx_hash_strlow(buf, buf, keylen);
    ngx_str_t name = ngx_string("time_local");
    ngx_variable_value_t *pvar = ngx_http_get_variable(r, &name, ikey);
    if (pvar == NULL || pvar->not_found)
        return -3;
    int var_len = snprintf((char*)(in_buf + *p_out_buf_len), pvar->len + 2,
            "%s|", (char*)(pvar->data));
    *p_out_buf_len += var_len;
    if (*p_out_buf_len > in_buf_len)
        return -999;

    // request
    keylen = snprintf((char*)buf, sizeof(buf), "%s", "request");
    ikey = ngx_hash_strlow(buf, buf, keylen);
    ngx_str_t req_name = ngx_string("request");
    pvar = ngx_http_get_variable(r, &req_name, ikey);
    if (pvar == NULL || pvar->not_found)
        return -4;
    var_len = snprintf((char*)(in_buf + *p_out_buf_len), pvar->len + 2,
            "%s|", (char*)(pvar->data));
    *p_out_buf_len += var_len;
    if (*p_out_buf_len > in_buf_len)
        return -999;

    // http_user_agent
    keylen = snprintf((char*)buf, sizeof(buf), "%s", "http_user_agent");
    ikey = ngx_hash_strlow(buf, buf, keylen);
    ngx_str_t agent_name = ngx_string("http_user_agent");
    pvar = ngx_http_get_variable(r, &agent_name, ikey);
    if (pvar == NULL || pvar->not_found)
        return -7;
    var_len = snprintf((char*)(in_buf + *p_out_buf_len), pvar->len + 2,
            "%s|", (char*)(pvar->data));
    *p_out_buf_len += var_len;
    if (*p_out_buf_len > in_buf_len)
        return -999;

    *out_buf = in_buf;
    return 0;
}

static void ngx_http_kafka_post_callback_handler(ngx_http_request_t *r)
{
    int                          rc, nbufs;
    u_char                      *msg, *err_msg;
    size_t                       len, err_msg_size;
    ngx_log_t                   *conn_log;
    ngx_buf_t                   *buf;
    ngx_chain_t                  out;
    ngx_chain_t                 *cl, *in;
    ngx_http_request_body_t     *body;
    ngx_http_kafka_main_conf_t  *main_conf;
    ngx_http_kafka_loc_conf_t   *local_conf;

    err_msg = NULL;
    err_msg_size = 0;
    main_conf = NULL;

    /* get body */
    body = r->request_body;
    if (body == NULL || body->bufs == NULL) {
        err_msg = (u_char *)KAFKA_ERR_NO_DATA;
        err_msg_size = sizeof(KAFKA_ERR_NO_DATA);
        r->headers_out.status = NGX_HTTP_OK;
        goto end;
    }

    /* calc len and bufs */
    len = 0;
    nbufs = 0;
    in = body->bufs;
    for (cl = in; cl != NULL; cl = cl->next) {
        nbufs++;
        len += (size_t)(cl->buf->last - cl->buf->pos);
    }

    /* get msg */
    if (nbufs == 0) {
        err_msg = (u_char *)KAFKA_ERR_NO_DATA;
        err_msg_size = sizeof(KAFKA_ERR_NO_DATA);
        r->headers_out.status = NGX_HTTP_OK;
        goto end;
    }

    int in_memory = 0;  /* 0: not in memory, 1: in memory */
    if (nbufs == 1 && ngx_buf_in_memory(in->buf)) {
        msg = in->buf->pos;
        in_memory = 1;
    } else {
        if ((msg = ngx_pnalloc(r->pool, len)) == NULL) {
            ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
        for (cl = in; cl != NULL; cl = cl->next) {
            if (ngx_buf_in_memory(cl->buf)) {
                msg = ngx_copy(msg, cl->buf->pos, cl->buf->last - cl->buf->pos);
            } else {
                /* TODO: handle buf in file */
                ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                        "ngx_http_kafka_handler cannot handle in-file-post-buf");
                err_msg = (u_char *)KAFKA_ERR_BODY_TO_LARGE;
                err_msg_size = sizeof(KAFKA_ERR_BODY_TO_LARGE);
                r->headers_out.status = NGX_HTTP_INTERNAL_SERVER_ERROR;
                goto end;
            }
        }
        msg -= len;
    }

    /* send to kafka */
    main_conf = ngx_http_get_module_main_conf(r, ngx_http_kafka_module);
    local_conf = ngx_http_get_module_loc_conf(r, ngx_http_kafka_module);
    if (local_conf->rkt == NULL) {
        ngx_str_helper(&local_conf->topic, ngx_str_push);
        local_conf->rkt = rd_kafka_topic_new(main_conf->rk,
                (const char *)local_conf->topic.data, local_conf->rktc);
        ngx_str_helper(&local_conf->topic, ngx_str_pop);
    }

    /*
     * the last param should NOT be r->connection->log, for reason that
     * the callback handler (func: kafka_callback_handler) would be called
     * asynchronously when some errors being happened.
     *
     * At this time, ngx_http_finalize_request may have been invoked.
     * In this case, the object r had been destroyed
     * but kafka_callback_handler use the pointer
     * r->connection->log! Worker processes CRASH!
     *
     * Thanks for engineers of www.360buy.com report me this bug.
     */
    u_char buffer[1024 * 1024];
    size_t out_buf_len = 0;
    u_char *p_out_buf = NULL;
    if (ngx_construct_log_prefix(r, buffer, sizeof(buffer), &p_out_buf, &out_buf_len) < 0)
        goto end;

    void *kafka_msg = NULL;
    size_t all_len = out_buf_len + len;
    int new_memory = 0;
    if (all_len <= sizeof(buffer)) {
        memcpy(buffer + out_buf_len, msg, len);
    } else {
        new_memory = 1;
        kafka_msg = (void*)malloc(all_len);
        if (kafka_msg == NULL)
            goto end;
        memcpy(kafka_msg, buffer, out_buf_len);
        memcpy((char*)kafka_msg + out_buf_len, msg, len);
    }

    conn_log = r->connection->log;

    if (new_memory == 1)
        rc = rd_kafka_produce(local_conf->rkt, (int32_t)local_conf->partition,
                RD_KAFKA_MSG_F_COPY, (void *)kafka_msg, all_len, NULL, 0, conn_log);
    else
        rc = rd_kafka_produce(local_conf->rkt, (int32_t)local_conf->partition,
                RD_KAFKA_MSG_F_COPY, (void *)buffer, all_len, NULL, 0, conn_log);

    if (rc != 0) {
        ngx_log_error(NGX_LOG_ERR, conn_log, 0,
                rd_kafka_err2str(rd_kafka_errno2err(errno)));
        err_msg = (u_char *)KAFKA_ERR_PRODUCER;
        err_msg_size = sizeof(KAFKA_ERR_PRODUCER);
        r->headers_out.status = NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    if (new_memory && kafka_msg != NULL) {
        free(kafka_msg);
        kafka_msg = NULL;
    }

end:
    if (err_msg != NULL) {
        buf = ngx_pcalloc(r->pool, sizeof(ngx_buf_t));
        out.buf = buf;
        out.next = NULL;
        buf->pos = err_msg;
        buf->last = err_msg + err_msg_size - 1;
        buf->memory = 1;
        buf->last_buf = 1;
        ngx_str_set(&(r->headers_out.content_type), "text/html");
        ngx_http_send_header(r);
        ngx_http_output_filter(r, &out);
    } else {
        r->headers_out.status = NGX_HTTP_NO_CONTENT;
        ngx_http_send_header(r);
    }

    ngx_http_finalize_request(r, NGX_OK);
    if (main_conf != NULL) {
        rd_kafka_poll(main_conf->rk, 0);
    }
}

ngx_int_t ngx_http_kafka_init_worker(ngx_cycle_t *cycle)
{
    ngx_uint_t                   n;
    ngx_str_t                   *broker_list;
    ngx_http_kafka_main_conf_t  *main_conf;

    main_conf = ngx_http_cycle_get_module_main_conf(cycle, ngx_http_kafka_module);

    main_conf->rkc = rd_kafka_conf_new();
    rd_kafka_conf_set_dr_cb(main_conf->rkc, kafka_callback_handler);
    main_conf->rk = rd_kafka_new(RD_KAFKA_PRODUCER, main_conf->rkc, NULL, 0);

    broker_list = main_conf->broker_list->elts;
    for (n = 0; n < main_conf->broker_list->nelts; ++n) {
        ngx_str_helper(&broker_list[n], ngx_str_push);
        rd_kafka_brokers_add(main_conf->rk, (const char *)broker_list[n].data);
        ngx_str_helper(&broker_list[n], ngx_str_pop);
    }

    return 0;
}

void ngx_http_kafka_exit_worker(ngx_cycle_t *cycle)
{
    ngx_http_kafka_main_conf_t *main_conf;

    main_conf = ngx_http_cycle_get_module_main_conf(cycle, ngx_http_kafka_module);

    rd_kafka_poll(main_conf->rk, 0);
    while (rd_kafka_outq_len(main_conf->rk) > 0) {
        rd_kafka_poll(main_conf->rk, 100);
    }

    /* TODO: rd_kafka_topic_destroy(each loc conf rkt); */
    rd_kafka_destroy(main_conf->rk);
}

void ngx_str_helper(ngx_str_t *str, ngx_str_op op)
{
    static char backup;

    switch (op) {
    case ngx_str_push:
        backup = str->data[str->len];
        str->data[str->len] = 0;
        break;
    case ngx_str_pop:
        str->data[str->len] = backup;
        break;
    default:
        ngx_abort();
    }
}

Build and install
Download nginx-1.10.3 and build the module as a dynamic module, following the usual dynamic-module installation procedure.
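The module links against librdkafka, so the library and its headers must be present on the build machine before configuring nginx. A minimal sketch of a source build (the clone location and install prefix are assumptions; adjust to your environment):

git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
sudo make install    # installs into /usr/local by default
sudo ldconfig        # refresh the dynamic linker cache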
Compile

./configure --prefix=/opt/programs/nginx_1.10.3 --add-dynamic-module=/opt/programs/ngx_kafka_module

Install

make
make install

nginx configuration

Note where the .so file is loaded from (the load_module path in the configuration below).
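A dynamic-module build places the compiled module under objs/ in the nginx source tree. If make install did not already copy it into the modules directory of the install prefix, copy it manually; a sketch, assuming the prefix from the configure line above:

cp objs/ngx_http_kafka_module.so /opt/programs/nginx_1.10.3/modules/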
#user nobody;
worker_processes 1;

error_log logs/error.log;
pid       logs/nginx.pid;

load_module modules/ngx_http_kafka_module.so;

events {
    worker_connections 1024;
}

http {
    include      mime.types;
    default_type application/octet-stream;

    log_format main '$remote_addr|$time_local|$request|$status|$body_bytes_sent|$http_user_agent|$http_x_forwarded_for|$request_body';

    access_log logs/access.log main;

    sendfile          on;
    keepalive_timeout 65;

    kafka_broker_list 172.31.68.243:9092;

    server {
        listen      80;
        server_name localhost;

        access_log logs/host.access.log main;

        location / {
            kafka_topic test2;
        }
    }
}

Test
Send a message through the HTTP interface:
curl -X POST -d 'guol123456' 'http://172.31.68.243/kafka_topic'

Check the data in Kafka.
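One way to verify that the message arrived, assuming the stock Kafka command-line tools are available on the broker host (the exact flag depends on the Kafka version; older releases use --zookeeper instead of --bootstrap-server):

bin/kafka-console-consumer.sh --bootstrap-server 172.31.68.243:9092 --topic test2 --from-beginning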
Reposted from: https://my.oschina.net/guol/blog/855782