-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
341 lines (281 loc) · 12.5 KB
/
utils.py
File metadata and controls
341 lines (281 loc) · 12.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
import base64
import logging
import os
from datetime import datetime
import pytz
from cryptography.hazmat.decrepit.ciphers.algorithms import TripleDES
from cryptography.hazmat.primitives import serialization, hashes, padding as sym_padding
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.asymmetric.padding import OAEP, MGF1, PSS
from cryptography.hazmat.primitives.ciphers import Cipher, modes
# Configure logging: INFO level, with timestamp, level name and message.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Set the Vietnam time zone used when producing timestamps.
vn_timezone = pytz.timezone('Asia/Ho_Chi_Minh')
def get_current_vn_timestamp():
    """Return the current Unix timestamp as whole seconds.

    The current time is taken as a timezone-aware datetime in the
    module-level ``vn_timezone`` and converted to seconds since the epoch,
    truncated to an ``int``.
    """
    now_in_vn = datetime.now(tz=vn_timezone)
    return int(now_in_vn.timestamp())
def generate_rsa_key_pair():
    """Generate a 2048-bit RSA key pair with public exponent 65537.

    Returns:
        A ``(public_pem, private_pem)`` tuple of PEM strings. The public key
        is serialized as SubjectPublicKeyInfo and the private key as
        unencrypted PKCS#8.

    Raises:
        Exception: any failure is logged and re-raised unchanged.
    """
    try:
        key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

        private_pem_bytes = key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )
        public_pem_bytes = key.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )

        private_pem = private_pem_bytes.decode()
        public_pem = public_pem_bytes.decode()
        logger.info("Generated RSA key pair successfully")
        return public_pem, private_pem
    except Exception as e:
        logger.error(f"Failed to generate RSA key pair: {str(e)}")
        raise
def handshake_initiate():
    """Log the start of a handshake and return the opening signal ``"Hello!"``."""
    greeting = "Hello!"
    logger.info("Initiating handshake with 'Hello!'")
    return greeting
def handshake_respond():
    """Log the handshake reply and return the response signal ``"Ready!"``."""
    reply = "Ready!"
    logger.info("Responding to handshake with 'Ready!'")
    return reply
def encrypt_3des_key(triple_des_key, rsa_public_key_pem):
    """Encrypt a TripleDES key with an RSA public key (OAEP + SHA-256).

    Args:
        triple_des_key: The key to wrap; must be exactly 24 bytes long.
        rsa_public_key_pem: The recipient's public key as a PEM string.

    Returns:
        The RSA ciphertext, base64-encoded as a ``str``.

    Raises:
        ValueError: If the key is not 24 bytes long or the PEM is not a str.
        Exception: Any other failure is logged and re-raised unchanged.
    """
    try:
        # Validate the inputs before touching any key material.
        if len(triple_des_key) != 24:
            logger.error(f"Invalid TripleDES key length: expected 24 bytes, got {len(triple_des_key)}")
            raise ValueError("TripleDES key must be 24 bytes")
        if not isinstance(rsa_public_key_pem, str):
            logger.error("RSA public key must be a string")
            raise ValueError("RSA public key must be a string")

        recipient_key = serialization.load_pem_public_key(rsa_public_key_pem.encode())
        oaep = OAEP(
            mgf=MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        )
        wrapped_key = recipient_key.encrypt(triple_des_key, oaep)

        encoded_key = base64.b64encode(wrapped_key).decode()
        logger.info("Successfully encrypted TripleDES key")
        return encoded_key
    except Exception as e:
        logger.error(f"Failed to encrypt TripleDES key: {str(e)}")
        raise
