-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathchat_app.py
More file actions
145 lines (111 loc) · 4.81 KB
/
chat_app.py
File metadata and controls
145 lines (111 loc) · 4.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import os
import argparse
import threading
import rti.connextdds as dds
from posixpath import split
from time import sleep
from os import error, path as os_path
import inspect
# Re-entrant lock shared by the worker threads. It is held while taking chat
# messages, while listing users, and while writing an outgoing message.
lock = threading.RLock()
# Shutdown flag polled by every worker loop; setting it to True asks all of
# them to stop (the "exit"/"quit" command does this).
finished = False
def user_subscriber_task(user_reader: dds.DataReader):
    """Report users whose instance no longer has any live writer.

    Waits (in 0.5 s slices) on a status condition enabled for
    LIVELINESS_CHANGED on ``user_reader``. Each time the condition is active,
    the reader is read and a "dropped user" line is printed for every sample
    that is not-yet-read, carries no valid data, and whose instance state is
    NOT_ALIVE_NO_WRITERS. The loop ends once the module-level ``finished``
    flag becomes true.

    Args:
        user_reader: Reader for the chat-user topic.
    """
    liveliness_condition = dds.StatusCondition(user_reader)
    liveliness_condition.enabled_statuses = dds.StatusMask.LIVELINESS_CHANGED

    waiter = dds.WaitSet()
    waiter += liveliness_condition

    # NOTE(review): the liveliness-changed status is never read back in this
    # function; confirm whether the condition stays triggered after the first
    # change (which would make wait() return without blocking).
    while not finished:
        triggered = waiter.wait(dds.Duration(0.5))
        if liveliness_condition not in triggered:
            continue

        for sample in user_reader.read():
            info = sample.info
            writers_gone = info.state.instance_state == dds.InstanceState.NOT_ALIVE_NO_WRITERS
            unseen = info.state.sample_state == dds.SampleState.NOT_READ
            # Instance is "gone", the sample is new to us, and it has no data.
            if writers_gone and unseen and not info.valid:
                # No payload on this sample, so recover the key fields
                # from the reader via the instance handle.
                key_holder = user_reader.key_value(info.instance_handle)
                print(f"#Dropped user \"{key_holder['username']}\"")
def message_subscriber_task(message_reader: dds.DataReader):
    """Print incoming chat messages until the application is asked to stop.

    A handler is attached to a DATA_AVAILABLE status condition on
    ``message_reader``; the wait set dispatches it in 0.5 s slices until the
    module-level ``finished`` flag becomes true.

    Args:
        message_reader: Reader for the chat-message topic.
    """

    def on_data_available(_):
        # Take (remove) everything queued on the reader while holding the
        # shared lock, and print only samples that carry valid data.
        with lock:
            for data, info in message_reader.take():
                if not info.valid:
                    continue
                print(f"#New chat message from {data['fromUser']},\t Message: \"{data['message']}\"")

    data_condition = dds.StatusCondition(message_reader)
    data_condition.enabled_statuses = dds.StatusMask.DATA_AVAILABLE
    data_condition.set_handler(on_data_available)

    dispatcher = dds.WaitSet()
    dispatcher += data_condition

    while not finished:
        dispatcher.dispatch(dds.Duration(0.5))
