Use NestJS + Redis + Kafka to achieve a simple second kill system
Tech Stack: our old buddyNestJS
, as well asioredis
.kafka-node
Recently I was researching kafka message queues, so I wanted to write a second kill to try my hand. I read several blogs without specific project examples, so I referenced various implementations to write a working project with NestJS.
Step one, create the project
The Nest CLI command is used to quickly generate the project template;
-
Install @nest/ CLI scaffolding for generating projects;
npm i -g @nest/cli # installation nest - cli
Copy the code
-
Generating project
nest new nest-seckill Build projects using nest CLI
cd ./nest-seckill
yarn # install dependencies
yarn add -S redis ioredis nestjs-redis kafka-node mysql2 typeorm uuid-random Add a dependency
Copy the code
Second, generate the Seckill module
The Nest CLI command is used to quickly generate template code; See the official documentation: Nest-CLI Documentation for details
-
generate
seckill.module.ts
File;Used to create kafka consumers, receive Kafka messages, write order information;
nest generate module seckill # nest g mo seckill Copy the code
-
generate
seckill.controller.ts
;RESTful interface for implementing seckill;
nest g co seckill Copy the code
-
generate
seckill.service.ts
;In the service, redis optimistic lock (Watch) and transaction (mult) are used to implement seckill logic, and Kafka Producer is used to produce a consumption data.
nest g service seckill Copy the code
-
generate
redis.service.ts
;Used to connect redis;
nest g service redis Copy the code
Modified contents:
import { Injectable } from '@nestjs/common' import { RedisService } from 'nestjs-redis' @Injectable(a)export class RedisClientService { constructor(private readonly redisService: RedisService) {} // Connection configuration is set in app.module async getSeckillRedisClient() { return await this.redisService.getClient('seckill')}}Copy the code
Step 3, write the second kill logic;
-
Define a kill interface:
Add a Post interface to seckill.controller.ts:
import { Body, Controller, Post } from '@nestjs/common' import * as uuid from 'uuid-random' // Use uUID to generate order number import { CreateOrderDTO } from '.. /order/order.dto' // Add order field definition import { SeckillService } from './seckill.service' // Implement the seckill logic import { awaitWrap } from '@/utils/index' // async returns a simplified method @Controller('seckill') export class SeckillController { constructor(private readonly seckillService: SeckillService) {} @Post('/add') async addOrder(@Body() order: CreateOrderDTO) { constparams: CreateOrderDTO = { ... order,openid: `${uuid()}-The ${new Date().valueOf()}`,}// Call the secKill method of the service and wait for it to complete const [error, result] = await awaitWrap(this.seckillService.secKill(params)) return error || result } } Copy the code
-
Implement seckill logic:
Add a seckill method to seckill.service.ts;
Use Redis optimistic lock (Watch) and transaction (MULT) to modify data concurrently. See the Node Redis documentation for details.
import { Injectable, Logger } from '@nestjs/common' import * as kafka from 'kafka-node' import * as Redis from 'ioredis' import { RedisClientService } from '.. /redis/redis.service' import { getConfig } from '@root/config/index' // Redis and Kafka connection configuration import { awaitWrap } from '@/utils' const { redisSeckill, kafkaConfig } = getConfig() // Create kafka Client const kafkaClient = new kafka.KafkaClient({ kafkaHost: kafkaConfig.kafkaHost }) // Create a kafka producer const producer = new kafka.Producer(kafkaClient, { // Configuration for when to consider a message as acknowledged, default 1 requireAcks: 1.// The amount of time in milliseconds to wait for all acks before considered, default 100ms ackTimeoutMs: 100.// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0 partitionerType: 2,})@Injectable(a)export class SeckillService { logger = new Logger('SeckillService') // Create a nest log instanceseckillRedisClient! : Redis.Redis// redis connection instance count = 0 // Number of current requests constructor(private readonly redisClientService: RedisClientService) { // Initializes the Redis connection asynchronously when a service is created this.redisClientService.getSeckillRedisClient().then(client= > { this.seckillRedisClient = client }) } / * * * * * * * * * * * * * * * * * * * * * * * * * * @ desc seconds kill concrete implementation * * * * * * * * * * * * * * * * * * * * * * * * * / async secKill(params) { const { seckillCounterKey } = redisSeckill this.logger.log('Current request count:The ${this.count++}`) // Tips: Use optimistic locks to solve concurrency const [watchError] = await awaitWrap(this.seckillRedisClient.watch(seckillCounterKey)) // Listen for 'counter' field changes watchError && this.logger.error(watchError) if (watchError) return watchError // Get the remaining quantity of the current order const [getError, reply] = await awaitWrap(this.seckillRedisClient.get(seckillCounterKey)) getError && this.logger.error(getError) if (getError) return getError if (parseInt(reply) <= 0) { this.logger.warn('It's sold out.') return 'It's sold out.' } // Tips: Use redis transactions to change the number of redis counters by one const [execError, replies] = await awaitWrap(this.seckillRedisClient.multi().decr(seckillCounterKey).exec()) execError && this.logger.error(execError) if (execError) return execError // The counter field is in operation, waiting for counter to be released by others if(! replies) {this.logger.warn('Counter is used') this.secKill(params) // Automatically retry return } // Kafka consumes the content of data const payload = [ { topic: kafkaConfig.topic, partition: 0.messages: [JSON.stringify(params)], }, ] this.logger.log('Payload :') this.logger.verbose(payload) // Wait asynchronously to send kafka consumption data return new Promise((resolve, reject) = > { producer.send(payload, (err, kafkaProducerResponse) = > { if (err) { this.logger.error(err) reject(err) return err } this.logger.verbose(kafkaProducerResponse) resolve({ payload, kafkaProducerResponse }) }) }) } } Copy the code
-
Listen for kafka messages, consume order queue messages;
In seckill. Module. Ts new handleListenerKafkaMessage () method, is used to deal with kafka message;
You also need to call this method when the Seckill module mounts (onApplicationBootstrap) to start subscribesto kafka messages.
import { Logger, Module, OnApplicationBootstrap } from '@nestjs/common' import * as Redis from 'ioredis' import { awaitWrap } from '@/utils' import { CreateOrderDTO } from '.. /order/order.dto' import { OrderModule } from '.. /order/order.module' import { OrderService } from '.. /order/order.service' import { RedisClientService } from '.. /redis/redis.service' import { getKafkaConsumer } from './kafka-utils' import { SeckillController } from './seckill.controller' import { SeckillService } from './seckill.service' import { getConfig } from '@root/config' const { kafkaConfig } = getConfig() @Module({ imports: [OrderModule], providers: [RedisClientService, SeckillService], controllers: [SeckillController], }) export class SeckillModule implements OnApplicationBootstrap { logger = new Logger('SeckillModule') seckillRedisClient! : Redis.Redisconstructor( private readonly orderService: OrderService, // The Service that processes the order private readonly seckillService: SeckillService, // Seckill related implementation private readonly redisClientService: RedisClientService / / redis connection ) { this.redisClientService.getSeckillRedisClient().then(client= > { this.seckillRedisClient = client }) } async handleListenerKafkaMessage() { const kafkaConsumer = getKafkaConsumer() // Extract create consumer implementation methods as functions kafkaConsumer.on('message'.async message => { this.logger.log(The producer data obtained is:) this.logger.verbose(message) letorder! : CreateOrderDTO// The order data from the kafka queue is the messages content of the producer.send service if (typeof message.value === 'string') { order = JSON.parse(message.value) } else { order = JSON.parse(message.value.toString()) } // Write to the database to complete the order creation const [err, order] = await awaitWrap(this.orderService.saveOne(value)) if (err) { this.logger.error(err) return } this.logger.log(【 ` order${order.id}The information is stored in the database)})}async onApplicationBootstrap() { this.logger.log('onApplicationBootstrap: ') await this.seckillService.initCount() // Reset the remaining inventory in redis this.handleListenerKafkaMessage() } } Copy the code
-
Kafka consumers
getKafkaConsumer
The method is implemented as follows:Add kafka-utils.ts file to seckill module folder:
import * as kafka from 'kafka-node' import * as Redis from 'ioredis' import { getConfig } from '@root/config/index' import { awaitWrap } from '@/utils' const { kafkaConfig } = getConfig() letkafkaConsumer! : kafka.Consumer// Get the Kafka client function getKafkaClient() { letkafkaClient! : kafka.KafkaClientreturn () = > { if(! kafkaClient) { kafkaClient =new kafka.KafkaClient({ kafkaHost: kafkaConfig.kafkaHost, }) } return kafkaClient } } / * * *@desc Get the consumer instance */ export function getKafkaConsumer() { // Consumer the topics configuration to subscribe to const topics = [ { topic: kafkaConfig.topic, partition: 0.offset: 0,},]const options = { // Auto-commit configuration (false does not commit offsets, reads from scratch each time) autoCommit: true.autoCommitIntervalMs: 5000.// If set to true, the consumer gets the message from the given offset in the payload fromOffset: false,}const kafkaClient = getKafkaClient()() if(! kafkaConsumer) { kafkaConsumer =new kafka.Consumer(kafkaClient, topics, options) } return kafkaConsumer } Copy the code
The resulting file structure looks something like this:
Operating Projects:
yarn dev
Copy the code
Some instructions
-
If you need to test the seckill interface concurrently, you can use Postman’s Runner multiple; Simple test interface logic, you can open the project default configuration swagger – UI page http://localhost:3000/api-docs
-
So that’s the end of our main kill logic. Since we are mainly implementing the seckill logic, all the order module code is not expanded here. With just a few lines of command as in step 2, we can simply create the Order module for Order curd;
-
For redis,mysql, Kafka and other services, docker-comemage. yaml can be written to start up quickly. For details, please refer to the code of this project.
Kafka containers may fail to start due to the firewall of centos. The solution is to close the firewall of the host and restart the docker.
-
Kafka container after creation, we need to open the browser to access kafka-Manager container mapping 9000 port kafka management page, create cluster and our Topic, specific initialization operation is relatively simple, can search kafka-manager;
For example,Kafka cluster management tool Kafka-manager installation use
Github address: github.com/wenqieqiu/n…