GoReplay overview
As an application grows more complex, the effort required to test it grows rapidly. GoReplay offers a simple idea: reuse existing traffic for testing. GoReplay is a lightweight traffic recording and replay tool written in Go. It supports many kinds of filtering, rate limiting, and request rewriting. GoReplay is completely non-invasive: it requires no changes to application code or production infrastructure, and it is language-neutral. It is not a proxy; instead, it listens for traffic directly on the network interface.
How GoReplay works: the listener server captures traffic and either sends it to a replay server or saves it to a file or to Kafka. The replay server then forwards the traffic to the configured address.
How we use it
Requirements: the algorithm team needs to record real traffic in the production environment and replay it in any environment at any time.
Because some of the algorithm team's services are written in languages other than Java, the existing traffic recording platform cannot support them for now. A new recording component was therefore needed to meet the load-testing requirements, and GoReplay was selected.
GoReplay can store recorded data in a local file and read it back from that file during playback. Given the overhead of storing and distributing files for every recording and playback, we wanted a more convenient way to manage the data.
GoReplay also natively supports storing recorded data in Kafka, but with a limitation: when Kafka is used as the store, traffic must be recorded and played back at the same time. The architecture is as follows:
Steps 1-4 cannot be split apart; they must all run simultaneously.
This makes recording and playback far less useful than we need. We want to replay recorded data at any time, and to replay the same recording multiple times. Since the traffic data is already stored in Kafka, we can adapt GoReplay to support these requirements.
Architecture of traffic recording and playback after the modification: in the figure, stages 1-2 and stages 3-5 are independent of each other.
In other words, the recording process can be separated from the playback process. By noting the Kafka offset at the start and at the end of a recording, we know exactly which data that recording contains. Each recording can then be organized into a recording task, and its traffic can be played back whenever needed.
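The offset bookkeeping described above can be sketched as follows. This is only an illustration under stated assumptions: `RecordingTask`, `Begin`, `Finish`, and the `offsetSource` interface are hypothetical names that do not come from GoReplay. The interface's single method has the same shape as `GetOffset` on `sarama.Client`, and a fake implementation is used so the sketch runs without a broker. The end offset is treated as exclusive, and a single partition is assumed.

```go
package main

import "fmt"

// offsetNewest mirrors sarama.OffsetNewest (-1), which asks for the offset
// of the next message that will be written to the partition.
const offsetNewest int64 = -1

// offsetSource is a hypothetical interface; its method has the same shape
// as GetOffset on sarama.Client.
type offsetSource interface {
	GetOffset(topic string, partition int32, time int64) (int64, error)
}

// RecordingTask is a hypothetical record of one recording session.
// Messages with offsets in [StartOffset, EndOffset) belong to the task.
type RecordingTask struct {
	Topic       string
	Partition   int32
	StartOffset int64
	EndOffset   int64
}

// Begin snapshots the next offset to be written when recording starts.
func Begin(src offsetSource, topic string, partition int32) (*RecordingTask, error) {
	start, err := src.GetOffset(topic, partition, offsetNewest)
	if err != nil {
		return nil, fmt.Errorf("read start offset: %w", err)
	}
	return &RecordingTask{Topic: topic, Partition: partition, StartOffset: start, EndOffset: start}, nil
}

// Finish snapshots the next offset again when recording stops.
func (t *RecordingTask) Finish(src offsetSource) error {
	end, err := src.GetOffset(t.Topic, t.Partition, offsetNewest)
	if err != nil {
		return fmt.Errorf("read end offset: %w", err)
	}
	t.EndOffset = end
	return nil
}

// Len reports how many messages the task covers.
func (t *RecordingTask) Len() int64 { return t.EndOffset - t.StartOffset }

// fakeSource stands in for a Kafka client so the sketch needs no broker.
type fakeSource struct{ next int64 }

func (f *fakeSource) GetOffset(string, int32, int64) (int64, error) { return f.next, nil }

func main() {
	src := &fakeSource{next: 100}
	task, err := Begin(src, "gor-traffic", 0)
	if err != nil {
		panic(err)
	}
	src.next = 250 // pretend 150 requests were recorded in the meantime
	if err := task.Finish(src); err != nil {
		panic(err)
	}
	fmt.Printf("offsets [%d, %d), %d messages\n", task.StartOffset, task.EndOffset, task.Len())
}
```

Storing only the two offsets keeps a recording task small, and the same task can be replayed any number of times as long as Kafka still retains the messages in that range.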
Modification and integration
Making the Kafka offset configurable
Brief process:
Original definition of InputKafkaConfig in the source code:

    type InputKafkaConfig struct {
        producer sarama.AsyncProducer
        consumer sarama.Consumer
        Host     string `json:"input-kafka-host"`
        Topic    string `json:"input-kafka-topic"`
        UseJSON  bool   `json:"input-kafka-json-format"`
    }
Modified definition of InputKafkaConfig:

    type InputKafkaConfig struct {
        producer    sarama.AsyncProducer
        consumer    sarama.Consumer
        Host        string `json:"input-kafka-host"`
        Topic       string `json:"input-kafka-topic"`
        UseJSON     bool   `json:"input-kafka-json-format"`
        StartOffset int64  `json:"input-kafka-offset"`
        EndOffset   int64  `json:"input-kafka-end-offset"`
    }
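For the new fields to be usable from the command line, they also have to be exposed as flags. The following is a minimal sketch using the standard `flag` package, not GoReplay's actual settings code. The flag names `--input-kafka-start-offset` and `--input-kafka-end-offset` are the ones the platform passes later in this article; the `kafkaReplayOptions` type, the default values, and the range check are assumptions made for illustration.

```go
package main

import (
	"flag"
	"fmt"
)

// kafkaReplayOptions is a hypothetical stand-in for the Kafka-related
// fields of InputKafkaConfig.
type kafkaReplayOptions struct {
	Host        string
	Topic       string
	StartOffset int64
	EndOffset   int64
}

// parseKafkaFlags registers the options on a private FlagSet and parses args.
// The defaults are assumptions: -1 for the start offset matches the value of
// sarama.OffsetNewest, and -1 for the end offset is used here to mean "unset".
func parseKafkaFlags(args []string) (kafkaReplayOptions, error) {
	var opts kafkaReplayOptions
	fs := flag.NewFlagSet("gor", flag.ContinueOnError)
	fs.StringVar(&opts.Host, "input-kafka-host", "", "comma-separated Kafka brokers")
	fs.StringVar(&opts.Topic, "input-kafka-topic", "", "topic holding the recorded traffic")
	fs.Int64Var(&opts.StartOffset, "input-kafka-start-offset", -1, "first offset to replay")
	fs.Int64Var(&opts.EndOffset, "input-kafka-end-offset", -1, "offset at which replay stops (exclusive)")
	if err := fs.Parse(args); err != nil {
		return opts, err
	}
	// Reject an inverted range early instead of starting a replay that
	// would stop immediately.
	if opts.EndOffset >= 0 && opts.StartOffset > opts.EndOffset {
		return opts, fmt.Errorf("start offset %d is past end offset %d", opts.StartOffset, opts.EndOffset)
	}
	return opts, nil
}

func main() {
	opts, err := parseKafkaFlags([]string{
		"--input-kafka-host", "127.0.0.1:9092",
		"--input-kafka-topic", "gor-traffic",
		"--input-kafka-start-offset", "100",
		"--input-kafka-end-offset", "250",
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", opts)
}
```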
In the original source, the fragment that reads data from Kafka looks like this. As you can see, it always starts consuming from the newest offset (sarama.OffsetNewest):

    for index, partition := range partitions {
        // Error handling is omitted in this excerpt.
        consumer, err := con.ConsumePartition(config.Topic, partition, sarama.OffsetNewest)

        go func(consumer sarama.PartitionConsumer) {
            defer consumer.Close()

            for message := range consumer.Messages() {
                i.messages <- message
            }
        }(consumer)
    }
The modified fragment that reads data from Kafka:

    for index, partition := range partitions {
        // Start from the configured offset instead of the newest one.
        consumer, err := con.ConsumePartition(config.Topic, partition, config.StartOffset)
        offsetEnd := config.EndOffset - 1

        go func(consumer sarama.PartitionConsumer) {
            defer consumer.Close()

            for message := range consumer.Messages() {
                // Stop once we have passed the last offset of the recording.
                if message.Offset > offsetEnd {
                    i.quit <- struct{}{}
                    break
                }
                i.messages <- message
            }
        }(consumer)
    }
With this change, we only need to specify the Kafka offset range when starting a playback task, and GoReplay replays exactly the traffic of that recording.
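The bounded read can be tried out without Kafka. The sketch below reproduces the same stop condition on plain Go channels; `message`, `forwardRange`, and `replay` are hypothetical names, and `message` stands in for sarama's consumer message type with only the fields needed here. The end offset is treated as exclusive, so `m.Offset >= endOffset` is equivalent to the `message.Offset > offsetEnd` check with `offsetEnd = EndOffset - 1`.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a Kafka consumer message.
type message struct {
	Offset int64
	Value  []byte
}

// forwardRange copies messages from in to out until it sees one whose offset
// is at or past endOffset, or until in is closed. It closes done on exit.
func forwardRange(in <-chan message, out chan<- message, endOffset int64, done chan<- struct{}) {
	defer close(done)
	for m := range in {
		if m.Offset >= endOffset {
			return
		}
		out <- m
	}
}

// replay feeds the given partition contents through forwardRange and
// returns the offsets that were forwarded.
func replay(partition []message, endOffset int64) []int64 {
	// Simulate a partition consumer that already holds all messages.
	in := make(chan message, len(partition))
	for _, m := range partition {
		in <- m
	}
	close(in)

	out := make(chan message)
	done := make(chan struct{})
	go forwardRange(in, out, endOffset, done)

	var got []int64
	for {
		select {
		case m := <-out:
			got = append(got, m.Offset)
		case <-done:
			return got
		}
	}
}

func main() {
	var partition []message
	for off := int64(100); off < 110; off++ {
		partition = append(partition, message{Offset: off})
	}
	fmt.Println(replay(partition, 105)) // prints [100 101 102 103 104]
}
```

Two things are worth keeping in mind with this approach. The loop only notices the end of the range when a message at or beyond the end offset arrives, so a recording that ends at the very tail of the topic will wait until something newer is written. Kafka offsets are also tracked per partition, so a single start and end pair is simplest to reason about when the topic has one partition.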
Integration into the load-testing platform
The platform assembles the GoReplay command line for a playback task:

    StringBuilder builder = new StringBuilder("nohup /opt/apps/gor/gor");
    builder.append(" --input-kafka-host ").append("'").append(kafkaServer).append("'");
    builder.append(" --input-kafka-topic ").append("'").append(kafkaTopic).append("'");
    // Offset range of the recording task to replay
    builder.append(" --input-kafka-start-offset ").append(record.getStartOffset());
    builder.append(" --input-kafka-end-offset ").append(record.getEndOffset());
    builder.append(" --output-http ").append(replayDTO.getTargetAddress());
    builder.append(" --exit-after ").append(replayDTO.getMonitorTimes()).append("s");
    // Optional extra GoReplay arguments supplied by the user
    if (StringUtils.isNotBlank(replayDTO.getExtParam())) {
        builder.append(" ").append(replayDTO.getExtParam());
    }
    // Run in the background and redirect all output to a log file
    builder.append(" > /opt/apps/gor/replay.log 2>&1 &");
    String completeParam = builder.toString();
The load-testing platform starts and stops the GoReplay process through an interface exposed by the Java agent:

    String sourceAddress = replayDTO.getSourceAddress();
    String[] split = sourceAddress.split(COMMA);
    // Ask the agent on every source machine to start GoReplay
    for (String ip : split) {
        String uri = String.format(HttpTrafficRecordServiceImpl.BASE_URL + "/gor/start", ip, HttpTrafficRecordServiceImpl.AGENT_PORT);
        GoreplayRequest request = new GoreplayRequest();
        request.setConfig(replayDTO.getCompleteParam());
        request.setType(0);
        try {
            restTemplate.postForObject(uri, request, String.class);
        } catch (RestClientException e) {
            LogUtil.error("Start gor fail, please check it!", e);
            MSException.throwException("Start gor fail, please check it!", e);
        }
    }