def decrypt_3des_key(encrypted_key, rsa_private_key_pem):
    """Decrypt a base64-encoded TripleDES key with an RSA private key.

    The ciphertext is expected to have been produced with OAEP padding using
    SHA-256 for both the hash and MGF1, and no label.

    Args:
        encrypted_key: Base64 text of the RSA ciphertext; must be non-empty.
        rsa_private_key_pem: The unencrypted private key as a PEM string.

    Returns:
        The decrypted 24-byte TripleDES key.

    Raises:
        ValueError: On every failure: empty input, a non-str PEM, invalid
            base64, an error raised while loading the key or decrypting, or
            a decrypted key that is not 24 bytes long. Non-ValueError causes
            are wrapped and chained via ``__cause__``.
    """
    try:
        if not encrypted_key:
            raise ValueError("Encrypted key is empty")
        if not isinstance(rsa_private_key_pem, str):
            raise ValueError("RSA private key must be a string")

        try:
            encrypted_key_bytes = base64.b64decode(encrypted_key)
        except Exception as e:
            raise ValueError(f"Invalid base64 encoded key: {str(e)}") from e

        private_key = serialization.load_pem_private_key(rsa_private_key_pem.encode(), password=None)
        decrypted_key = private_key.decrypt(
            encrypted_key_bytes,
            OAEP(
                mgf=MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None,
            ),
        )

        if len(decrypted_key) != 24:
            raise ValueError(f"Decrypted TripleDES key must be 24 bytes, got {len(decrypted_key)}")
        return decrypted_key
    except ValueError as ve:
        # Single logging point: each failure is reported once, not twice.
        logger.error(f"Failed to decrypt TripleDES key: {str(ve)}")
        raise
    except Exception as e:
        logger.error(f"Failed to decrypt TripleDES key: {str(e)}")
        # Chain the original error so the root cause stays in the traceback.
        raise ValueError(f"Decryption failed: {str(e)}") from e
def sign_auth_info(sender_id, receiver_id, rsa_private_key_pem):
    """Sign authentication info with RSA-PSS / SHA-256.

    The signed payload is ``"<sender_id>:<receiver_id>:<timestamp>"``, where
    the timestamp comes from ``get_current_vn_timestamp()``.

    Args:
        sender_id: Identifier of the sender; its string form must not
            contain ``':'``.
        receiver_id: Identifier of the receiver; same restriction.
        rsa_private_key_pem: The signer's unencrypted private key as a PEM
            string.

    Returns:
        A dict with ``"auth_data"`` (the signed payload), ``"signature"``
        (base64 text) and ``"timestamp"`` (int).

    Raises:
        ValueError: If either identifier contains ``':'``.
        Exception: Any other failure is logged and re-raised unchanged.
    """
    try:
        # verify_auth_info() splits the payload on ':' and requires exactly
        # three fields, so an ID containing ':' could never be verified.
        for field_name, field_value in (("sender_id", sender_id), ("receiver_id", receiver_id)):
            if ":" in str(field_value):
                raise ValueError(f"{field_name} must not contain ':'")

        timestamp = get_current_vn_timestamp()
        auth_data = f"{sender_id}:{receiver_id}:{timestamp}"
        private_key = serialization.load_pem_private_key(rsa_private_key_pem.encode(), password=None)
        signature = private_key.sign(
            auth_data.encode(),
            PSS(
                mgf=MGF1(hashes.SHA256()),
                salt_length=padding.calculate_max_pss_salt_length(private_key, hashes.SHA256()),
            ),
            hashes.SHA256(),
        )
        logger.info(f"Signed auth info: sender={sender_id}, receiver={receiver_id}, timestamp={timestamp}")
        return {
            "auth_data": auth_data,
            "signature": base64.b64encode(signature).decode(),
            "timestamp": timestamp,
        }
    except Exception as e:
        logger.error(f"Failed to sign auth info: {str(e)}")
        raise
