Hi, I'm Zane. ZanePerfor is a front-end performance monitoring platform I developed; it currently supports both the web browser side and the WeChat mini program side.
I define it as a complete, high-performance, highly available front-end performance monitoring system; that is the goal it is working toward. The current architecture already largely supports high-availability, high-performance deployment, but it is not yet enough: many places still have room for improvement, and I will keep improving and upgrading it.
Open source is not easy. If you also love technology and embrace open source, I hope you will lend a little support with a star.
Github address of the project: github.com/wangweiange…
Project documentation: blog.seosiwei.com/performance…
Kafka is an open-source stream processing platform developed by the Apache Software Foundation and written in Scala and Java. It is a high-throughput distributed publish-subscribe messaging system that can handle all the action-stream data of a consumer-scale website. Such actions (page views, searches, and other user activity) are a key ingredient of many social features on the modern web, and because of the throughput requirements this data is usually handled through logs and log aggregation. For a system like ZanePerfor, which monitors user access behavior and page performance and needs near-real-time processing, Kafka is a viable solution.
ZanePerfor talks to Kafka through the kafka-node package, and encapsulates an egg-kafka plug-in on top of it.
ZanePerfor is an initial exploration of applying Kafka in Node.js. The following sections focus on how Kafka is used in the ZanePerfor project.
If you have more tips or insights on Kafka in Node.js, please share them with me.
Kafka application in zanePerfor project
Configuration for enabling Kafka:
```js
// config/config.default.js
// kafka configuration (effective when report_data_type = kafka)
// configuration reference: https://www.npmjs.com/package/kafka-node
config.kafka = {
  client: {
    kafkaHost: 'localhost:9092',
  },
  producer: {
    web: {
      topic: 'zane_perfor_web',
      partition: 0, // default 0
      attributes: 0, // default 0
      // timestamp: Date.now(),
    },
    wx: {
      topic: 'zane_perfor_wx',
    },
  },
  // enable either consumer or consumerGroup, not both
  consumer: {
    web: {
      topic: 'zane_perfor_web',
      offset: 0, // default 0
      partition: 0, // default 0
      isone: false,
      total_limit: 10000, // 0: no limit; N: cap for graceful degradation under high concurrency
    },
    wx: {
      topic: 'zane_perfor_wx',
      isone: false,
      total_limit: 10000,
    },
  },
  consumerGroup: {
    web: { // ConsumerGroup(options, topics)
      topic: 'zane_perfor_web',
      groupId: 'WebPerformanceGroup',
      commitOffsetsOnFirstJoin: true,
    },
    wx: {
      topic: 'zane_perfor_wx',
      groupId: 'WxPerformanceGroup',
      commitOffsetsOnFirstJoin: true,
    },
  },
};
```
Configuration notes:
client parameters:
The client parameters are those of KafkaClient in kafka-node; reference: www.npmjs.com/package/kaf…
producer parameters:
producer can be configured for both the web and wx ends.
The producer parameters are those of kafka-node's send; reference: www.npmjs.com/package/kaf…
consumer parameters:
consumer is configured for both the web and wx ends.
The consumer parameters are those of kafka-node's Consumer; reference: www.npmjs.com/package/kaf…
consumerGroup parameters:
consumerGroup is configured for both the web and wx ends.
The consumerGroup parameters are those of kafka-node's ConsumerGroup; reference: www.npmjs.com/package/kaf…
About consumer selection
The config contains both consumer and consumerGroup sections. The rules are as follows:
- If a consumer configuration is present, the consumer configuration is used.
- To enable the consumerGroup configuration instead, comment out or delete the consumer configuration.
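The precedence rule above can be sketched as a tiny helper (the function name is hypothetical and not part of the project source; it only illustrates the selection logic):

```js
// Hypothetical sketch: a consumer config, when present, wins
// over a consumerGroup config; otherwise fall back, else none.
function pickConsumeMode(kafkaConfig) {
  if (kafkaConfig.consumer) return 'consumer';
  if (kafkaConfig.consumerGroup) return 'consumerGroup';
  return 'none';
}
```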
Kafka production and consumption logic:
Core code implementation:
One: Producers
Kafka's performance is very strong and it supports extremely high concurrency, so all information reported by clients is first written to the message queue. The rate-limiting policy is applied only on the consuming side, never on the producing side.
```js
// app/controller/api/web/report.js
// hand the reported data to the kafka message queue
async saveWebReportDataForKafka(query) {
  // producer
  this.app.kafka.send('web', JSON.stringify(query));
  // consumer: start consuming once, on the first reported message
  if (!isKafkaConsumer && !this.app.config.kafka.consumer.web.isone) {
    this.ctx.service.web.reportTask.saveWebReportDatasForKafka();
    isKafkaConsumer = true;
    this.app.config.kafka.consumer.web.isone = true;
  }
}
```
- `this.app.kafka.send` is the method in egg-kafka that produces messages.
- The `if (!isKafkaConsumer && !this.app.config.kafka.consumer.web.isone)` check ensures the subscribe method is executed only once; once data has been reported, the data subscription is triggered automatically and consumption begins.
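The run-once trigger described above can be illustrated with a minimal, self-contained guard (the names below are hypothetical, not from the project source; the project uses a module-level `isKafkaConsumer` flag for the same effect):

```js
// Minimal sketch of a run-once guard: the first call starts the
// consumer, every later call is a no-op.
function makeOnceStarter(startFn) {
  let started = false;
  return () => {
    if (started) return false; // already started: do nothing
    started = true;
    startFn();
    return true; // this call actually started the consumer
  };
}
```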
Two: Consumers
```js
// app/service/web/report_task.js
// consume messages from kafka
async saveWebReportDatasForKafka() {
  if (this.kafkaConfig.consumer) {
    this.app.kafka.consumer('web', message => {
      this.consumerDatas(message);
    });
  } else if (this.kafkaConfig.consumerGroup) {
    this.app.kafka.consumerGroup('web', message => {
      this.consumerDatas(message);
    });
  }
}
```
- `this.app.kafka.consumer`: standalone consumption, a method exposed by egg-kafka.
- `this.app.kafka.consumerGroup`: consumes messages as part of a consumer group.
- The consumer configuration takes priority; if it is absent, consumption falls back to consumerGroup.
Egg-kafka plug-in package description
To use Kafka better and more conveniently, kafka-node is wrapped in the project.
For details, see: /lib/plugin/egg-kafka/lib/kafka.js
The send code is implemented as follows:
```js
send(type, data) {
  assert(type, '[egg-kafka] type is must required.');
  if (!data) return;
  let producer = this.app.config.kafka.producer[type] || {};
  let producers = [];
  if (typeof data === 'string') {
    producer.messages = data;
    producers = [ producer ];
  } else if (Object.prototype.toString.call(data) === '[object Object]') {
    producer = Object.assign({}, producer, data);
    producers = [ producer ];
  } else if (Object.prototype.toString.call(data) === '[object Array]') {
    for (let i = 0; i < data.length; i++) {
      data[i] = Object.assign({}, producer, data[i]);
    }
    producers = data;
  }
  this.producer.send(producers, (err, data) => {
    if (err) assert(err, `[egg-kafka] err. errmsg ${err}`);
    console.log(data);
  });
}
```
send takes two parameters. The first, type, is the send type; its value can be 'web' or 'wx'.
Because send inspects the data argument, it can be called in any of the following ways:

```js
// the message is a String
this.app.kafka.send('web', 'hello world!');
// the message is an Object
this.app.kafka.send('web', { topic: 'test', messages: 'hello world!' });
// the message is an Array
this.app.kafka.send('web', [{ topic: 'test', messages: 'hi', partition: 0 }]);
```
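The branching on the data argument can be factored into a pure helper for illustration (the function name is hypothetical; the merging logic mirrors the send method shown above):

```js
// Normalize a send() payload the way the plug-in does: a string
// becomes the messages field over the default producer config,
// an object is merged over the defaults, and an array is merged
// element by element.
function normalizePayloads(producerDefaults, data) {
  if (typeof data === 'string') {
    return [ Object.assign({}, producerDefaults, { messages: data }) ];
  }
  if (Object.prototype.toString.call(data) === '[object Object]') {
    return [ Object.assign({}, producerDefaults, data) ];
  }
  if (Array.isArray(data)) {
    return data.map(item => Object.assign({}, producerDefaults, item));
  }
  return [];
}
```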
The consumer method is implemented as follows:
```js
consumer(type = 'web', fn) {
  assert(type, '[egg-kafka] consumers type argument must be required');
  const kafkaConfig = this.app.config.kafka;
  const consumer = kafkaConfig.consumer[type] || {};
  const consumers = Array.isArray(consumer) ? consumer : [ consumer ];
  const Consumer = kafka.Consumer;
  const _consumer = new Consumer(
    this.client,
    consumers,
    { autoCommit: true }
  );
  _consumer.on('error', err => {
    this.app.coreLogger.error(`[egg-kafka] consumer have error ${err}`);
  });
  _consumer.on('message', message => {
    fn && fn(message);
  });
}
```
ConsumerGroup code implementation:
```js
consumerGroup(type = 'web', fn) {
  assert(type, '[egg-kafka] consumers type argument must be required');
  const kafkaConfig = this.app.config.kafka;
  const kafkaHost = kafkaConfig.client.kafkaHost;
  const consumerOption = kafkaConfig.consumerGroup[type] || {};
  const topic = consumerOption.topic;
  consumerOption.kafkaHost = kafkaHost;
  const ConsumerGroup = kafka.ConsumerGroup;
  const _consumer = new ConsumerGroup(consumerOption, topic);
  _consumer.on('error', err => {
    this.app.coreLogger.error(`[egg-kafka] consumer have error ${err}`);
  });
  _consumer.on('message', message => {
    fn && fn(message);
  });
}
```
Consumption rate-limiting strategy:
Because Kafka's throughput is extremely high, ZanePerfor applies rate limiting only on the consuming side.
Code implementation:
Set the size of the consumption pool:

```js
// config.default.js
{
  topic: 'zane_perfor_web',
  offset: 0, // default 0
  partition: 0, // default 0
  isone: false,
  total_limit: 10000, // 0: no limit; N: cap for graceful degradation under high concurrency
}
```
Check the Kafka connection pool count before consuming:

```js
// kafka connection pool limit
const msgtab = query.time + query.ip;
if (this.kafkatotal && this.kafkalist.length >= this.kafkatotal) return;
this.kafkalist.push(msgtab);
```
Delete the consumption flag after the data has been consumed:

```js
// app/service/web/report_task.js getWebItemDataForKafka
this.savePages(item, system.slow_page_time, () => {
  // free the slot once this message is fully processed
  const index = this.kafkalist.indexOf(msgtab);
  if (index > -1) this.kafkalist.splice(index, 1);
});
```
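Taken together, the limit check and the flag removal amount to a bounded in-flight list. A standalone sketch of the same mechanics (the class and method names are hypothetical, not from the project source):

```js
// Bounded in-flight list: acquire() refuses new work once the cap
// is reached; release() frees a slot after consumption completes.
class ConsumePool {
  constructor(total) {
    this.total = total; // 0 means unlimited, matching total_limit
    this.list = [];
  }
  acquire(id) {
    if (this.total && this.list.length >= this.total) return false;
    this.list.push(id);
    return true;
  }
  release(id) {
    const index = this.list.indexOf(id);
    if (index > -1) this.list.splice(index, 1);
  }
}
```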
With this, the application and encapsulation of Kafka in Egg.js is complete.