Mainly based on: github.com/observerss/…
A simple implementation
- Sensitive words are replaced mainly by looping over every keyword and calling `replace` on the message for each one.
class NaiveFilter:
    """Filter messages by replacing every keyword occurrence.

    Very simple implementation: ``filter`` loops over all keywords and calls
    ``str.replace`` once per keyword, so the cost grows with the number of
    keywords. Keywords and messages are both lower-cased, which makes the
    match case-insensitive and means the returned message is lower-cased.

    Each occurrence of a keyword is replaced by a single ``repl`` string,
    regardless of the keyword's length.

    >>> f = NaiveFilter()
    >>> f.keywords.add("sexy")
    >>> f.filter("hello sexy baby")
    'hello * baby'
    """

    def __init__(self):
        # Lower-cased keywords to be replaced by filter().
        self.keywords = set()

    def parse(self, path):
        """Load keywords from a UTF-8 text file, one keyword per line.

        Surrounding whitespace is stripped and keywords are lower-cased.
        Blank lines are skipped: an empty keyword would make ``str.replace``
        insert ``repl`` between every character of the message.

        Args:
            path: Path of the keyword file.
        """
        with open(path, encoding='utf-8') as f:
            for line in f:
                keyword = line.strip().lower()
                if keyword:
                    self.keywords.add(keyword)

    def filter(self, message, repl="*"):
        """Return ``message`` lower-cased with every keyword replaced.

        Args:
            message: Text to filter. ``bytes`` are decoded as UTF-8; any
                other non-string object is converted with ``str()``.
            repl: Replacement inserted for each keyword occurrence.

        Returns:
            The filtered, lower-cased message.
        """
        if isinstance(message, bytes):
            # str(bytes) would yield the "b'...'" repr, not the text.
            message = message.decode('utf-8')
        message = str(message).lower()
        for kw in self.keywords:
            if kw:  # guard against an empty keyword added by hand
                message = message.replace(kw, repl)
        return message
Copy the code
This implementation is `BSFilter`. The article describes it as BFS (breadth-first search); the class's own docstring calls the technique "Back Sorted Mapping".
The lookup has been optimized: English words are looked up directly, whole word by whole word, in an index dictionary. For text in other languages, matches are looked up character by character.
class BSFilter:
    """Filter messages using a token -> keyword index.

    Uses a "Back Sorted Mapping" to reduce the number of replacements: each
    keyword is indexed under the tokens it contains, and ``filter`` only
    tries the keywords indexed under tokens that occur in the message.

    A whitespace-separated word made only of ASCII letters and digits is one
    token; any other word is indexed under each of its characters. A keyword
    indexed as a whole word is therefore only found when the message
    contains that exact word as a whitespace-separated token.

    Each occurrence of a keyword is replaced by a single ``repl`` string,
    regardless of the keyword's length. The result is lower-cased.

    >>> f = BSFilter()
    >>> f.add("sexy")
    >>> f.filter("hello sexy baby")
    'hello * baby'
    """

    def __init__(self):
        # Keywords in insertion order; bsdict stores positions in this list.
        self.keywords = []
        # Same keywords as a set, for fast duplicate detection.
        self.kwsets = set()
        # token (whole English word or single character) -> keyword indexes
        self.bsdict = defaultdict(set)
        self.pat_en = re.compile(r'^[0-9a-zA-Z]+$')  # english phrase or not

    def add(self, keyword):
        """Register ``keyword`` (str, or UTF-8 encoded bytes).

        The keyword is lower-cased. Duplicates and keywords consisting only
        of whitespace are ignored, since the latter yield no index tokens
        and could never be matched.
        """
        if not isinstance(keyword, str):
            keyword = keyword.decode('utf-8')
        keyword = keyword.lower()
        words = keyword.split()
        if not words or keyword in self.kwsets:
            return
        self.keywords.append(keyword)
        self.kwsets.add(keyword)
        index = len(self.keywords) - 1
        for word in words:
            if self.pat_en.search(word):
                self.bsdict[word].add(index)
            else:
                for char in word:
                    self.bsdict[char].add(index)

    def parse(self, path):
        """Load keywords from a UTF-8 text file, one keyword per line."""
        with open(path, "r", encoding='utf-8') as f:
            for keyword in f:
                self.add(keyword.strip())

    def filter(self, message, repl="*"):
        """Return ``message`` lower-cased with indexed keywords replaced.

        Args:
            message: Text to filter (str, or UTF-8 encoded bytes).
            repl: Replacement inserted for each keyword occurrence.

        Returns:
            The filtered, lower-cased message.
        """
        if not isinstance(message, str):
            message = message.decode('utf-8')
        message = message.lower()
        # split() is evaluated once, on the unfiltered text; replacements
        # made below do not change the tokens being looked up.
        for word in message.split():
            # English words are looked up whole, other words per character.
            tokens = (word,) if self.pat_en.search(word) else word
            for token in tokens:
                # .get() rather than [] so that looking up a token never
                # inserts an empty set into the defaultdict.
                for index in self.bsdict.get(token, ()):
                    message = message.replace(self.keywords[index], repl)
        return message
Copy the code
This implementation uses a DFA (Deterministic Finite Automaton).
The automaton is built from nested dictionaries: each level maps a character to the dictionary for the next character, and a special delimiter key marks the end of a keyword.
class DFAFilter() :
'''Filter Messages from keywords Use DFA to keep algorithm perform constantly >>> f = DFAFilter() >>> f.add("sexy") >>> f.filter("hello sexy baby") hello **** baby '''
def __init__(self) :
self.keyword_chains = {}
self.delimit = '\x00'
def add(self, keyword) :
if not isinstance(keyword, str):
keyword = keyword.decode('utf-8')
keyword = keyword.lower()
chars = keyword.strip()
if not chars:
return
level = self.keyword_chains
for i in range(len(chars)):
if chars[i] in level:
level = level[chars[i]]
else:
if not isinstance(level, dict) :break
for j in range(i, len(chars)):
level[chars[j]] = {}
last_level, last_char = level, chars[j]
level = level[chars[j]]
last_level[last_char] = {self.delimit: 0}
break
if i == len(chars) - 1:
level[self.delimit] = 0
def parse(self, path) :
with open(path,encoding='UTF-8') as f:
for keyword in f:
self.add(keyword.strip())
def filter(self, message, repl="*") :
if not isinstance(message, str):
message = message.decode('utf-8')
message = message.lower()
ret = []
start = 0
while start < len(message):
level = self.keyword_chains
step_ins = 0
for char in message[start:]:
if char in level:
step_ins += 1
if self.delimit not in level[char]:
level = level[char]
else:
ret.append(repl * step_ins)
start += step_ins - 1
break
else:
ret.append(message[start])
break
else:
ret.append(message[start])
start += 1
return ' '.join(ret)
Copy the code