Discussion:
WebSockets and Buffering
Davy Durham
2013-05-29 21:36:11 UTC
Hi,
I'm somewhat versed in Tornado and have read through most of the relevant
documentation, but I'm not sure how to solve this particular problem.

I've created a "proxy" application which accepts websocket upgrade
requests. In my open() method, I create an IOStream to another server and
"ferry" the data to and fro. Essentially I'm wrapping another network
protocol over a websocket. And I need to do this for many simultaneous
connections en masse, asynchronously. It's working properly right now, but I
have a concern:

In my on_message() method, I call self.my_stream.write(data) to send data
received from a websocket to the other server. (I also go the other way
too, but I'll discuss that later if necessary.) Obviously, if the network
capacity between the web browser and my webserver is large but the
network capacity between my webserver and the other server is small, then
data may continue to arrive at a fast rate, but every time I call
self.my_stream.write(data) it will be forced to buffer that data. My
concern is that a resource exhaustion attack could be leveraged against my
webserver.

I see in the WebSocketHandler code that we don't add the socket back to the
ioloop for READING until on_message() returns. But, of course, if I don't
return from on_message() until my_stream's write buffer is back below a
threshold, then the ioloop won't be able to process other websockets' data.

I played with gen a bit, but wasn't exactly sure how that is supposed to
work. I had no success.

Here's a basic outline of the program in case someone can suggest how to
deal with this issue:

class WebSocketPassThru(tornado.websocket.WebSocketHandler):
    def open(self):
        # connect to remote server ...
        self.my_stream = IOStream(...)
        ...

    def on_message(self, message):
        ...
        self.my_stream.write(message)
        ...

    def on_close(self):
        ...
        self.my_stream.close()


So, basically, on_message() is going to be repeatedly called and
my_stream.write() is going to continue to buffer data, which is a memory
problem if the data cannot be dumped to the network as fast as it's arriving.

Thanks
--
You received this message because you are subscribed to the Google Groups "Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornado+unsubscribe-/JYPxA39Uh5TLH3MbocFF+G/***@public.gmane.org
For more options, visit https://groups.google.com/groups/opt_out.
Roey Berman
2013-05-29 23:22:14 UTC
Hi.

AFAIK there's no accurate way of determining the write rate of an IOStream.

The only methods IOStream exposes that can be used to control the write
rate are checking writing() and passing a callback to write().
IOStream.write()'s callback param is called when the entire write buffer
has been flushed.
IOStream doesn't expose the length of the write buffer; you could throttle
based on len(my_stream._write_buffer), but it's private, so I guess it's not
recommended.

I think some improvements could be made to IOStream in this regard.
Some options that pop into my head:
1. IOStream.write()'s callback param could be called when a chunk is
flushed to the OS buffer.
2. Expose the length of the write buffer.

Using gen won't help you either: your on_message method will return once
you yield, and will be resumed once gen.Runner (internal) wakes it up.
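The throttling Roey describes can be modeled in a few lines of plain Python: pause on write(), and resume reading only when the flush callback fires. FlowControlledWriter and its write_fn parameter are illustrative names, not Tornado API; write_fn(data, callback) simply stands in for IOStream.write().

```python
# Minimal model of throttling via write()'s flush callback, as described
# above. FlowControlledWriter and write_fn are illustrative names, not
# Tornado API: write_fn(data, callback) stands in for IOStream.write().

class FlowControlledWriter:
    def __init__(self, write_fn):
        self._write_fn = write_fn
        self.paused = False   # True while a flush is outstanding

    def write(self, data):
        # Stop scheduling reads until the whole write buffer drains.
        self.paused = True
        self._write_fn(data, self._on_flushed)

    def _on_flushed(self):
        # IOStream invokes the write callback once the entire buffer has
        # been flushed to the OS; only then is it safe to read more.
        self.paused = False
```

In on_message() you would then write through such a wrapper and skip scheduling the next read while paused is True.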
Ben Darnell
2013-05-30 14:11:23 UTC
IOStream.write's callback is sufficient for a simple form of flow control.
It's a little crude since it is all-or-nothing, but since it marks the
emptying of a second tier of buffer (when everything has been passed to the
OS socket buffer), you shouldn't be completely bottoming out and losing
efficiency. If we make some changes here it might be to introduce a
variable analogous to SO_SNDLOWAT, but I'm not sure how much that would
really help.

The real problem here is that WebSocketHandler gives you no way to control
*reads*. Even if you knew everything about the IOStream's write behavior,
you'd have to do something to prevent WebSocketHandler from listening for
reads on the socket (and then TCP backpressure will apply automatically).
We could do this either by adding a pair of pause_reading/resume_reading
methods on WebSocketHandler or perhaps by allowing on_message to be
asynchronous by returning a Future (in which case we wouldn't start
listening for the next message until that Future resolves).

-Ben
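Ben's Future-returning on_message idea can be sketched as a toy model: the protocol layer holds off reading the next frame until the returned Future resolves, so TCP backpressure kicks in automatically. The Future and Protocol classes below are illustrative stand-ins, not Tornado's implementation.

```python
# Toy model of a Future-returning on_message contract: the protocol layer
# defers reading the next frame until the Future returned by on_message
# resolves. Future/Protocol are illustrative, not Tornado classes.

class Future:
    def __init__(self):
        self.done = False
        self._callbacks = []

    def add_done_callback(self, cb):
        if self.done:
            cb()
        else:
            self._callbacks.append(cb)

    def set_result(self, value=None):
        self.done = True
        for cb in self._callbacks:
            cb()

class Protocol:
    def __init__(self, handler):
        self.handler = handler
        self.frames_requested = 0

    def _receive_frame(self):
        # In real code this would re-arm the read on the socket.
        self.frames_requested += 1

    def deliver(self, message):
        fut = self.handler.on_message(message)
        if fut is None:
            self._receive_frame()   # synchronous handler: keep reading
        else:
            # Asynchronous handler: wait until the backpressure clears.
            fut.add_done_callback(lambda: self._receive_frame())
```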
Davy Durham
2013-05-30 17:23:07 UTC
Post by Ben Darnell
The real problem here is that WebSocketHandler gives you no way to control
*reads*. Even if you knew everything about the IOStream's write behavior,
you'd have to do something to prevent WebSocketHandler from listening for
reads on the socket (and then TCP backpressure will apply automatically).
We could do this either by adding a pair of pause_reading/resume_reading
methods on WebSocketHandler or perhaps by allowing on_message to be
asynchronous by returning a Future (in which case we wouldn't start
listening for the next message until that Future resolves).
-Ben
I was afraid that yielding wouldn't actually help... I'm new to coroutines,
but I have a good understanding of how call stacks work and how that would
probably not be possible without stalling everything else.

What I might play with is adding a boolean return value to on_message()
that, when True, tells WebSocketProtocol not to turn around and call
_receive_frame()... and add a public method, resume_reading(), that would
simply call _receive_frame() if it isn't already in progress. I suggest a
return value instead of a pause_reading() method, because I'm not sure how
to reliably make that work if a _receive_frame() or any other part of
that chain of events is already in progress. Allowing it to pause only on
return from on_message() simplifies the implementation. Agree/Disagree?

---

I then have the same problem when data is going the other direction. I
don't see that WebSocketHandler.write_message() has a callback when
complete, but that looks trivial to implement by just passing a callback
through until we call self.stream.write(). Agree/Disagree?
Ben Darnell
2013-05-31 02:52:54 UTC
It's definitely simpler (at least with the current implementation) to
signal a pause via a return from on_message. Returning a bool is awkward
if on_message is most naturally expressed as a coroutine; returning a
Future works better with coroutines and avoids the asymmetry of pausing
with a return value and resuming with a method call, but it's a pain if
your on_message is not a coroutine.

If pause_reading guarantees that on_message will not be called until
reading is resumed, then it's difficult to implement as a separate method.
It would be easier to implement this method if we allowed for an in-flight
message to be delivered after the pause call. IOStream may have buffered
an unknown amount of data, so it might be useful to deliver this data to
the app as long as it can be done without doing any additional reads. On
the other hand, this may just push the problem to another level and require
additional buffering at the application layer.


Post by Davy Durham
I then have the same problem when data is going the other direction. I
don't see that WebSocketHandler.write_message() has a callback when
complete, but that looks trivial to implement by just passing a callback
through until we call self.stream.write(). Agree/Disagree?
Yes, this is straightforward. The only catch is that the IOStream only has
one write callback at a time, so if you write a second message before the
first's callback has run, the first callback will never be run.

-Ben
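Given that only the most recent write callback survives, one workable accounting scheme is to pass a fresh callback on every write and treat its firing as "the entire buffer drained," resetting the outstanding count rather than decrementing it. WriteTracker below is an illustrative sketch under those assumptions, not Tornado API.

```python
# Sketch of working around the one-write-callback-at-a-time limitation:
# pass a fresh callback on every write (the stream keeps only the latest),
# and since IOStream fires it when the ENTIRE write buffer is flushed,
# treat it as draining all outstanding writes. WriteTracker and
# stream_write are illustrative names, not Tornado API.

class WriteTracker:
    def __init__(self, resume_reading):
        self.pending = 0
        self._resume = resume_reading

    def write(self, stream_write, data):
        # stream_write(data, callback) stands in for IOStream.write().
        self.pending += 1
        stream_write(data, self._flushed)

    def _flushed(self):
        # The callback means the whole write buffer drained, not just one
        # message, so clear the count rather than decrementing it.
        self.pending = 0
        self._resume()
```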
Davy Durham
2013-05-31 16:56:07 UTC
So, I successfully added the callback to write_message(), but I have a
question. When I'm establishing the copy sequence *from* the IOStream
*to* the WebSocketHandler, I need to get a callback from the IOStream for
at least 1 byte, but preferably all bytes that are available, so that I can
pass them to WebSocketHandler.write_message() (then get the callback to
read some more).

However, on IOStream, I only see read_bytes(), read_until(),
read_until_regex(), and read_until_close()... All these take callbacks,
but how can I have a callback called when at least one byte is available,
but with as much data as is available? I would mimic the guts of
read_until_close(), but it's calling all sorts of private stuff. Am I
going to have to implement a new reading method for this?

Thanks
Davy Durham
2013-06-01 00:25:07 UTC
This seems to work. Do you see any potential problems with it?

def establish_copy_loop_from_stream_to_websocket(webSocketHandler, clear_stream):
    def read_callback(data):
        assert data == b''

        if clear_stream.closed() and not webSocketHandler.is_closed:
            # TODO? does doing this now prevent data perhaps still buffered
            # in the underlying stream from being flushed?
            webSocketHandler.close()
        else:
            read_more()

    def read_streaming_callback(data):
        log.debug("data arrived from stream: [%d] data (%d)[%s]" %
                  (webSocketHandler.logging_fileno, len(data), str(data[:256])))
        webSocketHandler.write_message(data, True)

    def read_more():
        log.debug("read_more")
        assert not clear_stream.reading()
        # 4096 is a number I made up.. it works with even low numbers
        # (but is of course less efficient)
        clear_stream.read_bytes(4096, read_callback, read_streaming_callback)

    # start the process
    read_more()
Ben Darnell
2013-06-01 21:30:04 UTC
In general we use read_until_close with a streaming callback when you want
data as it comes in, but this doesn't provide any means of flow control.
I've been reluctant to add a separate read_something() method because it's
redundant, but this is a case that can't be handled cleanly with the
existing methods. We either need a way to read one chunk at a time or a
way to cancel an existing read.
Post by Davy Durham
This seems to work. Do you see any potential problems with it?
if clear_stream.closed() and not webSocketHandler.is_closed:
This condition should be split up: if both streams are closed, you don't
want to go into read_more.
Post by Davy Durham
# TODO? does doing this now prevent data perhaps still buffered in the
underlying stream from being flushed?
Possibly. Closing an IOStream discards any remaining data in the write
buffer. Closing a websocket connection will attempt to do a clean shutdown
including delivering any remaining messages, but the current implementation
uses a 5 second timeout: if we don't get an acknowledgement of the close
within 5 seconds, the remaining data will be dropped.
Post by Davy Durham
webSocketHandler.close()
read_more()
log.debug("data arrived from stream: [%d] data (%d)[%s]" %
(webSocketHandler.logging_fileno, len(data), str(data[:256])))
webSocketHandler.write_message(data, True)
log.debug("read_more")
assert(not clear_stream.reading())
# 4096 is a number I made up..it works with even low
numbers (but is of course less efficient)
clear_stream.read_bytes(4096, read_callback,
read_streaming_callback)
# start the process
read_more()
This will generally work, but it's not doing any kind of flow control, so
it is essentially equivalent to using read_until_close instead of
read_bytes. To do flow control you'd have to use write callbacks and keep
track of the number of outstanding messages and potentially delay the
read_more call in the next read_callback.

-Ben
Davy Durham
2013-06-03 03:49:25 UTC
Post by Ben Darnell
This will generally work, but it's not doing any kind of flow control, so
it is essentially equivalent to using read_until_close instead of
read_bytes. To do flow control you'd have to use write callbacks and keep
track of the number of outstanding messages and potentially delay the
read_more call in the next read_callback.
I think it would actually do flow control, because each read_bytes() is only
allowed to read 4096 bytes, and we won't call read_bytes() again until we
get the write-completed callback, but it will deliver smaller amounts than
4096 if they come in the meantime. Did I miss something?
Ben Darnell
2013-06-03 13:14:18 UTC
There's no write callback in the code you posted, it just starts the read
again once the initial 4096-byte read finishes. If you added a write
callback and only rescheduled the read when both the 4k read and the last
outstanding write callback were finished, that should work.

-Ben
Davy Durham
2013-06-03 17:47:04 UTC
Oops... you're right. It got lost in my multiple iterations... :) I
implemented it in write_message(), but I'll have to re-work that into the
solution to actually use it.
Davy Durham
2013-06-04 16:30:17 UTC
So here's a finished version (except for that close() issue) with the
write_callback done correctly... for anyone's reference.

def establish_copy_loop_from_stream_to_websocket(webSocketHandler, clear_stream):
    def read_callback(data):
        assert data == b''
        if clear_stream.closed() and not webSocketHandler.is_closed:
            # TODO? does this prevent unflushed data from being sent?
            webSocketHandler.close()

    def read_streaming_callback(data):
        webSocketHandler.write_message(data, True, write_callback)

    def write_callback():
        if not clear_stream.reading():
            read_more()

    def read_more():
        assert not clear_stream.reading()
        clear_stream.read_bytes(4096, read_callback, read_streaming_callback)

    read_more()
Ben Darnell
2013-06-05 13:37:32 UTC
I don't think you're guaranteed to get read_callback before write_callback,
so I would add "if not clear_stream.writing(): read_more()" in
read_callback. You might need to set your own status flags to keep track
of whether you have an outstanding callback - the reading and writing bits
are cleared when the callbacks are scheduled, not when they are run, so you
might need to guard against things happening in various orders.

-Ben
Davy Durham
2013-06-05 21:05:13 UTC
Post by Ben Darnell
I don't think you're guaranteed to get read_callback before
write_callback, so I would add "if not clear_stream.writing(): read_more()"
in read_callback. You might need to set your own status flags to keep
track of whether you have an outstanding callback - the reading and writing
bits are cleared when the callbacks are scheduled, not when they are run,
so you might need to guard against things happening in various orders.
-Ben
I see what you mean.. so for anyone still following along, here's an
updated version:

def establish_copy_loop_from_stream_to_websocket(web_socket_handler, clear_stream):
    # This function calls clear_stream.read_bytes() for 4k, writing all those
    # bytes to the web_socket_handler as soon as they are available, and only
    # calls clear_stream.read_bytes() again after the total 4k has been
    # written. This keeps us from overrunning the WebSocket side with more
    # data than it can handle.

    # I can't necessarily trust the IOStream reading()/writing() methods to
    # return the state correctly as of the time I actually obtain a callback.
    # So, I need to track that state myself and base it upon the receipt of
    # callbacks.
    clear_stream.my_is_reading = False
    web_socket_handler.my_is_writing = False

    def read_callback(data):
        assert data == b''

        clear_stream.my_is_reading = False
        if clear_stream.closed():
            if not web_socket_handler.is_closed:
                # TODO? does this prevent unflushed data from being sent?
                web_socket_handler.close()

        if not web_socket_handler.is_closed and not web_socket_handler.my_is_writing:
            read_more()

    def read_streaming_callback(data):
        web_socket_handler.my_is_writing = True
        web_socket_handler.write_message(data, True, write_callback)

    def write_callback():
        web_socket_handler.my_is_writing = False
        if not clear_stream.my_is_reading:
            read_more()

    def read_more():
        assert not clear_stream.my_is_reading
        clear_stream.my_is_reading = True
        clear_stream.read_bytes(4096, read_callback, read_streaming_callback)

    # start the process
    read_more()
--
You received this message because you are subscribed to the Google Groups "Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornado+unsubscribe-/JYPxA39Uh5TLH3MbocFF+G/***@public.gmane.org
For more options, visit https://groups.google.com/groups/opt_out.
Loading...