Preface
tail is a common Linux command that prints the last n lines of a file, or follows a file and prints newly appended data in real time. A basic tail is simple to implement, but a complete one involves many details, and extra mechanisms are needed if performance matters. This article starts with a simple implementation of tail's basic functionality on Linux, then refines it with Linux's inotify mechanism for high-performance log file reading. Related source: github.com/so1n/exampl…
so1n.me/2019/03/07/…
1. First version — Reads live data from the end of the file
The main idea: open the file, move the pointer to the end of the file, then loop, printing data whenever new data appears and sleeping for a while when there is none.
import time
import sys
from typing import Callable, NoReturn


class Tail(object):
    def __init__(
        self,
        file_name: str,
        output: Callable[[str], NoReturn] = sys.stdout.write,
        interval: int = 1
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval

    def __call__(self):
        with open(self.file_name) as f:
            f.seek(0, 2)  # seek to the end of the file
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # print each new line
                else:
                    time.sleep(self.interval)


if __name__ == '__main__':
    filename: str = sys.argv[1]
    Tail(filename)()
Then just invoke it as follows:
python xxx.py filename
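To see it in action, append lines to the file from another terminal while the script runs. A throwaway writer such as the following works (the file name demo.log and the messages are placeholders):

import time

# hypothetical helper: appends one line per second to demo.log
with open('demo.log', 'a') as f:
    for i in range(10):
        f.write(f'line {i}\n')  # append a new line
        f.flush()               # flush so the reader sees it immediately
        time.sleep(1)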
2. Version 2 — Implementing tail -f
By default, tail -f first prints the last 10 lines of the file and then reads real-time data from its end. For small files you can simply read the whole file and output the last 10 lines. However, reading the full text just to take the last 10 lines performs poorly, while scanning backward from the end for the last 10 lines involves tricky boundary conditions. First, the read-the-whole-file implementation:
import time
import sys
from typing import Callable, NoReturn


class Tail(object):
    def __init__(
        self,
        file_name: str,
        output: Callable[[str], NoReturn] = sys.stdout.write,
        interval: int = 1
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval

    def __call__(self):
        with open(self.file_name) as f:
            self.read_last_line(f)
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # print each new line
                else:
                    time.sleep(self.interval)

    def read_last_line(self, f):
        # read the whole file and keep only the last 10 lines
        last_lines = f.readlines()[-10:]
        for line in last_lines:
            self.output(line)


if __name__ == '__main__':
    filename: str = sys.argv[1]
    Tail(filename)()
As you can see, the implementation is very simple; compared to the first version it only adds the read_last_line method. The next problem is performance: this logic breaks down when the file is very large, and log files often run to several gigabytes, so reading the whole file into memory is not an option. Linux offers no interface that can place a pointer at the 10th line from the end, so the last 10 lines have to be simulated as follows:
- First, jump the cursor to the end of the file and save its position, then estimate the length of one line of data; 1024 characters is a comfortable overestimate, and that is the figure used here.
- Then jump backwards with seek(-1024 * 10, 2); the characters from there to the end should contain the last 10 lines we expect.
- Then check the result. If the file is shorter than 10 * 1024 characters, the whole file may not even hold 10 lines, so fall back to the original read_last_line method.
- If the jump succeeded and 1024 * 10 characters were read, count the newline characters in them. If there are 10 or more lines, print only the last 10. If only, say, 4 lines were read, re-estimate the line length and keep reading further back (roughly another 6 * 1024 characters) until 10 lines have been collected.
After these steps the computed last 10 lines can be printed, and we can move on to following newly appended data. But by this point the file content may have changed and our cursor with it, so we jump back to the saved cursor position to avoid missing or duplicating output, as the sketch below illustrates.
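All of these jumps rely on seek's whence argument (0 = from the start, 1 = from the current position, 2 = from the end) together with tell. One caveat: in Python 3, a file opened in text mode only accepts seek(0, 2) relative to the end, so backward jumps must either use absolute offsets computed from tell() or a file opened in binary mode. A minimal sketch of the cursor mechanics, assuming a file demo.log exists:

import os

with open('demo.log', 'rb') as f:  # binary mode allows nonzero end-relative seeks
    f.seek(0, os.SEEK_END)                        # jump to the end of the file
    end: int = f.tell()                           # save the end position
    f.seek(max(0, end - 1024 * 10), os.SEEK_SET)  # jump back ~10 "lines" of 1024 bytes
    chunk: bytes = f.read()                       # read from there to the end
    print(chunk.count(b'\n'), 'newlines in the last', len(chunk), 'bytes')
    f.seek(end)  # restore the saved cursor before following new data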
Once the analysis is complete, you can start refactoring the read_last_line function.
import time
import sys
from typing import Callable, List, NoReturn


class Tail(object):
    def __init__(
        self,
        file_name: str,
        output: Callable[[str], NoReturn] = sys.stdout.write,
        interval: int = 1,
        len_line: int = 1024
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval
        self.len_line: int = len_line

    def __call__(self, n: int = 10):
        with open(self.file_name) as f:
            self.read_last_line(f, n)
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # print each new line
                else:
                    time.sleep(self.interval)

    def read_last_line(self, file, n):
        read_len: int = self.len_line * n
        # jump the cursor to the end and save its position
        file.seek(0, 2)
        now_tell: int = file.tell()
        while True:
            if read_len > file.tell():
                # the estimated jump is longer than the whole file: print the entire file
                file.seek(0)  # read() starts from the cursor, so reset it first
                last_line_list: List[str] = file.read().split('\n')[-n:]
                # save the cursor position again
                now_tell = file.tell()
                break
            # jump back to the expected character position; text-mode files only
            # allow seek(0, 2) relative to the end, so use an absolute offset
            file.seek(now_tell - read_len)
            read_str: str = file.read(read_len)
            cnt: int = read_str.count('\n')
            if cnt >= n:
                # enough lines were read: keep only the last n
                last_line_list: List[str] = read_str.split('\n')[-n:]
                break
            else:
                # not enough lines yet: re-estimate the line length and read further back
                if cnt == 0:
                    line_per: int = read_len
                else:
                    line_per: int = int(read_len / cnt)
                read_len = line_per * n
        for line in last_line_list:
            self.output(line + '\n')
        # reset the cursor so that the data printed next is not duplicated
        file.seek(now_tell)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--filename")
    parser.add_argument("-n", "--num", default=10)
    args, unknown = parser.parse_known_args()
    if not args.filename:
        raise RuntimeError('filename args error')
    Tail(args.filename)(int(args.num))
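With argparse wired in, the script takes the file and line count as options instead of a bare positional argument. Assuming it is saved as tail.py, a call looks like:

python tail.py -f /var/log/syslog -n 20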
3. Version 3 — Elegantly read and output log files
The real-time reading logic still performs poorly. Polling the file once per second means output lags by up to a second, while shrinking the interval eats up the processor. The ideal scenario is to print only when the file is actually updated, which guarantees both latency and efficiency. Thankfully, Linux's inotify provides exactly this. In addition, one characteristic of log files is that they get rotated by logrotate; after rotation we need to reopen the file and continue reading, and inotify helps here too: when inotify delivers the corresponding event, we reopen the file and read again.
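Before diving into the full implementation, it helps to see the three moving parts of pyinotify that the code below builds on: a WatchManager that registers paths, a ProcessEvent subclass whose process_<EVENT> methods are dispatched per event, and a Notifier that runs the event loop. A bare-bones sketch (the watched path /tmp is only an example):

import pyinotify

class Handler(pyinotify.ProcessEvent):
    def process_IN_MODIFY(self, event):
        # called for every modification under the watched path
        print('modified:', event.pathname)

wm = pyinotify.WatchManager()                 # registry of watched paths
wm.add_watch('/tmp', pyinotify.IN_MODIFY)     # watch a directory for modify events
notifier = pyinotify.Notifier(wm, Handler())  # dispatches events to the handler
notifier.loop()                               # blocks, handling events as they arrive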
import os
import sys
from typing import Callable, List, NoReturn, TextIO

import pyinotify

multi_event = pyinotify.IN_MODIFY | pyinotify.IN_MOVE_SELF  # watch multiple events


class InotifyEventHandler(pyinotify.ProcessEvent):  # custom event handler class; note the inheritance
    """Encapsulates the handling of inotify events."""

    f: TextIO
    filename: str
    path: str
    wm: 'pyinotify.WatchManager'
    output: Callable

    def my_init(self, **kargs):
        """pyinotify.ProcessEvent asks you not to override __init__ directly; override my_init instead."""
        # resolve the file path
        filename: str = kargs.pop('filename')
        if not os.path.exists(filename):
            raise RuntimeError('Not Found filename')
        if '/' not in filename:
            filename = os.getcwd() + '/' + filename
        index = filename.rfind('/')
        if index == len(filename) - 1 or index == -1:
            raise RuntimeError('Not a legal path')
        self.f = None
        self.filename = filename
        self.output: Callable = kargs.pop('output')
        self.wm = kargs.pop('wm')
        # watch the directory rather than the file, so we can tell when the file is moved
        self.path = filename[:index]
        self.wm.add_watch(self.path, multi_event)

    def read_line(self):
        """Unified output method."""
        for line in self.f.readlines():
            self.output(line)

    def process_IN_MODIFY(self, event):
        """Handlers must be named process_<event name>; event is the event object.
        A file change was detected, so read the file."""
        if event.pathname == self.filename:
            self.read_line()

    def process_IN_MOVE_SELF(self, event):
        """Handlers must be named process_<event name>; event is the event object.
        The file was moved (e.g. by logrotate), so reopen it and keep reading."""
        if event.pathname == self.filename:
            # the file was detected to have moved: reopen it
            self.f.close()
            self.f = open(self.filename)
            self.read_line()

    def __enter__(self) -> 'InotifyEventHandler':
        self.f = open(self.filename)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.f.close()


class Tail(object):
    def __init__(
        self,
        file_name: str,
        output: Callable[[str], NoReturn] = sys.stdout.write,
        interval: int = 1,
        len_line: int = 1024
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval
        self.len_line: int = len_line
        wm = pyinotify.WatchManager()  # create the WatchManager object
        inotify_event_handler = InotifyEventHandler(
            **dict(filename=file_name, wm=wm, output=output)
        )  # instantiate our custom event handler class with dict parameters
        # the watch on the file's directory is registered inside InotifyEventHandler.my_init
        self.notifier = pyinotify.Notifier(wm, inotify_event_handler)  # the handler passed to the Notifier is invoked automatically
        self.inotify_event_handle: 'InotifyEventHandler' = inotify_event_handler

    def __call__(self, n: int = 10):
        # open the file through the handler's context manager
        with self.inotify_event_handle as i:
            # first print the requested number of lines
            self.read_last_line(i.f, n)
            # start the inotify loop
            self.notifier.loop()

    def read_last_line(self, file, n):
        read_len: int = self.len_line * n
        # jump the cursor to the end and save its position
        file.seek(0, 2)
        now_tell: int = file.tell()
        while True:
            if read_len > file.tell():
                # the estimated jump is longer than the whole file: print the entire file
                file.seek(0)
                last_line_list: List[str] = file.read().split('\n')[-n:]
                # save the cursor position again
                now_tell = file.tell()
                break
            # jump back via an absolute offset (text mode forbids nonzero end-relative seeks)
            file.seek(now_tell - read_len)
            read_str: str = file.read(read_len)
            cnt: int = read_str.count('\n')
            if cnt >= n:
                # enough lines were read: keep only the last n
                last_line_list: List[str] = read_str.split('\n')[-n:]
                break
            else:
                # not enough lines yet: re-estimate the line length and read further back
                if cnt == 0:
                    line_per: int = read_len
                else:
                    line_per: int = int(read_len / cnt)
                read_len = line_per * n
        for line in last_line_list:
            self.output(line + '\n')
        # reset the cursor so that the data printed next is not duplicated
        file.seek(now_tell)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--filename")
    parser.add_argument("-n", "--num", default=10)
    args, unknown = parser.parse_known_args()
    if not args.filename:
        raise RuntimeError('filename args error')
    Tail(args.filename)(int(args.num))
As you can see, instead of opening the file directly with open, it is opened through the inotify handler's context manager (pyinotify calls the my_init method to initialize the handler). The same read-last-n-lines code runs first, and then inotify takes over: the reopen-file and print-file methods were mounted on the corresponding inotify events beforehand, and as inotify runs it executes those methods whenever the matching events arrive.
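To check the rotation handling end to end, you can simulate what logrotate does: append a few lines, move the file aside, recreate it, and keep writing. A hypothetical driver (demo.log is a placeholder; the tail script should be following it in another terminal):

import os
import time

LOG = 'demo.log'  # placeholder path

with open(LOG, 'a') as f:
    for i in range(5):
        f.write(f'before rotate {i}\n')  # each write triggers IN_MODIFY
        f.flush()
        time.sleep(1)

os.rename(LOG, LOG + '.1')  # simulate logrotate moving the file away
with open(LOG, 'w') as f:   # recreate the log and keep writing
    for i in range(5):
        f.write(f'after rotate {i}\n')
        f.flush()
        time.sleep(1)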