def verify_auth_info(signature, auth_data, rsa_public_key_pem):
    """Check the format, freshness and signature of an auth payload.

    Args:
        signature: Base64-encoded RSA-PSS / SHA-256 signature of
            ``auth_data``.
        auth_data: Payload of the form ``sender_id:receiver_id:timestamp``.
        rsa_public_key_pem: The signer's public key as a PEM string.

    Returns:
        True when the payload has exactly three ':'-separated fields, its
        timestamp is within 300 seconds of the current time, and the
        signature verifies. False otherwise, including when any exception is
        raised along the way (the error is logged).
    """
    try:
        # The payload must be exactly "sender_id:receiver_id:timestamp".
        fields = auth_data.split(':')
        if len(fields) != 3:
            logger.error(f"Invalid auth_data format: expected sender_id:receiver_id:timestamp, got {auth_data}")
            return False

        sender_id, receiver_id, timestamp = fields
        timestamp = int(timestamp)
        current_time = get_current_vn_timestamp()

        # Reject timestamps more than 5 minutes away, in either direction.
        clock_skew = abs(current_time - timestamp)
        if clock_skew > 300:
            logger.error(f"Timestamp expired: {timestamp} (current: {current_time})")
            return False

        logger.debug(f"Auth Data for verify: {auth_data}")
        logger.debug(
            f"Signature for verify: {signature[:50] if isinstance(signature, str) else str(signature)[:50]}...")
        logger.debug(f"Public Key for verify: {rsa_public_key_pem[:50]}...")

        signer_key = serialization.load_pem_public_key(rsa_public_key_pem.encode())
        raw_signature = base64.b64decode(signature)
        pss_padding = PSS(
            mgf=MGF1(algorithm=hashes.SHA256()),
            salt_length=PSS.AUTO,
        )
        # verify() raises on a bad signature; the handler below maps that to False.
        signer_key.verify(raw_signature, auth_data.encode(), pss_padding, hashes.SHA256())

        logger.info(f"Verified auth info: sender={sender_id}, receiver={receiver_id}, timestamp={timestamp}")
        return True
    except Exception as e:
        logger.error(f"Failed to verify auth info: {str(e)}")
        return False