def command_task(user, message_writer: dds.DataWriter, user_reader: dds.DataReader):
    """Run the interactive command prompt until the user quits.

    Supported commands:
        exit | quit                  set the shared ``finished`` flag
        list                         print every user whose instance is ALIVE
        send <user|group> <message>  publish a chat message

    End-of-input on stdin (closed pipe, Ctrl-D) is treated like "exit".
    Without that, ``input()`` would raise ``EOFError``, this thread would die
    without setting ``finished``, and the other worker loops would never end.

    Args:
        user: Name placed in the ``fromUser`` field of outgoing messages.
        message_writer: Writer used to publish chat messages.
        user_reader: Reader queried by the ``list`` command.
    """
    global finished
    while not finished:
        try:
            command = input('Please enter command: ')
        except EOFError:
            # Nothing more can ever be read from stdin: shut down cleanly.
            finished = True
            break

        if command in ('exit', 'quit'):
            finished = True
        elif command == 'list':
            with lock:
                for data, info in user_reader.read():
                    if info.valid and info.state.instance_state == dds.InstanceState.ALIVE:
                        print(f"#Username: {data['username']},\t\tGroup: {data['group']}")
        elif command.startswith("send "):
            # Split into: "send", destination, rest-of-line (the message).
            dest = command.split(maxsplit=2)
            if len(dest) == 3:
                # chat_message_t is the module-level message type.
                msg = dds.DynamicData(chat_message_t)
                msg['fromUser'] = user
                # The same destination is written to both fields.
                # NOTE(review): presumably so a receiver can match on either
                # its user name or its group - confirm against Chat.xml.
                msg['toUser'] = dest[1]
                msg['toGroup'] = dest[1]
                msg['message'] = dest[2]
                with lock:
                    message_writer.write(msg)
            else:
                print('Invalid usage. Use "send user|group message"\n')
        else:
            print('Unknown command')
# Script body: parse the command line, create the DDS entities described in
# Chat.xml, announce this user, run the three worker threads, and unregister
# the user instance on the way out.
file_path = os_path.dirname(os_path.realpath(__file__))

parser = argparse.ArgumentParser(description='DDS Chat Application')
parser.add_argument('user', help='User Name', type=str)
parser.add_argument('group', help='Group Name', type=str)
parser.add_argument('-f', '--firstname', help='First Name', type=str, default='')
parser.add_argument('-l', '--lastname', help='Last Name', type=str, default='')

args = parser.parse_args()

# Exported before the participant is created.
# NOTE(review): presumably Chat.xml refers to these environment variables
# (e.g. in a filter expression) - confirm against the XML file.
os.environ['user'] = str(args.user)
os.environ['group'] = str(args.group)

config_name = "Chat_ParticipantLibrary::ChatParticipant"

# Chat.xml is expected next to this script.
provider = dds.QosProvider(uri=os_path.join(file_path, 'Chat.xml'))
participant = provider.create_participant_from_config(config=config_name)

# Types
chat_user_t = provider.type('ChatUser')
chat_message_t = provider.type('ChatMessage')

# Writers
user_writer = dds.DynamicData.DataWriter(
    participant.find_datawriter('ChatUserPublisher::ChatUser_Writer'))
message_writer = dds.DynamicData.DataWriter(
    participant.find_datawriter('ChatMessagePublisher::ChatMessage_Writer'))

# Readers
user_reader = dds.DynamicData.DataReader(
    participant.find_datareader('ChatUserSubscriber::ChatUser_Reader'))
message_reader = dds.DynamicData.DataReader(
    participant.find_datareader('ChatMessageSubscriber::ChatMessage_Reader'))

# Register user
user = dds.DynamicData(chat_user_t)
user['group'] = args.group
user['username'] = args.user
if args.firstname:
    user['firstName'] = args.firstname
if args.lastname:
    user['lastName'] = args.lastname

hinst = user_writer.register_instance(user)

try:
    t1 = threading.Thread(target=command_task, args=(args.user, message_writer, user_reader,))
    t1.start()

    t2 = threading.Thread(target=message_subscriber_task, args=(message_reader,))
    t2.start()

    t3 = threading.Thread(target=user_subscriber_task, args=(user_reader, ))
    t3.start()

    # Publish this user only after the workers are running.
    user_writer.write(user)

    t1.join()
    t2.join()
    t3.join()
finally:
    # On the normal path `finished` is already True here. If the block above
    # was interrupted (e.g. Ctrl-C while joining), setting it lets the polling
    # worker loops stop instead of spinning forever.
    finished = True
    # Unregister user
    user_writer.unregister_instance(hinst)