-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtextutils.py
More file actions
94 lines (78 loc) · 3.89 KB
/
Copy pathtextutils.py
File metadata and controls
94 lines (78 loc) · 3.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import re
import os
import Levenshtein
from concurrent.futures import ProcessPoolExecutor
from rapidfuzz import fuzz, utils
from app_logging import writelog
def clean_text(text):
    """Return *text* normalized for fuzzy comparison.

    Delegates to ``rapidfuzz.utils.default_process``; the exact normalization
    rules are whatever that function applies (per the rapidfuzz docs:
    non-alphanumeric characters are replaced by whitespace, the string is
    trimmed and lower-cased).
    """
    return utils.default_process(text)
def compare_single_contact(contact, checklist, threshold_high, threshold_low):
    """Score one contact's name against every checklist entry.

    Args:
        contact: Mapping with a ``"name"`` key.
        checklist: Iterable of mappings, each with a ``"name"`` key.
        threshold_high: Minimum score for a match to be classed as "high".
        threshold_low: Minimum score for a match to be classed as "low"
            (only consulted when the score is below ``threshold_high``).

    Returns:
        A tuple ``(contact_name, matches_high, matches_low)`` where the name
        is the original (uncleaned) contact name and each match list holds
        ``(checklist_name, score)`` tuples in checklist order.
    """
    original_name = contact["name"]
    normalized_name = clean_text(original_name)
    strong, weak = [], []
    outcome = (original_name, strong, weak)

    # A name that is empty after cleaning cannot be compared meaningfully.
    if normalized_name == "":
        return outcome

    for entry in checklist:
        entry_name = entry["name"]
        normalized_entry = clean_text(entry_name)
        if normalized_entry == "":
            continue  # skip checklist entries with nothing left after cleaning

        similarity = fuzz.token_sort_ratio(normalized_name, normalized_entry)
        if similarity >= threshold_high:
            strong.append((entry_name, similarity))
        elif similarity >= threshold_low:
            weak.append((entry_name, similarity))

    return outcome
def compare_contacts(contacts, checklist, threshold_high=97, threshold_low=80):
    """Fuzzy-match every contact against the checklist using worker processes.

    The contacts are split into roughly equal chunks, each chunk is handed to
    ``process_chunk`` in a ``ProcessPoolExecutor``, and the per-contact
    results are merged into a single dictionary.

    Args:
        contacts: Sequence (must support ``len`` and slicing) of contact
            records passed through to ``process_chunk``.
        checklist: Sized collection of checklist records; sent as-is to every
            worker.
        threshold_high: Score at or above which a match counts as "high".
        threshold_low: Score at or above which a match counts as "low".

    Returns:
        Dict mapping contact name to
        ``{"matches_high": [...], "matches_low": [...]}``. Contacts with no
        matches in either list are omitted. If several contacts share a name,
        the last one processed wins.
    """
    contacts_len = len(contacts)
    if contacts_len == 0:
        # Nothing to compare. Returning early also avoids a chunk size of 0,
        # which would make range() raise ValueError below.
        return {}

    # Decide how many workers to use: half of the available cores, at least 1.
    # os.cpu_count() returns None when the count cannot be determined.
    num_cores = os.cpu_count() or 1
    num_threads = max(1, num_cores // 2)

    # Split the contacts list into parts of ceil(len / workers) items each.
    chunk_size, remainder = divmod(contacts_len, num_threads)
    if remainder:
        chunk_size += 1
    chunks = [contacts[i:i + chunk_size] for i in range(0, contacts_len, chunk_size)]

    checklist_len = len(checklist)
    # NOTE: the message says "threads", but the workers are separate processes.
    writelog(f"List1 {contacts_len} and list2 {checklist_len} comparison started in {num_threads} threads.")

    results = {}
    with ProcessPoolExecutor(max_workers=num_threads) as executor:
        futures = [
            executor.submit(process_chunk, chunk, checklist, threshold_high, threshold_low)
            for chunk in chunks
        ]
        # Collect in submission order so the merged result is deterministic.
        for future in futures:
            for contact_name, matches_high, matches_low in future.result():
                # Keep a contact only if it has at least one high or low match.
                if matches_high or matches_low:
                    results[contact_name] = {
                        "matches_high": matches_high,
                        "matches_low": matches_low,
                    }
    return results
def process_chunk(chunk, checklist, threshold_high, threshold_low):
    """Run ``compare_single_contact`` for each contact in *chunk*.

    Returns:
        A list with one ``(contact_name, matches_high, matches_low)`` tuple
        per contact, in the same order as *chunk*.
    """
    return [
        compare_single_contact(entry, checklist, threshold_high, threshold_low)
        for entry in chunk
    ]
def find_best_match(string_list, text, length_penalty_factor):
    """Pick the entry of *string_list* that best matches some window of *text*.

    Each candidate is cleaned and slid across the cleaned text; every window
    of the candidate's length is scored as
    ``levenshtein_distance / candidate_length
    + length_penalty_factor * candidate_length`` and the lowest score wins.

    Args:
        string_list: Iterable of candidate strings.
        text: Text to search within.
        length_penalty_factor: Multiplier applied to the cleaned candidate's
            length and added to the normalized distance.

    Returns:
        The original (uncleaned) candidate with the lowest score, or ``None``
        when no candidate produced a window (empty list, candidates empty
        after cleaning, or all candidates longer than the cleaned text).
        On ties the earliest-scored candidate is kept.
    """
    haystack = clean_text(text)
    winner = None
    lowest_cost = float('inf')

    for candidate in string_list:
        needle = clean_text(candidate)
        if not needle:
            continue
        width = len(needle)

        # Slide a window of the candidate's length over the cleaned text.
        for start in range(len(haystack) - width + 1):
            window = haystack[start:start + width]
            edits = Levenshtein.distance(needle, window)
            # Normalize the distance by the candidate length, then add the
            # length penalty.
            cost = (edits / width) + (length_penalty_factor * width)
            # Strict comparison: the first window reaching a score keeps it.
            if cost < lowest_cost:
                lowest_cost = cost
                winner = candidate

    return winner