Preface
With the growing popularity of social forums of all kinds, sensitive word filtering has gradually become an essential feature. What implementations of sensitive word filtering are available in Python under the Serverless architecture? Can we build a sensitive word filtering API in the simplest possible way?
Introduction to sensitive word filtering
The Replace method
In Python, the first method that comes to mind for word replacement is replace. We can prepare a sensitive word list and replace each sensitive word via replace:
def check_filter(keywords, text):
    for eve in keywords:
        text = text.replace(eve, "***")
    return text

keywords = ("keyword1", "keyword2", "keyword3")
content = "This is an example of keyword substitution, involving keyword1 and keyword2, and keyword3 at the end."
print(check_filter(keywords, content))
However, a little thought reveals that this approach has serious performance problems when both the text and the sensitive word list are large. For example, let's modify the code to run a basic performance test:
import time

def check_filter(keywords, text):
    for eve in keywords:
        text = text.replace(eve, "***")
    return text

keywords = ["keyword" + str(i) for i in range(0, 10000)]
startTime = time.time()
content = "This is an example of keyword substitution, involving keyword1 and keyword2, and keyword3 at the end." * 10000
check_filter(keywords, content)
print(time.time() - startTime)
The output is 1.235044002532959 seconds, which is very poor performance.
Regular expression method
Instead of using replace, we can use re.sub from the re module, which is much faster:
import re

def check_filter(keywords, text):
    return re.sub("|".join(keywords), "***", text)

keywords = ("keyword1", "keyword2", "keyword3")
content = "This is an example of keyword substitution, involving keyword1 and keyword2, and keyword3 at the end."
print(check_filter(keywords, content))
Running the same performance test, modified in the same way as above, gives an output of 0.47878289222717285 seconds. This approach is clearly much faster, at least several times faster in this example, and the gap widens as the word list grows.
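One caveat not covered above: if a keyword contains regex metacharacters (such as . or *), joining with | produces a broken or unintended pattern. Here is a minimal sketch of a safer variant using re.escape and a precompiled pattern; the helper name build_filter is just for illustration:

import re

def build_filter(keywords):
    # escape each keyword so regex metacharacters are matched literally,
    # and compile the pattern once so it can be reused across calls
    pattern = re.compile("|".join(re.escape(k) for k in keywords))
    return lambda text: pattern.sub("***", text)

check = build_filter(("keyword1", "keyword2", "keyword3"))
print(check("This text mentions keyword1."))
# -> "This text mentions ***."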
Filtering sensitive words with a DFA
This method is relatively more efficient. Suppose we treat "bad egg", "bad person", and "bad child" as sensitive words; they share the prefix "bad", so their tree relationship can be expressed with a DFA dictionary (in the original Chinese each key is a single character; the English words here stand in for those characters):
{
    'bad': {
        'egg': {'\x00': 0},
        'people': {'\x00': 0},
        'child': {
            'child': {'\x00': 0}
        }
    }
}
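To make the role of the terminator concrete, here is a minimal sketch of how a lookup walks this tree ('\x00' marks the end of a complete word):

tree = {'bad': {'egg': {'\x00': 0},
                'people': {'\x00': 0},
                'child': {'child': {'\x00': 0}}}}

node = tree['bad']['egg']      # follow the path 'bad' -> 'egg'
print('\x00' in node)          # True: "bad egg" is a complete sensitive word
print('\x00' in tree['bad'])   # False: "bad" by itself is only a prefix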
The biggest advantage of this tree representation is that it reduces the number of comparisons needed during retrieval and thus improves efficiency. The basic code is as follows:
import time

class DFAFilter(object):
    def __init__(self):
        self.keyword_chains = {}  # keyword tree (nested dicts)
        self.delimit = '\x00'     # delimiter marking the end of a complete word

    def add(self, keyword):
        keyword = keyword.lower()  # convert the keyword to lowercase
        chars = keyword.strip()    # strip leading/trailing spaces and line feeds
        if not chars:              # skip empty keywords
            return
        level = self.keyword_chains
        # iterate over every character of the keyword
        for i in range(len(chars)):
            # if the character already exists at this level, descend into its sub-dictionary
            if chars[i] in level:
                level = level[chars[i]]
            else:
                if not isinstance(level, dict):
                    break
                for j in range(i, len(chars)):
                    level[chars[j]] = {}
                    last_level, last_char = level, chars[j]
                    level = level[chars[j]]
                last_level[last_char] = {self.delimit: 0}
                break
        if i == len(chars) - 1:
            level[self.delimit] = 0

    def parse(self, path):
        with open(path, encoding='utf-8') as f:
            for keyword in f:
                self.add(str(keyword).strip())

    def filter(self, message, repl="*"):
        message = message.lower()
        ret = []
        start = 0
        while start < len(message):
            level = self.keyword_chains
            step_ins = 0
            for char in message[start:]:
                if char in level:
                    step_ins += 1
                    if self.delimit not in level[char]:
                        level = level[char]
                    else:
                        ret.append(repl * step_ins)
                        start += step_ins - 1
                        break
                else:
                    ret.append(message[start])
                    break
            else:
                ret.append(message[start])
            start += 1
        return ''.join(ret)

startTime = time.time()
gfw = DFAFilter()
gfw.parse("./sensitive_words")
content = "This is an example of keyword substitution, involving keyword1 and keyword2, and keyword3 at the end." * 10000
result = gfw.filter(content)
print(time.time() - startTime)
Here, our dictionary file is generated by:
with open("./sensitive_words".'w') as f:
f.write("\n".join( [ "Keywords" + str(i) for i inRange (0100, 00)))Copy the code
Execution result:
4.9114227294921875e-05
You can see further performance improvements.
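As a minimal usage sketch of the DFAFilter class above, with the earlier "bad" words added directly through add instead of being loaded from a file:

gfw = DFAFilter()
for word in ("bad egg", "bad person", "bad child"):
    gfw.add(word)
print(gfw.filter("He is a bad egg and a bad person."))
# -> "he is a ******* and a **********."
# note that this implementation lowercases the whole message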
Filtering sensitive words with an AC automaton
Next, let's look at the AC automaton algorithm for filtering sensitive words.
AC automaton: a classic problem it solves is, given n words and a passage of m characters, finding how many of those words appear in the passage.
Simply put, an AC automaton is a trie (dictionary tree) plus KMP-style mismatch (failure) pointers.
Code implementation:
# AC automaton algorithm
class node(object):
    def __init__(self):
        self.next = {}
        self.fail = None
        self.isWord = False
        self.word = ""

class ac_automation(object):
    def __init__(self):
        self.root = node()

    # add a sensitive word
    def addword(self, word):
        temp_root = self.root
        for char in word:
            if char not in temp_root.next:
                temp_root.next[char] = node()
            temp_root = temp_root.next[char]
        temp_root.isWord = True
        temp_root.word = word

    # build failure pointers (BFS from the root)
    def make_fail(self):
        temp_que = []
        temp_que.append(self.root)
        while len(temp_que) != 0:
            temp = temp_que.pop(0)
            p = None
            for key, value in temp.next.items():
                if temp == self.root:
                    temp.next[key].fail = self.root
                else:
                    p = temp.fail
                    while p is not None:
                        if key in p.next:
                            temp.next[key].fail = p.next[key]
                            break
                        p = p.fail
                    if p is None:
                        temp.next[key].fail = self.root
                temp_que.append(temp.next[key])

    # find sensitive words
    def search(self, content):
        p = self.root
        result = []
        currentposition = 0
        while currentposition < len(content):
            word = content[currentposition]
            # follow failure pointers until a match is found or we are back at the root
            while word not in p.next and p != self.root:
                p = p.fail
            if word in p.next:
                p = p.next[word]
            else:
                p = self.root
            if p.isWord:
                result.append(p.word)
                p = self.root
            currentposition += 1
        return result

    # load the sensitive word list
    def parse(self, path):
        with open(path, encoding='utf-8') as f:
            for keyword in f:
                self.addword(str(keyword).strip())
        self.make_fail()  # build failure pointers once all words are loaded

    # replace sensitive words
    def words_replace(self, text):
        """
        :param text: the text to filter
        :return: the text with sensitive words masked
        """
        result = list(set(self.search(text)))
        for x in result:
            text = text.replace(x, '*' * len(x))
        return text

ah = ac_automation()
path = './sensitive_words'
ah.parse(path)
content = "This is an example of keyword substitution, involving keyword1 and keyword2, and keyword3 at the end."
print(ah.words_replace(content))
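As a minimal usage sketch of the ac_automation class above, adding words directly with addword (since parse is not used here, make_fail is called explicitly):

ac = ac_automation()
for w in ("bad egg", "bad person", "bad child"):
    ac.addword(w)
ac.make_fail()  # failure pointers must be built before searching
print(ac.search("a bad egg met a bad person"))
# -> ['bad egg', 'bad person']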
The word list is the same as before:
with open("./sensitive_words", 'w') as f:
    f.write("\n".join(["keyword" + str(i) for i in range(0, 10000)]))
Testing with content * 10000 as above, the result is 0.1727597713470459 seconds.
Summary
From the basic tests above, the DFA-based filter appears to perform best. In practice, however, neither of the latter two algorithms is strictly better than the other; in some situations the AC automaton delivers higher performance for sensitive word filtering. In production, it is therefore recommended to keep both in mind and choose according to your specific business needs.
How to deploy in the Serverless architecture
It is very simple. Taking the AC automaton algorithm as an example, we only need to add a few lines of code. Note that the automaton is created and the word list is parsed at module level, outside the handler, so warm invocations can reuse them. The complete code is as follows:
# -*- coding: utf-8 -*-
import json, uuid

# AC automaton algorithm
class node(object):
    def __init__(self):
        self.next = {}
        self.fail = None
        self.isWord = False
        self.word = ""

class ac_automation(object):
    def __init__(self):
        self.root = node()

    # add a sensitive word
    def addword(self, word):
        temp_root = self.root
        for char in word:
            if char not in temp_root.next:
                temp_root.next[char] = node()
            temp_root = temp_root.next[char]
        temp_root.isWord = True
        temp_root.word = word

    # build failure pointers (BFS from the root)
    def make_fail(self):
        temp_que = []
        temp_que.append(self.root)
        while len(temp_que) != 0:
            temp = temp_que.pop(0)
            p = None
            for key, value in temp.next.items():
                if temp == self.root:
                    temp.next[key].fail = self.root
                else:
                    p = temp.fail
                    while p is not None:
                        if key in p.next:
                            temp.next[key].fail = p.next[key]
                            break
                        p = p.fail
                    if p is None:
                        temp.next[key].fail = self.root
                temp_que.append(temp.next[key])

    # find sensitive words
    def search(self, content):
        p = self.root
        result = []
        currentposition = 0
        while currentposition < len(content):
            word = content[currentposition]
            # follow failure pointers until a match is found or we are back at the root
            while word not in p.next and p != self.root:
                p = p.fail
            if word in p.next:
                p = p.next[word]
            else:
                p = self.root
            if p.isWord:
                result.append(p.word)
                p = self.root
            currentposition += 1
        return result

    # load the sensitive word list
    def parse(self, path):
        with open(path, encoding='utf-8') as f:
            for keyword in f:
                self.addword(str(keyword).strip())
        self.make_fail()  # build failure pointers once all words are loaded

    # replace sensitive words
    def words_replace(self, text):
        """
        :param text: the text to filter
        :return: the text with sensitive words masked
        """
        result = list(set(self.search(text)))
        for x in result:
            text = text.replace(x, '*' * len(x))
        return text

def response(msg, error=False):
    return_data = {
        "uuid": str(uuid.uuid1()),
        "error": error,
        "message": msg
    }
    print(return_data)
    return return_data

# created at module level so warm invocations reuse the loaded automaton
ah = ac_automation()
path = './sensitive_words'
ah.parse(path)

def main_handler(event, context):
    try:
        sourceContent = json.loads(event["body"])["content"]
        return response({
            "sourceContent": sourceContent,
            "filtedContent": ah.words_replace(sourceContent)
        })
    except Exception as e:
        return response(str(e), True)
Finally, to facilitate local testing, we can add:
def test():
    event = {
        "requestContext": {
            "serviceId": "service-f94sy04v",
            "path": "/test/{path}",
            "httpMethod": "POST",
            "requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef",
            "identity": {
                "secretId": "abdcdxxxxxxxsdfs"
            },
            "sourceIp": "14.17.22.34",
            "stage": "release"
        },
        "headers": {
            "Accept-Language": "en-US,en,cn",
            "Accept": "text/html,application/xml,application/json",
            "Host": "service-3ei3tii4-251000691.ap-guangzhou.apigateway.myqloud.com",
            "User-Agent": "User Agent String"
        },
        "body": "{\"content\":\"This is a test text, I will also ha ha\"}",
        "pathParameters": {
            "path": "value"
        },
        "queryStringParameters": {
            "foo": "bar"
        },
        "headerParameters": {
            "Refer": "10.0.2.14"
        },
        "stageVariables": {
            "stage": "release"
        },
        "path": "/test/value",
        "queryString": {
            "foo": "bar",
            "bob": "alice"
        },
        "httpMethod": "POST"
    }
    print(main_handler(event, None))

if __name__ == "__main__":
    test()
Once that's done, we can do a test run. For example, my dictionary is:
ha ha
test
Results after execution:
{'uuid': '9961ae2a-5cfc-11ea-a7c2-acde48001122', 'error': False, 'message': {'sourceContent': 'This is a test text, I will also ha ha', 'filtedContent': 'This is a ** text, I will also **'}}
Next, we deploy the code to the cloud and create a new serverless.yaml:
sensitive_word_filtering:
  component: "@serverless/tencent-scf"
  inputs:
    name: sensitive_word_filtering
    codeUri: ./
    exclude:
      - .gitignore
      - .git/**
      - .serverless
      - .env
    handler: index.main_handler
    runtime: Python3.6
    region: ap-beijing
    description: Sensitive word filtering
    memorySize: 64
    timeout: 2
    events:
      - apigw:
          name: serverless
          parameters:
            environment: release
            endpoints:
              - path: /sensitive_word_filtering
                description: Sensitive word filtering
                method: POST
                enableCORS: true
                param:
                  - name: content
                    position: BODY
                    required: 'FALSE'
                    type: string
                    desc: Sentences to be filtered
Then deploy with sls --debug; the result is as follows:
Finally, test it with Postman:
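If you would rather test from code than from Postman, here is a minimal sketch using the requests library; the URL below is a placeholder, so substitute the API gateway address printed by the deployment:

import requests

# placeholder URL: replace with the endpoint output by `sls --debug`
url = "https://service-xxxxxxxx-0000000000.ap-beijing.apigateway.myqcloud.com/release/sensitive_word_filtering"
resp = requests.post(url, json={"content": "This is a test text, I will also ha ha"})
print(resp.json())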
Additional notes
- As for where to get a sensitive word list: there are many on GitHub, in both Chinese and English, which you can search for yourself. The list used here is only an example;
- A typical usage scenario for this API is a community comment system, message board, or blog publishing system, where it prevents sensitive words from slipping through and causing unnecessary trouble.