def generate_3des_key():
    """Generate a random 24-byte TripleDES key from ``os.urandom``.

    Returns:
        24 random bytes.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        random_key = os.urandom(24)
        logger.info("Generated TripleDES key successfully")
        return random_key
    except Exception as e:
        logger.error(f"Failed to generate TripleDES key: {str(e)}")
        raise
def encrypt_message(message, triple_des_key):
    """Encrypt a message with TripleDES in CBC mode using PKCS7 padding.

    Args:
        message: The plaintext. A ``str`` is encoded with ``str.encode()``;
            any other value is passed to the padder as-is.
        triple_des_key: The 24-byte TripleDES key.

    Returns:
        An ``(iv, ciphertext)`` tuple, both base64-encoded strings. The IV is
        8 fresh random bytes per call.

    Raises:
        ValueError: If the key is not 24 bytes long.
        Exception: Any other failure is logged (with traceback) and re-raised.
    """
    try:
        if len(triple_des_key) != 24:
            raise ValueError(f"TripleDES key must be 24 bytes, got {len(triple_des_key)}")

        iv = os.urandom(8)

        # Work on bytes: encode text input, pass anything else through.
        if isinstance(message, str):
            plaintext = message.encode()
        else:
            plaintext = message

        # TripleDES has a 64-bit block, so pad to a multiple of 8 bytes.
        padder = sym_padding.PKCS7(64).padder()
        padded = padder.update(plaintext) + padder.finalize()

        encryptor = Cipher(TripleDES(triple_des_key), modes.CBC(iv)).encryptor()
        ciphertext = encryptor.update(padded) + encryptor.finalize()

        iv_b64 = base64.b64encode(iv).decode()
        ciphertext_b64 = base64.b64encode(ciphertext).decode()
        return iv_b64, ciphertext_b64
    except Exception as e:
        logger.error(f"Encryption failed: {str(e)}", exc_info=True)
        raise
def decrypt_message(ciphertext, iv, triple_des_key):
    """Decrypt a TripleDES-CBC message and strip its PKCS7 padding.

    Args:
        ciphertext: Base64-encoded ciphertext.
        iv: Base64-encoded initialization vector.
        triple_des_key: The 24-byte TripleDES key.

    Returns:
        The plaintext decoded as UTF-8.

    Raises:
        ValueError: On every failure. The message names the actual cause:
            wrong key length, invalid base64, an error from the cipher
            itself, invalid padding ("Invalid padding bytes: ..."), or
            plaintext that is not valid UTF-8. Non-ValueError causes are
            wrapped as "Decryption failed: ..." and chained.
    """
    try:
        if len(triple_des_key) != 24:
            raise ValueError(f"TripleDES key must be 24 bytes, got {len(triple_des_key)}")

        try:
            iv_bytes = base64.b64decode(iv)
            ciphertext_bytes = base64.b64decode(ciphertext)
        except ValueError as exc:
            # binascii.Error is a ValueError subclass; report it as what it
            # is instead of letting it be mistaken for a padding problem.
            raise ValueError(f"Invalid base64 input: {exc}") from exc

        cipher = Cipher(TripleDES(triple_des_key), modes.CBC(iv_bytes))
        decryptor = cipher.decryptor()
        padded_plaintext = decryptor.update(ciphertext_bytes) + decryptor.finalize()

        # Only errors raised by the unpadder are genuine padding errors.
        unpadder = sym_padding.PKCS7(64).unpadder()
        try:
            plaintext = unpadder.update(padded_plaintext) + unpadder.finalize()
        except ValueError as exc:
            raise ValueError(f"Invalid padding bytes: {exc}") from exc

        try:
            return plaintext.decode('utf-8')
        except UnicodeDecodeError as exc:
            raise ValueError(f"Decrypted data is not valid UTF-8: {exc}") from exc
    except ValueError as ve:
        logger.error(f"Decryption failed: {str(ve)}")
        raise
    except Exception as e:
        logger.error(f"Decryption failed: {str(e)}", exc_info=True)
        raise ValueError(f"Decryption failed: {str(e)}") from e
def compute_message_hash(iv, ciphertext):
    """Compute the SHA-256 hash of ``IV || ciphertext``.

    Args:
        iv: Base64-encoded initialization vector.
        ciphertext: Base64-encoded ciphertext.

    Returns:
        The digest of the decoded IV followed by the decoded ciphertext, as
        a lowercase hex string.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        hashed_input = base64.b64decode(iv) + base64.b64decode(ciphertext)
        hasher = hashes.Hash(hashes.SHA256())
        hasher.update(hashed_input)
        raw_digest = hasher.finalize()
        return raw_digest.hex()
    except Exception as e:
        logger.error(f"Failed to compute message hash: {str(e)}")
        raise
