-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
225 lines (197 loc) · 7.16 KB
/
main.py
File metadata and controls
225 lines (197 loc) · 7.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# 主程序入口
import time
import threading
import sys
from key_storage import KeyStorage
from balance_checker import BalanceChecker
from key_manager import KeyManager
from data_persistence import DataPersistence
class ApiKeyPoolApp:
"""
API密钥池应用程序
提供命令行界面来管理API密钥
"""
def __init__(self):
"""
初始化应用程序
"""
self.key_storage = KeyStorage()
self.balance_checker = BalanceChecker(self.key_storage)
self.key_manager = KeyManager(self.key_storage, self.balance_checker)
self.data_persistence = DataPersistence(self.key_storage)
# 加载数据
self.data_persistence.load_data()
# 启动自动保存线程
self.auto_save_thread = threading.Thread(
target=self.data_persistence.auto_save,
args=(300,), # 每5分钟保存一次
daemon=True
)
self.auto_save_thread.start()
def show_menu(self):
"""
显示主菜单
"""
print("\n===== API密钥管理池 =====")
print("1. 添加新密钥")
print("2. 批量添加密钥")
print("3. 查看密钥状态")
print("4. 更新所有密钥状态")
print("5. 获取可用密钥")
print("6. 查看统计信息")
print("7. 删除密钥")
print("8. 保存数据")
print("0. 退出程序")
print("======================\n")
def add_key(self):
"""
添加新密钥
"""
key = input("请输入API密钥: ").strip()
if not key:
print("密钥不能为空")
return
success, message = self.key_manager.add_key(key)
print(message)
def batch_add_keys(self):
"""
批量添加密钥
"""
print("请输入多个API密钥,每行一个,输入空行结束:")
keys = []
while True:
key = input().strip()
if not key:
break
keys.append(key)
if not keys:
print("未输入任何密钥")
return
success_count, fail_count = self.key_manager.batch_add_keys(keys)
print(f"添加完成,成功: {success_count}个,失败: {fail_count}个")
def view_key_status(self):
"""
查看密钥状态
"""
key = input("请输入要查询的API密钥 (留空显示所有): ").strip()
if key:
status, info = self.key_storage.get_key_info(key)
if status == "valid":
print(f"密钥状态: 有效,余额: {info.get('balance')}")
elif status == "invalid":
print(f"密钥状态: 无效,原因: {info.get('reason')}")
else:
print("密钥不存在")
else:
# 显示所有密钥
valid_keys = self.key_storage.get_all_valid_keys()
invalid_keys = self.key_storage.get_all_invalid_keys()
print(f"\n有效密钥 ({len(valid_keys)}个):")
for i, (key, info) in enumerate(valid_keys.items(), 1):
print(f"{i}. {key[:10]}...{key[-5:]} - 余额: {info.get('balance')}")
print(f"\n无效密钥 ({len(invalid_keys)}个):")
for i, (key, info) in enumerate(invalid_keys.items(), 1):
print(f"{i}. {key[:10]}...{key[-5:]} - 原因: {info.get('reason')}")
def update_all_keys(self):
"""
更新所有密钥状态
"""
print("正在更新所有密钥状态,请稍候...")
success_count, fail_count, recovered_count = self.key_manager.update_all_keys_status()
print(f"更新完成,成功: {success_count}个,失败: {fail_count}个,恢复: {recovered_count}个")
def get_available_key(self):
"""
获取可用密钥
"""
min_balance = input("请输入最小余额要求 (默认0.1): ").strip()
if not min_balance:
min_balance = 0.1
else:
try:
min_balance = float(min_balance)
except ValueError:
print("余额格式错误,使用默认值0.1")
min_balance = 0.1
key = self.key_manager.get_available_key(min_balance)
if key:
status, info = self.key_storage.get_key_info(key)
print(f"可用密钥: {key}")
print(f"余额: {info.get('balance')}")
else:
print(f"没有找到余额大于{min_balance}的可用密钥")
def show_statistics(self):
"""
显示统计信息
"""
stats = self.key_manager.get_statistics()
print("\n===== 密钥池统计信息 =====")
print(f"有效密钥数量: {stats['valid_count']}")
print(f"无效密钥数量: {stats['invalid_count']}")
print(f"总余额: {stats['total_balance']:.2f}")
if stats['invalid_reasons']:
print("\n无效原因统计:")
for reason, count in stats['invalid_reasons'].items():
print(f"- {reason}: {count}个")
print("=========================\n")
def remove_key(self):
"""
删除密钥
"""
key = input("请输入要删除的API密钥: ").strip()
if not key:
print("密钥不能为空")
return
if self.key_manager.remove_key(key):
print("密钥删除成功")
else:
print("密钥不存在")
def save_data(self):
"""
手动保存数据
"""
if self.data_persistence.save_data():
print("数据保存成功")
else:
print("数据保存失败")
def run(self):
"""
运行应用程序
"""
print("欢迎使用API密钥管理池系统")
while True:
self.show_menu()
choice = input("请选择操作 (0-8): ").strip()
if choice == "1":
self.add_key()
elif choice == "2":
self.batch_add_keys()
elif choice == "3":
self.view_key_status()
elif choice == "4":
self.update_all_keys()
elif choice == "5":
self.get_available_key()
elif choice == "6":
self.show_statistics()
elif choice == "7":
self.remove_key()
elif choice == "8":
self.save_data()
elif choice == "0":
# 退出前保存数据
self.data_persistence.save_data()
print("感谢使用,再见!")
break
else:
print("无效的选择,请重新输入")
# 程序入口
if __name__ == "__main__":
app = ApiKeyPoolApp()
try:
app.run()
except KeyboardInterrupt:
# 处理Ctrl+C中断
print("\n程序被中断,正在保存数据...")
app.data_persistence.save_data()
print("数据已保存,程序退出")
sys.exit(0)