Discussion:
[tornado] Async request processing with queues
Franz Weckesser
2018-07-05 21:35:21 UTC
I have a service that normally runs well but can sometimes get backed up
depending on the complexity of the input. I've been given the requirement
to let only N requests back up before rejecting further requests with a
503, service unavailable. I decided to use a tornado queue to hold
the backlog and process the actual work asynchronously. I am curious if
anyone is aware of any significant flaws with this approach. Sample code
below. Any feedback appreciated.

from tornado.ioloop import IOLoop
from tornado.queues import QueueFull
from tornado.queues import Queue
from tornado import gen
from tornado import web
import random
import time

requestQueue = Queue(maxsize=10)

class MainHandler(web.RequestHandler):
    @web.asynchronous
    def get(self):
        try:
            requestQueue.put_nowait(self)
            print("ENQDEPTH: " + str(requestQueue.qsize()))
        except QueueFull:
            IOLoop.instance().add_callback(responder, self, "Full\n", 503)
        return

@gen.coroutine
def worker():
    while True:
        request = yield requestQueue.get()
        print("DEQDEPTH: " + str(requestQueue.qsize()))
        try:
            # simulate slow request
            time.sleep(random.randint(5,15))
            IOLoop.instance().add_callback(responder, request, "Done\n", 200)
            yield gen.sleep(0.01)
        finally:
            requestQueue.task_done()

@gen.coroutine
def responder(req, text, code):
    req.write(text)
    req.set_status(code)
    req.finish()
    return

def make_app():
    return web.Application([
        (r"/", MainHandler),
    ])

@gen.coroutine
def main():
    yield worker()

if __name__ == '__main__':
    main()
    app = make_app()
    app.listen(8888)
    IOLoop.current().start()
--
You received this message because you are subscribed to the Google Groups "Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornado+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Pierce Lopez
2018-07-05 21:58:11 UTC
Post by Franz Weckesser
# simulate slow request
time.sleep(random.randint(5,15))
Is your real application's work synchronous, like this time.sleep? If so,
then this won't work. No other/new requests can be serviced at all while
this is running: no put_nowait(), no returning a 503 response, nothing.

You'll want to use a ThreadPoolExecutor to run synchronous work on a
separate thread:
http://www.tornadoweb.org/en/stable/faq.html#why-isn-t-this-example-with-time-sleep-running-in-parallel

- Pierce
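
[Editor's sketch] A minimal illustration of the point above, assuming the real work is a blocking call like time.sleep. It uses only the standard library's asyncio as a stand-in for Tornado's IOLoop (Tornado 5.0 and later run on the asyncio event loop on Python 3, and IOLoop.run_in_executor is the Tornado-side equivalent of the call used here). The names slow_job, heartbeat and demo are hypothetical; the heartbeat only exists to show that the loop keeps running while the blocking job sits on another thread.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical pool size; the thread does not prescribe one at this point.
executor = ThreadPoolExecutor(max_workers=2)


def slow_job(seconds):
    # Stand-in for the synchronous work; blocks only the worker thread.
    time.sleep(seconds)
    return "Done"


async def heartbeat(ticks, stop):
    # Records a timestamp every 10 ms for as long as the event loop is free.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def demo():
    loop = asyncio.get_running_loop()
    ticks = []
    stop = asyncio.Event()
    beat = asyncio.ensure_future(heartbeat(ticks, stop))
    # The blocking call runs on the executor; the loop stays responsive.
    result = await loop.run_in_executor(executor, slow_job, 0.2)
    stop.set()
    await beat
    return result, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If slow_job were called directly inside demo instead of through the executor, the heartbeat would record a single tick, which is the behaviour described for the time.sleep call in the original sample.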
Pierce Lopez
2018-07-05 22:02:55 UTC
Post by Pierce Lopez
You'll want to use a ThreadPoolExecutor to run synchronous work on a
separate thread:
http://www.tornadoweb.org/en/stable/faq.html#why-isn-t-this-example-with-time-sleep-running-in-parallel
Also, you don't really need a queue; you just need a global counter of
requests in progress. If it is over the limit, respond 503. Otherwise,
increment, await the thread pool execution, decrement, and respond.
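
[Editor's sketch] The counter idea above could look roughly like this. It is an illustration under stated assumptions, not code from the thread: it uses the standard library's asyncio rather than a Tornado handler, and MAX_IN_PROGRESS, slow_job, handle and demo are hypothetical names. The check and the increment happen on the event-loop thread with no await between them, so a plain integer is enough and no lock is needed.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

MAX_IN_PROGRESS = 2  # hypothetical limit, the "N" from the original post
executor = ThreadPoolExecutor(max_workers=2)
in_progress = 0  # global counter of requests currently being worked on


def slow_job(seconds):
    # Stand-in for the synchronous work.
    time.sleep(seconds)
    return "Done\n"


async def handle(seconds):
    """Return (status, body) the way a request handler would respond."""
    global in_progress
    if in_progress >= MAX_IN_PROGRESS:
        return 503, "Full\n"  # over the limit: reject immediately
    in_progress += 1
    try:
        loop = asyncio.get_running_loop()
        body = await loop.run_in_executor(executor, slow_job, seconds)
        return 200, body
    finally:
        # Decrement even if the job raised, so capacity is never leaked.
        in_progress -= 1


async def demo():
    # Three concurrent "requests" against a limit of two.
    return await asyncio.gather(*(handle(0.1) for _ in range(3)))


if __name__ == "__main__":
    for status, body in asyncio.run(demo()):
        print(status, body.strip())
```

The try/finally around the awaited job is the part that is easy to forget: without it, one failing job permanently reduces the number of requests the service will accept.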
Franz Weckesser
2018-07-05 22:08:06 UTC
Thanks. I will look at that. It actually does work now, but it has to
wait for a given request to complete before put_nowait runs and 503s are
issued. This is acceptable for now, but I would prefer to avoid it. Given
the GIL and the lack of actual multiprocessing, I assumed I would not do
better, but I will try the ThreadPoolExecutor.
Pierce Lopez
2018-07-05 22:17:40 UTC
Post by Franz Weckesser
Thanks. I will look at that. It actually does work now, but it has to
wait for a given request to complete before put_nowait runs and 503s are
issued. This is acceptable for now, but I would prefer to avoid it. Given
the GIL and the lack of actual multiprocessing, I assumed I would not do
better, but I will try the ThreadPoolExecutor.
That makes sense: it can handle some 503 responses in between the
worker jobs. Still, it would be better if it could respond immediately, and
the GIL will not prevent that. The IOLoop just needs a small bit of
CPU time to receive new requests and send some 503 responses. (The GIL only
prevents the total amount of CPU work done in Python from exceeding what a
single core could do.)
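
[Editor's sketch] A small experiment along the lines of the GIL remark above, assuming CPython and using the standard library's asyncio as a stand-in for the IOLoop. The names cpu_bound and demo are hypothetical. A pure-Python busy loop runs on a worker thread while the event loop counts how many times it still gets scheduled; the exact count depends on the machine and interpreter, so none is quoted here.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def cpu_bound(duration):
    # Pure-Python busy loop: competes for the GIL for the whole duration.
    deadline = time.monotonic() + duration
    n = 0
    while time.monotonic() < deadline:
        n += 1
    return n


async def demo(duration=0.3):
    loop = asyncio.get_running_loop()
    ticks = 0
    with ThreadPoolExecutor(max_workers=1) as pool:
        job = loop.run_in_executor(pool, cpu_bound, duration)
        # Each pass through this loop is a moment where the event loop
        # could have accepted a request or sent a 503.
        while not job.done():
            ticks += 1
            await asyncio.sleep(0.01)
        iterations = await job
    return ticks, iterations


if __name__ == "__main__":
    ticks, iterations = asyncio.run(demo())
    print("event loop ticks while the worker was busy:", ticks)
    print("busy-loop iterations completed:", iterations)
```

The event loop keeps ticking because the interpreter periodically forces the busy thread to give up the GIL; the cost is that the two threads share roughly one core's worth of Python execution between them.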
Franz Weckesser
2018-07-06 04:06:36 UTC
I took your suggestion and the result is much cleaner looking and does
respond immediately to the full condition. I used a non-blocking semaphore
for the counter and it works well. Thanks for your feedback.

from concurrent.futures import ThreadPoolExecutor
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler
from tornado.web import Application
from tornado.ioloop import IOLoop
from tornado.gen import coroutine
from threading import Semaphore
import random
import time

class MainHandler(RequestHandler):
    # Class attributes, so every request shares one pool and one counter.
    executor = ThreadPoolExecutor(max_workers=4)
    sem = Semaphore(16)

    @coroutine
    def get(self):
        # Non-blocking acquire: returns False at once when the limit is hit.
        if self.sem.acquire(False):
            try:
                res = yield self.worker()
                self.write(res)
            finally:
                # Release even if the worker raises, so slots are not leaked.
                self.sem.release()
        else:
            self.write("Full\n")
            self.set_status(503)

    @run_on_executor
    def worker(self):
        # simulate slow request; runs on a pool thread, not the IOLoop
        time.sleep(random.randint(5,15))
        return "Done\n"

def make_app():
    return Application([
        (r"/", MainHandler),
    ])

if __name__ == '__main__':
    app = make_app()
    app.listen(8888)
    IOLoop.current().start()