-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSocketCommunicate.py
More file actions
104 lines (85 loc) · 3.81 KB
/
Copy pathSocketCommunicate.py
File metadata and controls
104 lines (85 loc) · 3.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import socket, re, threading, time
class _SocketFather:
    """Shared base for the TCP client and server wrappers.

    Holds the validated endpoint, the underlying TCP socket, and two
    lock-protected queues: ``SendList`` (outgoing strings) and
    ``RecieveList`` (parsed incoming elements).

    Attributes:
        ip: Endpoint address as given to the constructor.
        port: Endpoint TCP port.
        socket: The ``AF_INET``/``SOCK_STREAM`` socket owned by this object.
        MainFlag: Loop flag read by the handler loops; cleared by ``stop()``.
        Lock: Guards ``SendList`` and ``RecieveList``.
        RecieveList: Queue of parsed entries, oldest first.
        SendList: Queue of outgoing messages, oldest first.
        Status: 0 after construction, -1 after ``stop()``; the subclasses
            in this file set it to 1 when they start running.
    """

    def __init__(self, ip: str, port: int) -> None:
        """Validate the endpoint and create the (unconnected) socket.

        Args:
            ip: IPv4 address in dotted-quad form (``"a.b.c.d"``, each part
                0-255). The short two-part form ``"a.b"`` is also accepted.
            port: TCP port, 1-65535.

        Raises:
            TypeError: If ``ip`` is not a ``str`` or ``port`` is not an ``int``.
            ValueError: If the address or the port is out of range/malformed.
        """
        if not isinstance(ip, str) or not isinstance(port, int):
            raise TypeError("ip must be a string and port must be an integer")
        if not self._is_valid_ip(ip):
            raise ValueError("Invalid IP address")
        if not 0 < port < 65536:
            raise ValueError("Invalid port number")
        self.ip = ip
        self.port = port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.MainFlag = True
        self.Lock = threading.Lock()
        self.RecieveList = []
        self.SendList = []
        self.Status = 0

    @staticmethod
    def _is_valid_ip(ip: str) -> bool:
        """Return True for a dotted-quad IPv4 address or the two-part form."""
        parts = ip.split(".")
        if len(parts) == 4:
            return all(
                part.isascii() and part.isdigit() and len(part) <= 3 and int(part) <= 255
                for part in parts
            )
        # Two-part addresses were the only form accepted before dotted quads
        # were supported, so they stay valid to avoid breaking callers.
        return re.match(r"^\d{1,3}\.\d{1,3}$", ip) is not None

    def add_to_sendlist(self, message: str) -> None:
        """Queue ``message`` for the send handler."""
        with self.Lock:
            self.SendList.append(message)

    def pop_from_recievelist(self) -> list:
        """Remove and return the oldest received entry.

        Returns:
            ``[client_address, tag, tag, payload]`` for the oldest entry, or
            an empty list when nothing has been received.
        """
        with self.Lock:
            if self.RecieveList:
                return self.RecieveList.pop(0)
            return []

    def _send_message(self, message: str, client_socket: socket.socket) -> None:
        """Send ``message`` UTF-8 encoded over ``client_socket``."""
        # sendall() keeps writing until the whole payload is out; send() may
        # stop after a partial write and silently drop the rest.
        client_socket.sendall(message.encode('utf-8'))

    def _receive_message(self, client_socket: socket.socket) -> str:
        """Read up to 1024 bytes from ``client_socket`` and decode as UTF-8.

        Returns an empty string when the peer has closed the connection.
        """
        return client_socket.recv(1024).decode('utf-8')

    def _parse_message(self, message: str, client_address: tuple) -> list:
        """Extract every ``<tag>payload</tag>`` element found in ``message``.

        Returns a list of ``[client_address, tag, tag, payload]`` entries:
        the tag name sits at index 1 and index 2, the payload at index 3.
        Entries are grouped by tag in order of each tag's first appearance.
        """
        entries = []
        # dict.fromkeys() drops repeated tokens while keeping their order, so
        # a tag that occurs several times is scanned (and queued) only once.
        for token in dict.fromkeys(re.findall(r'<.+?>', message)):
            # Escape the token: it is raw peer input, not a regex fragment.
            pattern = re.escape(token) + r'.*?</' + re.escape(token[1:])
            for result in re.findall(pattern, message):
                # The closing tag is one character longer than the opening one.
                payload = result[len(token):-len(token) - 1]
                entries.append([client_address, token[1:-1], token[1:-1], payload])
        return entries

    def _recieve_message_handler(self, client_socket: socket.socket, client_address: tuple) -> None:
        """Receive loop: read chunks, parse them, queue the parsed entries.

        Runs until ``MainFlag`` is cleared. A socket error or a peer that
        closed the connection (empty read) both call ``stop()`` and end the
        loop.

        NOTE: each read returns at most 1024 bytes and every chunk is parsed
        on its own, so an element split across two reads is not reassembled.
        """
        while self.MainFlag:
            try:
                message = self._receive_message(client_socket)
            except OSError:
                # Covers connection resets as well as the socket being
                # closed underneath us.
                self.stop()
                break
            if not message:
                # recv() returned no data: the peer closed the connection.
                # Without this check the loop would spin on empty reads.
                self.stop()
                break
            entries = self._parse_message(message, client_address)
            if entries:
                with self.Lock:
                    self.RecieveList.extend(entries)

    def _send_message_handler(self, client_socket: socket.socket, client_address: tuple) -> None:
        """Send loop: drain ``SendList`` over ``client_socket``.

        Polls the queue every 0.1 s while it is empty. A socket error while
        sending calls ``stop()`` and ends the loop.
        """
        while self.MainFlag:
            # Check and pop under the same lock so two sender threads cannot
            # both see one pending message and race on the pop.
            with self.Lock:
                has_message = bool(self.SendList)
                if has_message:
                    message = self.SendList.pop(0)
            if not has_message:
                time.sleep(0.1)
                continue
            try:
                self._send_message(message, client_socket)
            except OSError:
                self.stop()
                break

    def get_status(self) -> int:
        """Return the current ``Status`` value."""
        return self.Status

    def stop(self) -> None:
        """Mark the object stopped, end the handler loops, close the socket."""
        self.Status = -1
        self.MainFlag = False
        self.socket.close()
