# AutoGroupNote.py
# Watches a desktop chat window and automatically replies to "#接龙" (sign-up chain) messages.
import re
import time
from datetime import datetime
import pyperclip
from pywinauto import Application, Desktop
# Tunable settings. Every interval/delay below is handed to time.sleep(),
# so all of them are expressed in seconds.

# Fixed pause of the polling loop between two reads of the chat window.
REFRESH_INTERVAL: float = 0.1

# Pause after clicking a button inside a message item.
WINDOW_DELAY: float = 0.1

# User name written into the chain when no other name is supplied.
DEFAULT_USERNAME: str = "测试"

# Number of consecutive detections of a chain message required before replying.
REPEAT_THRESHOLD: int = 1

# Pause between detections while the count is still below REPEAT_THRESHOLD.
RETRY_INTERVAL_ON_REPEAT: float = 0.1

# Pause between recognising a chain message and sending the reply.
SEND_DELAY: float = 0
class WechatAutoGroupnote:
    """Poll a chat window and answer the first new "#接龙" (sign-up chain) message.

    The instance attaches to an already-open desktop window through
    pywinauto's UIA backend. ``run`` then polls the newest list item of that
    window and, once it looks like a chain message, pastes a reply that
    appends one more numbered entry carrying the configured user name.

    Attributes:
        title: Window title used to locate the window.
        send_delay_seconds: Seconds to wait between detection and sending.
        log_out: When true, print per-poll debug output.
        content: Name written into the new chain entry (set by ``run``).
        last_message: Text of the last message already handled or ignored.
        groupnote_state_repeat_time: Number of consecutive polls that saw a
            new chain message.
        initial_time: ``time.time()`` taken at construction.
    """

    # Seconds after construction during which every new message is treated
    # as pre-existing history and never answered.
    STARTUP_GRACE_SECONDS = 1.5

    # A "#接龙" header followed by at least one "<number>. <text>" line.
    _CHAIN_PATTERN = re.compile(r'#接龙\s*\n+((\d+\.\s+.+\n*){1,})', re.MULTILINE)

    # One numbered entry per line. The gap after the dot is limited to spaces
    # and tabs: "\s*" would also consume the newline, so an empty entry such
    # as "2. " would swallow the following line as its text.
    _ENTRY_PATTERN = re.compile(r'^(\d+)\.[ \t]*(.*)$', re.MULTILINE)

    def __init__(self, window_title, log_out=True, send_delay_seconds=SEND_DELAY):
        """Attach to the window whose title matches ``window_title``.

        Args:
            window_title: Title of the window to monitor; it is regex-escaped
                before being handed to pywinauto as ``title_re``.
            log_out: Enable per-poll debug printing.
            send_delay_seconds: Seconds to wait before sending the reply;
                converted with ``float()``.

        Raises:
            Exception: Whatever pywinauto raised while connecting. The titles
                of all currently visible windows are printed first.
        """
        self.title = window_title
        self.send_delay_seconds = float(send_delay_seconds)
        self.log_out = log_out
        self.content = ""
        self.last_message = ""
        self.groupnote_state_repeat_time = 0
        self.initial_time = time.time()

        title_pattern = re.escape(self.title)
        try:
            self.app = Application(backend='uia').connect(title_re=title_pattern)
            self.window = self.app.window(title_re=title_pattern)
        except Exception:
            print("❌ 未能连接指定窗口,当前可用窗口:")
            for w in Desktop(backend="uia").windows():
                print(f" - {w.window_text()}")
            # Bare raise keeps the original traceback intact.
            raise

    def monitor_messages(self):
        """Return the newest chat item, or ``None`` when nothing is found.

        Every ``List`` control below the window is inspected in order; the
        last ``ListItem`` of the first list that has any items is returned.
        Lists that fail while being queried are skipped.
        """
        list_controls = self.window.descendants(control_type='List')
        if self.log_out:
            print(f"[调试] 找到 {len(list_controls)} 个 List 控件")
        for list_control in list_controls:
            try:
                items = list_control.descendants(control_type='ListItem')
            except Exception:
                # Best effort: a list that cannot be read is simply skipped.
                continue
            if items:
                return items[-1]
        return None

    def is_groupnote_message(self, msg):
        """Return a match object if ``msg`` is a chain message, else ``None``.

        A chain message is a "#接龙" header followed by at least one line of
        the form "<number>. <text>".
        """
        return self._CHAIN_PATTERN.search(msg)

    def generate_reply(self, msg):
        """Build the reply that appends this user to the chain in ``msg``.

        Every line of the form "<number>. <text>" is carried over, with the
        text stripped of surrounding whitespace, and a new entry numbered
        ``max + 1`` is added with the name from ``get_user_name``. Lines
        that are not numbered entries (the header, for instance) are not
        reproduced; the reply always starts with a fresh "#接龙" header.

        Args:
            msg: Full text of the chain message being answered.

        Returns:
            The reply text.
        """
        entries = []
        highest = 0
        for number, text in self._ENTRY_PATTERN.findall(msg):
            entries.append(f"{number}. {text.strip()}")
            highest = max(highest, int(number))
        entries.append(f"{highest + 1}. {self.get_user_name()}")
        return "#接龙\n\n" + "\n".join(entries)

    def send_message(self, text):
        """Paste ``text`` into the first Edit control and press Enter.

        The input box is cleared first. The text travels through the system
        clipboard, so the previous clipboard content is overwritten.

        Returns:
            ``True`` when every UI step completed, ``False`` when any step
            raised (the error is printed, not propagated).
        """
        try:
            input_box = self.window.child_window(control_type="Edit", found_index=0)
            input_box.set_focus()
            input_box.type_keys("^a{BACKSPACE}")
            time.sleep(0.05)
            pyperclip.copy(text)
            input_box.type_keys("^v")
            time.sleep(0.1)
            input_box.type_keys("{ENTER}")
        except Exception as e:
            print(f"[错误] 发送失败: {e}")
            return False
        return True

    def click_button(self, item):
        """Click the first button without a caption inside ``item``.

        Used to focus the message before replying. Does nothing when no such
        button exists; errors are printed and swallowed.
        """
        try:
            for btn in item.descendants(control_type="Button"):
                if btn.window_text().strip() == "":
                    btn.set_focus()
                    btn.click_input()
                    time.sleep(WINDOW_DELAY)
                    return
        except Exception as e:
            print(f"[错误] 点击按钮失败: {e}")

    def get_user_name(self):
        """Return the configured name, or ``DEFAULT_USERNAME`` when it is empty."""
        return self.content or DEFAULT_USERNAME

    def run(self, content):
        """Poll the window until one chain message has been answered.

        Args:
            content: Name to append to the chain; an empty value falls back
                to ``DEFAULT_USERNAME``.

        The loop ends after the first reply attempt, whether or not sending
        succeeded. Any exception raised inside one poll is printed and the
        loop carries on.
        """
        self.content = content
        # Becomes True once this call has read a non-empty message. The first
        # such message was in the chat before polling began, so it is history
        # no matter how long connecting and the first UI query took.
        history_seen = False

        while True:
            try:
                now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                last_msg_item = self.monitor_messages()
                if not last_msg_item:
                    print(f"[{now_str}] 未检测到消息,等待 {REFRESH_INTERVAL:.2f}s...\n")
                    time.sleep(REFRESH_INTERVAL)
                    continue

                try:
                    msg_text = last_msg_item.window_text()
                except Exception as e:
                    print(f"[调试] 获取文本失败: {e}")
                    time.sleep(REFRESH_INTERVAL)
                    continue

                if self.log_out:
                    print(f"[调试] 消息内容:\n{msg_text}\n")

                first_observation = bool(msg_text) and not history_seen
                if msg_text:
                    history_seen = True

                # Nothing new: the run of consecutive detections is broken.
                if not msg_text or msg_text == self.last_message:
                    self.groupnote_state_repeat_time = 0
                    time.sleep(REFRESH_INTERVAL)
                    continue

                # Ignore history right after start-up.
                in_grace = time.time() - self.initial_time < self.STARTUP_GRACE_SECONDS
                if first_observation or in_grace:
                    self.last_message = msg_text
                    self.groupnote_state_repeat_time = 0
                    print("[调试] 程序刚启动,忽略旧消息\n")
                    time.sleep(REFRESH_INTERVAL)
                    continue

                if not self.is_groupnote_message(msg_text):
                    self.groupnote_state_repeat_time = 0
                    time.sleep(REFRESH_INTERVAL)
                    continue

                self.groupnote_state_repeat_time += 1
                print(f"[{now_str}] 第 {self.groupnote_state_repeat_time} 次检测到接龙")
                if self.groupnote_state_repeat_time < REPEAT_THRESHOLD:
                    # last_message is left untouched on purpose so that the
                    # same message is counted again on the next poll.
                    time.sleep(RETRY_INTERVAL_ON_REPEAT)
                    continue

                self.click_button(last_msg_item)
                if self.send_delay_seconds > 0:
                    print(f"[{now_str}] 即将发送接龙,等待 {self.send_delay_seconds}s...")
                    time.sleep(self.send_delay_seconds)

                reply = self.generate_reply(msg_text)
                # Report success only when sending really went through;
                # send_message has already printed the error otherwise.
                if self.send_message(reply):
                    print(f"[{now_str}] ✅ 已发送接龙:\n{reply}\n")
                self.last_message = msg_text
                break
            except Exception as e:
                print(f"[错误] {e}")
                self.groupnote_state_repeat_time = 0
                time.sleep(REFRESH_INTERVAL)
# CLI entry point: list the open windows, let the user pick one by index,
# then monitor that window until one chain message has been answered.
if __name__ == "__main__":
    windows = Desktop(backend="uia").windows()
    print("当前检测到的窗口:")
    for i, w in enumerate(windows):
        print(f"{i}: {w.window_text()}")
    try:
        selected_index = int(input("请输入需要监控的窗口序号:"))
        # A negative number would silently wrap around to a window counted
        # from the end of the list; only the indices printed above are valid.
        if selected_index < 0:
            raise IndexError(selected_index)
        selected_title = windows[selected_index].window_text()
        # An empty title turns into an empty regular expression downstream,
        # which cannot identify one particular window.
        if not selected_title:
            raise ValueError("selected window has no title")
        print(f"✅ 已选择窗口: {selected_title}")
    except (ValueError, IndexError):
        print("输入无效,程序退出。")
        # The exit() helper is injected by the site module and may be absent
        # (python -S, frozen executables); SystemExit always works.
        raise SystemExit(1) from None
    bot = WechatAutoGroupnote(window_title=selected_title, log_out=True)
    bot.run(DEFAULT_USERNAME)