Discussion:
[tornado] Connecting HTTP Client body_producer to HTTP Request Handler data_received.
Shane Spencer
2018-08-28 05:10:47 UTC
Permalink
OK. I can't find hair nor hide of some past conversations with this.

I have a feeling I need to perform my http request in `prepare` and gain
early access to self.request.body in `prepare` with the intent of removing
it before moving on to the target method.

For now this is what I'm trying to hook up properly. Been all over github
looking for examples and I've found some pretty stellar ones.

@tornado.web.stream_request_body
class UpstreamHTTPHandler(tornado.web.RequestHandler):

def initialize(self, *args, **kwargs):

self.upstream_headers = tornado.httputil.HTTPHeaders()

self.upstream_body_queue = tornado.queues.Queue(maxsize=1)

self.request_body_queue = tornado.queues.Queue(maxsize=1)
self.request_body_buffer =
tempfile.SpooledTemporaryFile(mode='w+b', max_size=2e+6)

#self.request_body_read_bytes = 0 #use tell
self.request_body_content_length = None
self.request_body_cached = False

async def prepare(self, *args, **kwargs):

self.path_uri = self.path_kwargs.get('uri')
self.upstream_uri = f'{self.path_uri}?{self.request.query}'

if self.request.method in ('PATCH', 'POST', 'PUT'):
content_length = self.request.headers.get('Content-Length')
if not content_length:
raise tornado.web.HTTPError(405)

self.request_body_content_length = int(content_length, 10)

async def body_producer(self, write):

async for chunk in self.request_body_queue:

if chunk is QUEUE_FINISH:
break

print('sssss', time.time(), len(chunk),
self.request_body_buffer.tell(), self.request_body_queue.qsize())
write(chunk)

print('fin')

async def data_received(self, chunk):

print('rrrrr', time.time(), len(chunk),
self.request_body_buffer.tell(), self.request_body_queue.qsize())

if self.request_body_content_length is None:
return

await self.request_body_queue.put(chunk)
self.request_body_buffer.write(chunk)

if self.request_body_buffer.tell() >=
self.request_body_content_length:
self.request_body_queue.put(QUEUE_FINISH)
#self.request_body_buffer.rollover()

def header_callback(self, header_line):

print(textwrap.indent(header_line.strip(), 'header_callback: '))

if header_line.startswith("HTTP/"):
self.upstream_headers.clear()
return

if header_line == "\r\n":
return

self.upstream_headers.parse_line(header_line)

print(textwrap.indent(pprint.pformat(dict(self.upstream_headers)),
'all_headers: '))