class SocketClient(_SocketFather):
    """TCP client: connects on construction, then exchanges messages in threads."""

    def __init__(self, ip: str, port: int) -> None:
        """Validate the endpoint and connect to it.

        Args:
            ip: Server address, validated by the base class.
            port: Server TCP port, validated by the base class.

        Raises:
            TypeError, ValueError: From the base-class validation.
            OSError: If the connection attempt fails; the socket is closed
                before the error is re-raised.
        """
        super().__init__(ip, port)
        try:
            self.socket.connect((self.ip, self.port))
        except OSError:
            # The caller never gets a reference to this object when the
            # constructor raises, so release the socket here.
            self.socket.close()
            raise

    def run_client(self) -> None:
        """Set ``Status`` to 1 and start the receive and send threads.

        Both threads work on the connected socket and are given
        ``(ip, port)`` as the peer address. The method returns as soon as
        the threads are started.
        """
        self.Status = 1
        peer = (self.ip, self.port)
        self._recieve_thread = threading.Thread(
            target=self._recieve_message_handler, args=(self.socket, peer)
        )
        self._recieve_thread.start()
        self._send_thread = threading.Thread(
            target=self._send_message_handler, args=(self.socket, peer)
        )
        self._send_thread.start()
class SocketServer(_SocketFather):
    """TCP server: binds and listens on construction, serves clients in threads."""

    def __init__(self, ip: str, port: int) -> None:
        """Validate the endpoint, then bind and listen on it.

        Args:
            ip: Local address to bind, validated by the base class.
            port: Local TCP port to bind, validated by the base class.

        Raises:
            TypeError, ValueError: From the base-class validation.
            OSError: If binding or listening fails; the socket is closed
                before the error is re-raised.
        """
        super().__init__(ip, port)
        try:
            self.socket.bind((self.ip, self.port))
            self.socket.listen()
        except OSError:
            # The caller never gets a reference to this object when the
            # constructor raises, so release the socket here.
            self.socket.close()
            raise

    def run_server(self) -> None:
        """Set ``Status`` to 1 and accept clients until ``MainFlag`` is cleared.

        Blocks the calling thread. For every accepted connection a receive
        thread and a send thread are started; ``_recieve_thread`` and
        ``_send_thread`` always refer to the most recently started pair.

        Raises:
            OSError: If ``accept()`` fails while the server is still meant
                to be running.
        """
        self.Status = 1
        while self.MainFlag:
            try:
                client_socket, client_address = self.socket.accept()
            except OSError:
                if not self.MainFlag:
                    # MainFlag was cleared while waiting (stop() also closes
                    # the listening socket): treat this as a normal shutdown.
                    break
                raise
            self._recieve_thread = threading.Thread(
                target=self._recieve_message_handler,
                args=(client_socket, client_address),
            )
            self._recieve_thread.start()
            self._send_thread = threading.Thread(
                target=self._send_message_handler,
                args=(client_socket, client_address),
            )
            self._send_thread.start()