def sign_message(iv, ciphertext, rsa_private_key_pem):
    """Sign ``IV || ciphertext`` with RSA-PSS / SHA-256.

    Args:
        iv: Base64-encoded initialization vector.
        ciphertext: Base64-encoded ciphertext.
        rsa_private_key_pem: The signer's unencrypted private key as a PEM
            string.

    Returns:
        The signature over the decoded IV followed by the decoded
        ciphertext, base64-encoded as a ``str``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        signing_key = serialization.load_pem_private_key(rsa_private_key_pem.encode(), password=None)
        payload = base64.b64decode(iv) + base64.b64decode(ciphertext)

        pss_padding = PSS(
            mgf=MGF1(hashes.SHA256()),
            salt_length=padding.calculate_max_pss_salt_length(signing_key, hashes.SHA256()),
        )
        raw_signature = signing_key.sign(payload, pss_padding, hashes.SHA256())
        return base64.b64encode(raw_signature).decode()
    except Exception as e:
        logger.error(f"Failed to sign message: {str(e)}")
        raise
def verify_message(iv, ciphertext, signature, rsa_public_key_pem):
    """Verify the RSA-PSS / SHA-256 signature over ``IV || ciphertext``.

    Args:
        iv: Base64-encoded initialization vector.
        ciphertext: Base64-encoded ciphertext.
        signature: Base64-encoded signature to check.
        rsa_public_key_pem: The signer's public key as a PEM string.

    Returns:
        True if the signature verifies; False if verification fails or any
        exception is raised (the error is logged).
    """
    try:
        signer_key = serialization.load_pem_public_key(rsa_public_key_pem.encode())
        raw_signature = base64.b64decode(signature)
        signed_payload = base64.b64decode(iv) + base64.b64decode(ciphertext)
        pss_padding = PSS(
            mgf=MGF1(algorithm=hashes.SHA256()),
            salt_length=PSS.AUTO,
        )
        # verify() returns nothing on success and raises on a bad signature.
        signer_key.verify(raw_signature, signed_payload, pss_padding, hashes.SHA256())
    except Exception as e:
        logger.error(f"Failed to verify message signature: {str(e)}")
        return False
    return True
def create_message_packet(message, triple_des_key, rsa_private_key_pem):
    """Build a message packet: encrypt, hash, then sign.

    Args:
        message: The plaintext to send.
        triple_des_key: The 24-byte TripleDES key used for encryption.
        rsa_private_key_pem: The sender's private key (PEM string) used to
            sign ``IV || ciphertext``.

    Returns:
        A dict with keys ``"iv"``, ``"cipher"``, ``"hash"`` and ``"sig"``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        iv, ciphertext = encrypt_message(message, triple_des_key)
        packet = {"iv": iv, "cipher": ciphertext}
        packet["hash"] = compute_message_hash(iv, ciphertext)
        packet["sig"] = sign_message(iv, ciphertext, rsa_private_key_pem)
        return packet
    except Exception as e:
        logger.error(f"Failed to create message packet: {str(e)}")
        raise
def verify_and_decrypt_message(packet, triple_des_key, rsa_public_key_pem):
    """Verify a message packet and decrypt it.

    Checks run in this order: required fields present, SHA-256 hash of
    ``IV || ciphertext`` matches, RSA signature verifies, then decryption.

    Args:
        packet: Mapping with keys ``"iv"``, ``"cipher"``, ``"hash"`` and
            ``"sig"``.
        triple_des_key: The 24-byte TripleDES key used for decryption.
        rsa_public_key_pem: The sender's public key as a PEM string.

    Returns:
        ``(message, "ACK")`` on success, or ``(None, "NACK: <reason>")`` on
        any failure. This function does not raise; errors become NACKs.
    """
    try:
        # Report absent fields explicitly rather than letting a KeyError
        # surface as an opaque reason such as "NACK: 'sig'".
        missing = [field for field in ("iv", "cipher", "hash", "sig") if field not in packet]
        if missing:
            missing_list = ", ".join(missing)
            logger.error(f"Packet is missing field(s): {missing_list}")
            return None, f"NACK: Missing field(s): {missing_list}"

        # Check the hash first.
        computed_hash = compute_message_hash(packet['iv'], packet['cipher'])
        if computed_hash != packet['hash']:
            return None, "NACK: Hash mismatch"

        # Check the signature.
        if not verify_message(packet['iv'], packet['cipher'], packet['sig'], rsa_public_key_pem):
            return None, "NACK: Invalid signature"

        # Decrypt only after both integrity checks have passed.
        message = decrypt_message(packet['cipher'], packet['iv'], triple_des_key)
        return message, "ACK"
    except ValueError as ve:
        logger.error(f"Decryption failed: {str(ve)}")
        return None, f"NACK: {str(ve)}"
    except Exception as e:
        logger.error(f"Verification failed: {str(e)}", exc_info=True)
        return None, f"NACK: {str(e)}"