-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_beanstalkd.py
More file actions
48 lines (37 loc) · 1.27 KB
/
test_beanstalkd.py
File metadata and controls
48 lines (37 loc) · 1.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import subprocess
import beanstalkc
import utils
class BeanstalkdTester(utils.QueueTester):
server_process = None
def start_server(self):
self.server_process = subprocess.Popen(['./servers/beanstalkd'])
def stop_server(self):
if not self.server_process: return
self.server_process.terminate()
status = self.server_process.wait()
if status != -15:
logger.warn("beanstalkd exited with %d", status)
def connect(self,queues_to_watch=None):
self.beanstalk = beanstalkc.Connection()
self.last_queue = None
if queues_to_watch:
for q in queues_to_watch:
self.beanstalk.watch(q)
def send(self, queue, message):
if self.last_queue != queue:
self.beanstalk.use(queue)
self.last_queue = queue
self.beanstalk.put(queue+":"+message)
class Job(object):
def __init__(self, job):
self.job = job
self.queue,self.body = job.body.split(':',1)
def done(self):
self.job.delete()
def recv(self, timeout=0):
job = self.beanstalk.reserve(timeout=timeout)
if not job:
return None
return self.Job(job)
if __name__ == '__main__':
BeanstalkdTester.main()