Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 44 additions & 9 deletions st2reactor/tests/integration/test_sensor_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,16 @@ def setUpClass(cls):
def setUp(self):
super().setUp()
# pre-condition: Make sure there is no test pollution
# Delete any leftover queues from previous failed test runs
self.rabbit_client = self._get_client()
self._delete_sensor_watcher_amqp_queues(
queue_name="st2.sensor.watch.covfefe", rabbit_client=self.rabbit_client
)

# Verify queues are deleted
sw_queues = self._get_sensor_watcher_amqp_queues(
queue_name="st2.sensor.watch.covfefe"
queue_name="st2.sensor.watch.covfefe", rabbit_client=self.rabbit_client
)
# TODO: Maybe just delete any leftover queues from previous failed test runs.
self.assertTrue(len(sw_queues) == 0)

def test_sensor_watch_queue_gets_deleted_on_stop(self):
Expand All @@ -55,30 +61,59 @@ def delete_handler(sensor_db):
)
sensor_watcher.start()
sw_queues = self._get_sensor_watcher_amqp_queues(
queue_name="st2.sensor.watch.covfefe"
queue_name="st2.sensor.watch.covfefe", rabbit_client=self.rabbit_client
)

start = monotonic()
done = False
while not done:
concurrency.sleep(0.01)
sw_queues = self._get_sensor_watcher_amqp_queues(
queue_name="st2.sensor.watch.covfefe"
queue_name="st2.sensor.watch.covfefe", rabbit_client=self.rabbit_client
)
done = len(sw_queues) > 0 or ((monotonic() - start) < 5)

sensor_watcher.stop()
sw_queues = self._get_sensor_watcher_amqp_queues(
queue_name="st2.sensor.watch.covfefe"
queue_name="st2.sensor.watch.covfefe", rabbit_client=self.rabbit_client
)
self.assertTrue(len(sw_queues) == 0)

@staticmethod
def _list_amqp_queues():
def _get_client(self):
rabbit_client = Client("localhost:15672", "guest", "guest")
return rabbit_client

def _list_amqp_queues(self, rabbit_client):
"""
:param rabbit_client: Client
"""
queues = [q["name"] for q in rabbit_client.get_queues()]
return queues

def _get_sensor_watcher_amqp_queues(self, queue_name):
all_queues = self._list_amqp_queues()
def _get_sensor_watcher_amqp_queues(self, queue_name, rabbit_client):
"""
:param rabbit_client: Client
"""
all_queues = self._list_amqp_queues(rabbit_client)
return set([q_name for q_name in all_queues if queue_name in q_name])

def _delete_sensor_watcher_amqp_queues(self, queue_name, rabbit_client):
"""
Delete all queues containing the specified queue_name pattern.

:param string queue_name: Pattern to match in queue names
:param rabbit_client: Client
:returns: None
"""
# Get all queues matching the pattern
queues_to_delete = self._get_sensor_watcher_amqp_queues(
queue_name=queue_name, rabbit_client=rabbit_client
)

if not queues_to_delete:
return

# Create a rabbit client and delete each queue
for queue in queues_to_delete:
# Use default vhost "/"
rabbit_client.delete_queue("/", queue)
Loading