This article is published by the NetEase Cloud Community with the authorization of its author, Yang Wangshu.
Background
When viewing logs on a server, you often run tail -f to follow file changes in real time. So the question is: if you had to write the same feature yourself, how would you do it? If you have used Beats/Filebeat from the ELK stack, you know that Filebeat monitors log files for changes and sends the newest data to Elasticsearch, Kafka, and so on. In Filebeat, each newly appended line triggers a message: the line is preprocessed and then published, for example printed or sent to Kafka. After a restart, it continues reading from where it last stopped.
Tools
Golang, IDEA
Flowchart
Implementation
As the flowchart shows, we need to solve the following problems:
- Record the file position that was read before the program last shut down, and load this position when the program next starts.
- Seek to a position in the file, read lines, and publish the lines read.
- Monitor changes in the file content and issue notifications.
Record the last read position
The key question is when to record the offset that was last read. There are two options:
- Publish the line first, then record the offset. If the program crashes after publishing but before recording, the line is read and published again after the restart.
- Record the offset immediately after reading, and publish only after it has been recorded successfully. This causes a different problem: if the program crashes before publishing, the message is never sent after the restart, so downstream consumers never see it.
If I understand it correctly, Elastic's Filebeat chooses the first option and does no special exception handling. It sets up a channel pool, receives and writes the position information asynchronously, and, if a write fails, logs an error and continues:
    logp.Err("Writing of registry returned error: %v. Continuing...", err)
Seek in the file, read lines, and publish them
To read a file, you first need a reader:
    func (tail *Tailf) openReader() {
        // The error from os.Open is ignored here for brevity.
        tail.file, _ = os.Open(tail.FileName)
        tail.reader = bufio.NewReader(tail.file)
    }
Reading lines starting from offset 0 is straightforward; the following method is enough:
    func (tail *Tailf) readLine() (string, error) {
        line, err := tail.reader.ReadString('\n')
        if err != nil {
            // On io.EOF, line holds the partial data read so far.
            return line, err
        }
        line = strings.TrimRight(line, "\n")
        return line, err
    }
However, if content has been appended but does not yet form a complete line, that is, EOF (end of file) is reached before a \n appears, the offset must stay at the start of that line while we wait for the rest.
    // getOffset returns the logical read position: the file position minus
    // the bytes that bufio has buffered but not yet handed out.
    func (tail *Tailf) getOffset() (offset int64, err error) {
        offset, err = tail.file.Seek(0, os.SEEK_CUR)
        offset -= int64(tail.reader.Buffered())
        return
    }

    func (tail *Tailf) beginWatch() {
        tail.openReader()
        var offset int64
        for {
            // Remember the offset before reading the next line.
            offset, _ = tail.getOffset()
            line, err := tail.readLine()
            if err == nil {
                tail.publishLine(line)
            } else if err == io.EOF {
                // EOF was hit: move back to the offset saved before the read.
                tail.seekTo(Seek{offset: offset, whence: 0})
                // Block and wait for changes.
                tail.waitChangeEvent()
            } else {
                fmt.Println(err)
                return
            }
        }
    }

    func (tail *Tailf) seekTo(pos Seek) error {
        tail.file.Seek(pos.offset, pos.whence)
        // The reader must be reset for the seek to take effect.
        tail.reader.Reset(tail.file)
        return nil
    }

    func (tail *Tailf) publishLine(line string) {
        tail.lines <- line
    }
Now let’s talk about waitChangeEvent
How to monitor file content changes and send notifications
There are basically two ways to detect that a file has grown:
- Poll the file and watch for changes in its last modification time and size
- Use Linux's inotify API, which delivers events when a file's state changes
The first method is used here, and Filebeat uses it too. How do we implement it ourselves?
    // currReadPos: the current read position in the file
    func (w *PollWatcher) ChangeEvent(currReadPos int64) (*ChangeEvent, error) {
        watchingFile, err := os.Stat(w.FileName)
        if err != nil {
            return nil, err
        }
        changes := NewChangeEvent()
        // Current size
        w.FileSize = currReadPos
        // Previous modification time
        previousModTime := watchingFile.ModTime()

        // Polling
        go func() {
            previousSize := w.FileSize
            for {
                time.Sleep(POLL_DURATION)
                // A lot of code is omitted here. Assume the file exists, has not
                // been renamed or deleted, and is only appended to, like a log file.
                file, _ := os.Stat(w.FileName)
                // ... a large chunk of code omitted
                if previousSize > 0 && previousSize < w.FileSize {
                    // The file has grown.
                    changes.NotifyModified()
                    previousSize = w.FileSize
                    continue
                }
                previousSize = w.FileSize

                // The modification time has changed.
                modTime := file.ModTime()
                if modTime != previousModTime {
                    previousModTime = modTime
                    changes.NotifyModified()
                }
            }
        }()
        return changes, nil
    }
The changes.NotifyModified method here simply sends true on the Modified channel (ce.Modified <- true) of the struct below:
    type ChangeEvent struct {
        Modified  chan bool
        Truncated chan bool
        Deleted   chan bool
    }
This send is what tells the main loop that the file has been modified, so that it can continue with readLine:
    // Called from beginWatch.
    func (tail *Tailf) waitChangeEvent() error {
        // ... initialization omitted
        select {
        // Only growth of the file content is handled here.
        case <-tail.changes.Modified:
            fmt.Println(">> find Modified")
            return nil
        // ... other cases omitted
        }
    }
With all of this in place, we can now monitor file changes from main:
    func main() {
        t, _ := tailf.NewTailf("/Users/yws/Desktop/test.log")
        // Assumes the channel written by publishLine is exposed as t.Lines.
        for line := range t.Lines {
            fmt.Println(line)
        }
    }
Extension points
The extension points are the same as Filebeat's:
- Reading does not have to be line by line; you could read multiple lines at once, parse JSON, and so on
- For publishing, this example writes directly to the console, but it could be connected to Kafka, Redis, a database, and so on
- ... I can't think of anything else
Conclusion
This is a very simple feature, and basically every modern mainstream server-side language can implement it, so why use Go? I won't give a full list of pros and cons (this is not an advertorial), just a few impressions:
- The code is clean. Go lacks many high-level language features, but the code still looks great, apart from overly wrapped structs and some odd names.
- Writing concurrent code with goroutines is simple and elegant, but also easy for novices like me to abuse, and debugging in this language is still a bit of a pain.
- Communication between goroutines is just as simple. The channel design is great and fun to use.
- When a method returns multiple values, a nested call such as method(yinstance.method()) often has to be split into two or more lines. Every time the compiler reports this error, I want to smash the keyboard.
References
- Github.com/elastic/bea… Filebeat is just one of its features
- Github.com/hpcloud/tai… Halfway through writing this, I found that someone else had already done the same thing, and the code is basically the same. If you are interested, read that code; it is more complete.