def streaming_callback(self, chunk):
print(textwrap.indent(chunk.decode('utf-8'), 'streaming_callback:
'))
self.upstream_body_queue.put(chunk)

async def run(self,*args, **kwargs):

response = await tornado.httpclient.AsyncHTTPClient(
max_buffer_size=100000000,
max_body_size=1e+10, #10 GB
).fetch(
self.upstream_uri,
raise_error=False,
follow_redirects=True,
method=self.request.method,
allow_nonstandard_methods=True,
request_timeout=10000,
body_producer=self.body_producer,
header_callback=self.header_callback,
streaming_callback=self.streaming_callback,
)

print('response', response, self.request_body_buffer.name,
self.request_body_buffer._rolled)

self.finish()
self.request_body_buffer.close()

get = run
post = run

<https://about.me/ShaneSpencer?promo=email_sig&utm_source=product&utm_medium=email_sig&utm_campaign=gmail_api&utm_content=thumb>
Shane Spencer
about.me/ShaneSpencer
<https://about.me/ShaneSpencer?promo=email_sig&utm_source=product&utm_medium=email_sig&utm_campaign=gmail_api&utm_content=thumb>
--
You received this message because you are subscribed to the Google Groups "Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornado+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Shane Spencer
2018-08-28 13:08:09 UTC
Permalink
My brain refuses to understand placement of futures and contextual scopes.
This appears to work as Ben showcased in his gist
https://gist.github.com/bdarnell/5bb1bd04a443c4e06ccd

I added an event setter to data_received to delay body_producer and hope to
clean up how body_producer finishes this way soon ... without a POST/PUT
content-length.

I moved the fetch future to prepare.

@tornado.web.stream_request_body
class UpstreamHTTPHandler(tornado.web.RequestHandler):

def initialize(self, *args, **kwargs):

self.upstream_headers = tornado.httputil.HTTPHeaders()

self.upstream_body_queue = tornado.queues.Queue(maxsize=1)

self.request_body_event = tornado.locks.Event()
self.request_body_queue = tornado.queues.Queue(maxsize=1)
self.request_body_buffer =
tempfile.SpooledTemporaryFile(mode='w+b', max_size=2e+6)

#self.request_body_read_bytes = 0 #use tell
self.request_body_content_length = None
self.request_body_cached = False

async def prepare(self, *args, **kwargs):

print('wut')

self.path_uri = self.path_kwargs.get('uri')
self.upstream_uri = f'{self.path_uri}?{self.request.query}'

if self.request.method in ('PATCH', 'POST', 'PUT'):
content_length = self.request.headers.get('Content-Length')
if not content_length:
raise tornado.web.HTTPError(405)

self.request_body_content_length = int(content_length, 10)

self.request_future = tornado.httpclient.AsyncHTTPClient(
max_body_size=1e+10, #10 GB
).fetch(
self.upstream_uri,
raise_error=False,
follow_redirects=False,
method=self.request.method,
allow_nonstandard_methods=True,
request_timeout=10000,
body_producer=self.body_producer,
header_callback=self.header_callback,
streaming_callback=self.streaming_callback,
)

async def body_producer(self, write):

await self.request_body_event.wait()

async for chunk in self.request_body_queue:

if chunk is QUEUE_FINISH:
break

print('sssss', time.time(), len(chunk),
hashlib.md5(chunk).hexdigest(), self.request_body_buffer.tell(),
self.request_body_queue.qsize())
write(chunk)

print('fin')
return

async def data_received(self, chunk):

print('rrrrr', time.time(), len(chunk),
hashlib.md5(chunk).hexdigest(), self.request_body_buffer.tell(),
self.request_body_queue.qsize())

if self.request_body_content_length is None:
return

await self.request_body_queue.put(chunk)
self.request_body_buffer.write(chunk)

self.request_body_event.set()

if self.request_body_buffer.tell() >=
self.request_body_content_length:
self.request_body_queue.put(QUEUE_FINISH)
#self.request_body_buffer.rollover()

def header_callback(self, header_line):

print(textwrap.indent(header_line.strip(), 'header_callback: '))

if header_line.startswith("HTTP/"):
self.upstream_headers.clear()
return

if header_line == "\r\n":
return

self.upstream_headers.parse_line(header_line)

print(textwrap.indent(pprint.pformat(dict(self.upstream_headers)),
'all_headers: '))

async def streaming_callback(self, chunk):
print(textwrap.indent(chunk.decode('utf-8'), 'streaming_callback:
'))
await self.upstream_body_queue.put(chunk)

async def run(self,*args, **kwargs):

print('wat')

response = await self.request_future

print('response', response, self.request_body_buffer.name,
self.request_body_buffer._rolled)

self.finish()
self.request_body_buffer.close()

get = run
post = run

<https://about.me/ShaneSpencer?promo=email_sig&utm_source=product&utm_medium=email_sig&utm_campaign=gmail_api&utm_content=thumb>
Shane Spencer
about.me/ShaneSpencer
<https://about.me/ShaneSpencer?promo=email_sig&utm_source=product&utm_medium=email_sig&utm_campaign=gmail_api&utm_content=thumb>
Post by Shane Spencer
OK. I can't find hair nor hide of some past conversations with this.
I have a feeling I need to perform my http request in `prepare` and gain
early access to self.request.body in `prepare` with the intent of removing
it before moving on to the target method.
For now this is what I'm trying to hook up properly. Been all over github
looking for examples and I've found some pretty stellar ones.
@tornado.web.stream_request_body
self.upstream_headers = tornado.httputil.HTTPHeaders()
self.upstream_body_queue = tornado.queues.Queue(maxsize=1)
self.request_body_queue = tornado.queues.Queue(maxsize=1)
self.request_body_buffer =
tempfile.SpooledTemporaryFile(mode='w+b', max_size=2e+6)
#self.request_body_read_bytes = 0 #use tell
self.request_body_content_length = None
self.request_body_cached = False
self.path_uri = self.path_kwargs.get('uri')
self.upstream_uri = f'{self.path_uri}?{self.request.query}'
content_length = self.request.headers.get('Content-Length')
raise tornado.web.HTTPError(405)
self.request_body_content_length = int(content_length, 10)
break
print('sssss', time.time(), len(chunk),
self.request_body_buffer.tell(), self.request_body_queue.qsize())
write(chunk)
print('fin')
print('rrrrr', time.time(), len(chunk),
self.request_body_buffer.tell(), self.request_body_queue.qsize())
return
await self.request_body_queue.put(chunk)
self.request_body_buffer.write(chunk)
if self.request_body_buffer.tell() >=
self.request_body_queue.put(QUEUE_FINISH)
#self.request_body_buffer.rollover()
print(textwrap.indent(header_line.strip(), 'header_callback: '))
self.upstream_headers.clear()
return
return
self.upstream_headers.parse_line(header_line)
print(textwrap.indent(pprint.pformat(dict(self.upstream_headers)),
'all_headers: '))
'))
self.upstream_body_queue.put(chunk)
response = await tornado.httpclient.AsyncHTTPClient(
max_buffer_size=100000000,
max_body_size=1e+10, #10 GB
).fetch(
self.upstream_uri,
raise_error=False,
follow_redirects=True,
method=self.request.method,
allow_nonstandard_methods=True,
request_timeout=10000,
body_producer=self.body_producer,
header_callback=self.header_callback,
streaming_callback=self.streaming_callback,
)
print('response', response, self.request_body_buffer.name,
self.request_body_buffer._rolled)
self.finish()
self.request_body_buffer.close()
get = run
post = run
<https://about.me/ShaneSpencer?promo=email_sig&utm_source=product&utm_medium=email_sig&utm_campaign=gmail_api&utm_content=thumb>
Shane Spencer
about.me/ShaneSpencer
<https://about.me/ShaneSpencer?promo=email_sig&utm_source=product&utm_medium=email_sig&utm_campaign=gmail_api&utm_content=thumb>
--
You received this message because you are subscribed to the Google Groups "Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornado+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Ben Darnell
2018-09-08 22:32:24 UTC
Permalink
Post by Shane Spencer
My brain refuses to understand placement of futures and contextual
scopes. This appears to work as Ben showcased in his gist
https://gist.github.com/bdarnell/5bb1bd04a443c4e06ccd
I added an event setter to data_received to delay body_producer and hope
to clean up how body_producer finishes this way soon ... without a POST/PUT
content-length.
The key to avoid relying on content-length is to write the QUEUE_FINISH
marker to the queue in post() and put() (and on_connection_close()), which
are guaranteed to come after the last data_received(). You'll just have to
override those separately; there's not a one-stop hook for it).

I don't think the event setter is currently doing anything for you; it's
set at the same time that the first chunk is added to the queue.

-Ben
Post by Shane Spencer
I moved the fetch future to prepare.
@tornado.web.stream_request_body
self.upstream_headers = tornado.httputil.HTTPHeaders()
self.upstream_body_queue = tornado.queues.Queue(maxsize=1)
self.request_body_event = tornado.locks.Event()
self.request_body_queue = tornado.queues.Queue(maxsize=1)
self.request_body_buffer =
tempfile.SpooledTemporaryFile(mode='w+b', max_size=2e+6)
#self.request_body_read_bytes = 0 #use tell
self.request_body_content_length = None
self.request_body_cached = False
print('wut')
self.path_uri = self.path_kwargs.get('uri')
self.upstream_uri = f'{self.path_uri}?{self.request.query}'
content_length = self.request.headers.get('Content-Length')
raise tornado.web.HTTPError(405)
self.request_body_content_length = int(content_length, 10)
self.request_future = tornado.httpclient.AsyncHTTPClient(
max_body_size=1e+10, #10 GB
).fetch(
self.upstream_uri,
raise_error=False,
follow_redirects=False,
method=self.request.method,
allow_nonstandard_methods=True,
request_timeout=10000,
body_producer=self.body_producer,
header_callback=self.header_callback,
streaming_callback=self.streaming_callback,
)
await self.request_body_event.wait()
break
print('sssss', time.time(), len(chunk),
hashlib.md5(chunk).hexdigest(), self.request_body_buffer.tell(),
self.request_body_queue.qsize())
write(chunk)
print('fin')
return
print('rrrrr', time.time(), len(chunk),
hashlib.md5(chunk).hexdigest(), self.request_body_buffer.tell(),
self.request_body_queue.qsize())
return
await self.request_body_queue.put(chunk)
self.request_body_buffer.write(chunk)
self.request_body_event.set()
if self.request_body_buffer.tell() >=
self.request_body_queue.put(QUEUE_FINISH)
#self.request_body_buffer.rollover()
print(textwrap.indent(header_line.strip(), 'header_callback: '))
self.upstream_headers.clear()
return
return
self.upstream_headers.parse_line(header_line)
print(textwrap.indent(pprint.pformat(dict(self.upstream_headers)),
'all_headers: '))
'))
await self.upstream_body_queue.put(chunk)
print('wat')
response = await self.request_future
print('response', response, self.request_body_buffer.name,
self.request_body_buffer._rolled)
self.finish()
self.request_body_buffer.close()
get = run
post = run
<https://about.me/ShaneSpencer?promo=email_sig&utm_source=product&utm_medium=email_sig&utm_campaign=gmail_api&utm_content=thumb>
Shane Spencer
about.me/ShaneSpencer
<https://about.me/ShaneSpencer?promo=email_sig&utm_source=product&utm_medium=email_sig&utm_campaign=gmail_api&utm_content=thumb>
Post by Shane Spencer
OK. I can't find hair nor hide of some past conversations with this.
I have a feeling I need to perform my http request in `prepare` and gain
early access to self.request.body in `prepare` with the intent of removing
it before moving on to the target method.
For now this is what I'm trying to hook up properly. Been all over
github looking for examples and I've found some pretty stellar ones.
@tornado.web.stream_request_body
self.upstream_headers = tornado.httputil.HTTPHeaders()
self.upstream_body_queue = tornado.queues.Queue(maxsize=1)
self.request_body_queue = tornado.queues.Queue(maxsize=1)
self.request_body_buffer =
tempfile.SpooledTemporaryFile(mode='w+b', max_size=2e+6)
#self.request_body_read_bytes = 0 #use tell
self.request_body_content_length = None
self.request_body_cached = False
self.path_uri = self.path_kwargs.get('uri')
self.upstream_uri = f'{self.path_uri}?{self.request.query}'
content_length = self.request.headers.get('Content-Length')
raise tornado.web.HTTPError(405)
self.request_body_content_length = int(content_length, 10)
break
print('sssss', time.time(), len(chunk),
self.request_body_buffer.tell(), self.request_body_queue.qsize())
write(chunk)
print('fin')
print('rrrrr', time.time(), len(chunk),
self.request_body_buffer.tell(), self.request_body_queue.qsize())
return
await self.request_body_queue.put(chunk)
self.request_body_buffer.write(chunk)
if self.request_body_buffer.tell() >=
self.request_body_queue.put(QUEUE_FINISH)
#self.request_body_buffer.rollover()
print(textwrap.indent(header_line.strip(), 'header_callback: '))
self.upstream_headers.clear()
return
return
self.upstream_headers.parse_line(header_line)
print(textwrap.indent(pprint.pformat(dict(self.upstream_headers)),
'all_headers: '))
'))
self.upstream_body_queue.put(chunk)
response = await tornado.httpclient.AsyncHTTPClient(
max_buffer_size=100000000,
max_body_size=1e+10, #10 GB
).fetch(
self.upstream_uri,
raise_error=False,
follow_redirects=True,
method=self.request.method,
allow_nonstandard_methods=True,
request_timeout=10000,
body_producer=self.body_producer,
header_callback=self.header_callback,
streaming_callback=self.streaming_callback,
)
print('response', response, self.request_body_buffer.name,
self.request_body_buffer._rolled)
self.finish()
self.request_body_buffer.close()
get = run
post = run
<https://about.me/ShaneSpencer?promo=email_sig&utm_source=product&utm_medium=email_sig&utm_campaign=gmail_api&utm_content=thumb>
Shane Spencer
about.me/ShaneSpencer
<https://about.me/ShaneSpencer?promo=email_sig&utm_source=product&utm_medium=email_sig&utm_campaign=gmail_api&utm_content=thumb>
--
You received this message because you are subscribed to the Google Groups
"Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornado+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Loading...