Discussion:
[tornado] Can't proxy RFB stream into websockets using Tornado's WebSocketHandler
Pavel
2018-07-06 12:11:01 UTC
Permalink
I'm using Tornado to proxy RFB stream from regular TCP socket into
WebSocket.

That's how I do it:



def initialize(self,executor):
super().initialize(executor)
url = urlparse(opts.url)
username = opts.username
password = opts.password
if ':' in url.netloc:
host, port = url.netloc.split(':')
else:
host = url.netloc
port = 80 #TODO: AS FOR NOW ONLY HTTP IS SUPPORTED

self.host = host
self.port = int(port)
self.auth_token = base64.encodebytes('{0}:{1}'.format
(username,
password).encode())




@tornado.web.asynchronous
def open(self):
'''
This method proxies WebSocket calls to XenServer
'''
self.sock = socket.create_connection((self.host, self.port))
self.sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY,1)
self.sock.setblocking(0)
self.halt = False
self.translate = False
self.key=None


uri = self.request.uri
lines =[
'CONNECT {0} HTTP/1.1'.format(uri), #HTTP 1.1 creates Keep-alive connection
'Host: {0}'.format(self.host),
# 'Authorization: Basic {0}'.format(self.auth_token),
]
self.sock.send('\r\n'.join(lines).encode())
self.sock.send(b'\r\nAuthorization: Basic ' + self.auth_token)
self.sock.send(b'\r\n\r\n')
tornado.ioloop.IOLoop.current().run_in_executor(self.executor, self.server_reading)




def on_message(self, message):
assert(isinstance(message, bytes))
self.sock.send(message)

def select_subprotocol(self, subprotocols):
print("Select subprotocol!", subprotocols)
return subprotocols[0]


def server_reading(self):
try:
http_header_read = False
while self.halt is False:

ready_to_read, ready_to_write, in_error = select.select([self.sock], [], [])
if self.sock in ready_to_read:
data = self.sock.recv(1024)
if not http_header_read:
http_header_read = True
data = data[78:]

self.write_message(data, binary=True)

except:
if self.halt is False:
traceback.print_exc()
else:
pass
self.sock.close()


def on_close(self):
self.halt = True
try:
self.sock.send(b'close\n')
except:
pass
finally:
self.sock.close()

RFB is remote framebuffer, a binary protocol. The source is accessed via HTTP CONNECT method.

The problem is that I encounter the following problem with WebSocket from time to time when scrolling

a remote framebuffer window:
File "/home/pasha/vmemperor/vmemperor.py", line 1510, in server_reading
self.write_message(data, binary=True)
File "/home/pasha/venv/vmemperor/lib/python3.5/site-packages/tornado/websocket.py", line 256, in write_message
return self.ws_connection.write_message(message, binary=binary)
File "/home/pasha/venv/vmemperor/lib/python3.5/site-packages/tornado/websocket.py", line 801, in write_message
fut = self._write_frame(True, opcode, message, flags=flags)
File "/home/pasha/venv/vmemperor/lib/python3.5/site-packages/tornado/websocket.py", line 780, in _write_frame
return self.stream.write(frame)
File "/home/pasha/venv/vmemperor/lib/python3.5/site-packages/tornado/iostream.py", line 540, in write
self._handle_write()
File "/home/pasha/venv/vmemperor/lib/python3.5/site-packages/tornado/iostream.py", line 1009, in _handle_write
self._write_buffer.advance(num_bytes)
File "/home/pasha/venv/vmemperor/lib/python3.5/site-packages/tornado/iostream.py", line 183, in advance
assert 0 < size <= self._size
AssertionError
And WebSocket connection closes.
--
You received this message because you are subscribed to the Google Groups "Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornado+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Ben Darnell
2018-07-06 16:40:06 UTC
Permalink
`self.write_message` may only be called from the IOLoop thread. You're
calling it from `server_reading`, which is run from the executor's thread.
You need to refactor things so that data is passed back to the IOLoop
thread to be written to the websocket.

-Ben
Post by Pavel
I'm using Tornado to proxy RFB stream from regular TCP socket into
WebSocket.
super().initialize(executor)
url = urlparse(opts.url)
username = opts.username
password = opts.password
host, port = url.netloc.split(':')
host = url.netloc
port = 80 #TODO: AS FOR NOW ONLY HTTP IS SUPPORTED
self.host = host
self.port = int(port)
self.auth_token = base64.encodebytes('{0}:{1}'.format
(username,
password).encode())
@tornado.web.asynchronous
'''
This method proxies WebSocket calls to XenServer
'''
self.sock = socket.create_connection((self.host, self.port))
self.sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY,1)
self.sock.setblocking(0)
self.halt = False
self.translate = False
self.key=None
uri = self.request.uri
lines =[
'CONNECT {0} HTTP/1.1'.format(uri), #HTTP 1.1 creates Keep-alive connection
'Host: {0}'.format(self.host),
# 'Authorization: Basic {0}'.format(self.auth_token),
]
self.sock.send('\r\n'.join(lines).encode())
self.sock.send(b'\r\nAuthorization: Basic ' + self.auth_token)
self.sock.send(b'\r\n\r\n')
tornado.ioloop.IOLoop.current().run_in_executor(self.executor, self.server_reading)
assert(isinstance(message, bytes))
self.sock.send(message)
print("Select subprotocol!", subprotocols)
return subprotocols[0]
http_header_read = False
ready_to_read, ready_to_write, in_error = select.select([self.sock], [], [])
data = self.sock.recv(1024)
http_header_read = True
data = data[78:]
self.write_message(data, binary=True)
traceback.print_exc()
pass
self.sock.close()
self.halt = True
self.sock.send(b'close\n')
pass
self.sock.close()
RFB is remote framebuffer, a binary protocol. The source is accessed via HTTP CONNECT method.
The problem is that I encounter the following problem with WebSocket from time to time when scrolling
File "/home/pasha/vmemperor/vmemperor.py", line 1510, in server_reading
self.write_message(data, binary=True)
File "/home/pasha/venv/vmemperor/lib/python3.5/site-packages/tornado/websocket.py", line 256, in write_message
return self.ws_connection.write_message(message, binary=binary)
File "/home/pasha/venv/vmemperor/lib/python3.5/site-packages/tornado/websocket.py", line 801, in write_message
fut = self._write_frame(True, opcode, message, flags=flags)
File "/home/pasha/venv/vmemperor/lib/python3.5/site-packages/tornado/websocket.py", line 780, in _write_frame
return self.stream.write(frame)
File "/home/pasha/venv/vmemperor/lib/python3.5/site-packages/tornado/iostream.py", line 540, in write
self._handle_write()
File "/home/pasha/venv/vmemperor/lib/python3.5/site-packages/tornado/iostream.py", line 1009, in _handle_write
self._write_buffer.advance(num_bytes)
File "/home/pasha/venv/vmemperor/lib/python3.5/site-packages/tornado/iostream.py", line 183, in advance
assert 0 < size <= self._size
AssertionError
And WebSocket connection closes.
--
You received this message because you are subscribed to the Google Groups
"Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornado+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Pavel
2018-07-06 21:05:10 UTC
Permalink
OK. I was trying to use Queue like this:

tornado.ioloop.IOLoop.current().run_in_executor(self.executor, self.server_reading)
while True:
item = self.queue.get()
try:
self.write_message(item, binary=True)
finally:
self.queue.task_done()




def on_message(self, message):
assert(isinstance(message, bytes))
self.sock.send(message)

def select_subprotocol(self, subprotocols):
return 'binary'


def server_reading(self):
try:
http_header_read = False
while self.halt is False:
print("before select")
ready_to_read, ready_to_write, in_error = select.select([self.sock], [], [])
print("after select")
if self.sock in ready_to_read:
data = self.sock.recv(1024)
if not http_header_read:
http_header_read = True
data = data[78:]

self.queue.put(data)

except:
if self.halt is False:
traceback.print_exc()
else:
pass
self.sock.close()


It just hangs on select call - does not happen when I use
self.write_message directly... (with both tornado and python queue)
What I am doing wrong?
Post by Ben Darnell
`self.write_message` may only be called from the IOLoop thread. You're
calling it from `server_reading`, which is run from the executor's thread.
You need to refactor things so that data is passed back to the IOLoop
thread to be written to the websocket.
-Ben
Post by Pavel
I'm using Tornado to proxy RFB stream from regular TCP socket into
WebSocket.
super().initialize(executor)
url = urlparse(opts.url)
username = opts.username
password = opts.password
host, port = url.netloc.split(':')
host = url.netloc
port = 80 #TODO: AS FOR NOW ONLY HTTP IS SUPPORTED
self.host = host
self.port = int(port)
self.auth_token = base64.encodebytes('{0}:{1}'.format
(username,
password).encode())
@tornado.web.asynchronous
'''
This method proxies WebSocket calls to XenServer
'''
self.sock = socket.create_connection((self.host, self.port))
self.sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY,1)
self.sock.setblocking(0)
self.halt = False
self.translate = False
self.key=None
uri = self.request.uri
lines =[
'CONNECT {0} HTTP/1.1'.format(uri), #HTTP 1.1 creates Keep-alive connection
'Host: {0}'.format(self.host),
# 'Authorization: Basic {0}'.format(self.auth_token),
]
self.sock.send('\r\n'.join(lines).encode())
self.sock.send(b'\r\nAuthorization: Basic ' + self.auth_token)
self.sock.send(b'\r\n\r\n')
tornado.ioloop.IOLoop.current().run_in_executor(self.executor, self.server_reading)
assert(isinstance(message, bytes))
self.sock.send(message)
print("Select subprotocol!", subprotocols)
return subprotocols[0]
http_header_read = False
ready_to_read, ready_to_write, in_error = select.select([self.sock], [], [])
data = self.sock.recv(1024)
http_header_read = True
data = data[78:]
self.write_message(data, binary=True)
traceback.print_exc()
pass
self.sock.close()
self.halt = True
self.sock.send(b'close\n')
pass
self.sock.close()
RFB is remote framebuffer, a binary protocol. The source is accessed via HTTP CONNECT method.
The problem is that I encounter the following problem with WebSocket from time to time when scrolling
File "/home/pasha/vmemperor/vmemperor.py", line 1510, in server_reading
self.write_message(data, binary=True)
File "/home/pasha/venv/vmemperor/lib/python3.5/site-packages/tornado/websocket.py", line 256, in write_message
return self.ws_connection.write_message(message, binary=binary)
File "/home/pasha/venv/vmemperor/lib/python3.5/site-packages/tornado/websocket.py", line 801, in write_message
fut = self._write_frame(True, opcode, message, flags=flags)
File "/home/pasha/venv/vmemperor/lib/python3.5/site-packages/tornado/websocket.py", line 780, in _write_frame
return self.stream.write(frame)
File "/home/pasha/venv/vmemperor/lib/python3.5/site-packages/tornado/iostream.py", line 540, in write
self._handle_write()
File "/home/pasha/venv/vmemperor/lib/python3.5/site-packages/tornado/iostream.py", line 1009, in _handle_write
self._write_buffer.advance(num_bytes)
File "/home/pasha/venv/vmemperor/lib/python3.5/site-packages/tornado/iostream.py", line 183, in advance
assert 0 < size <= self._size
AssertionError
And WebSocket connection closes.
--
You received this message because you are subscribed to the Google Groups
"Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornado+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Pierce Lopez
2018-07-06 21:44:05 UTC
Permalink
You can't use a tornado Queue from a separate thread either, that queue
uses the IOLoop.

One way to get stuff back to the IOLoop is to return some value which is
then the result of the Future returned by run_in_executor(). That's not
really what you're going for though. You could pass the thread a reference
to the original IOLoop and it could call add_callback() ("the only method
in IOLoop that makes this thread-safety guarantee"):
http://www.tornadoweb.org/en/stable/ioloop.html#tornado.ioloop.IOLoop.add_callback

Still, it's odd that your other thread is reading from `self.sock`, which
is also what the websocket class is reading/writing, I think ...

- Pierce
Post by Pavel
tornado.ioloop.IOLoop.current().run_in_executor(self.executor, self.server_reading)
item = self.queue.get()
self.write_message(item, binary=True)
self.queue.task_done()
assert(isinstance(message, bytes))
self.sock.send(message)
return 'binary'
http_header_read = False
print("before select")
ready_to_read, ready_to_write, in_error = select.select([self.sock], [], [])
print("after select")
data = self.sock.recv(1024)
http_header_read = True
data = data[78:]
self.queue.put(data)
traceback.print_exc()
pass
self.sock.close()
It just hangs on select call - does not happen when I use
self.write_message directly... (with both tornado and python queue)
What I am doing wrong?
Post by Ben Darnell
`self.write_message` may only be called from the IOLoop thread. You're
calling it from `server_reading`, which is run from the executor's thread.
You need to refactor things so that data is passed back to the IOLoop
thread to be written to the websocket.
--
You received this message because you are subscribed to the Google Groups "Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornado+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Ben Darnell
2018-07-08 00:24:29 UTC
Permalink
Post by Pierce Lopez
You can't use a tornado Queue from a separate thread either, that queue
uses the IOLoop.
Yeah, using a queue for this is messy because both gets and puts to the
queue have to happen on the IOLoop. The simplest solution (if you're
willing to use an unbounded queue) is to use

def put_to_queue_from_other_thread(queue, value):
IOLoop.current().add_callback(functools.partial(queue.put_nowait,
value))

If your queue is bounded, you need a little more magic. Something like this
(untested):

def put_to_queue_from_other_thread(queue, value):
event = threading.Event()
async def callback():
await queue.put(value)
event.set()
IOLoop.current().add_callback(callback)
event.wait()

Something like this is probably worth adding to the Tornado Queue class
(and/or the asyncio Queue class, which has the same problem). Or one could
tackle the problem from the other side, and add a pipe file descriptor to a
synchronous queue so it could also be used asynchronously.

Note that if your server_reading() method is really as simple as you've
shown here, then you can avoid all this threading complexity by rewriting
it to use IOStream (or other async primitives) instead of blocking socket
calls. The basic loop would look like this:

while self.halt is False:
try:
data = await stream.read_bytes(1024, partial=True)
except StreamClosedError:
break
self.write_message(data, binary=True)

-Ben
Post by Pierce Lopez
One way to get stuff back to the IOLoop is to return some value which is
then the result of the Future returned by run_in_executor(). That's not
really what you're going for though. You could pass the thread a reference
to the original IOLoop and it could call add_callback() ("the only method
http://www.tornadoweb.org/en/stable/ioloop.html#tornado.ioloop.IOLoop.add_callback
Still, it's odd that your other thread is reading from `self.sock`, which
is also what the websocket class is reading/writing, I think ...
- Pierce
Post by Pavel
tornado.ioloop.IOLoop.current().run_in_executor(self.executor, self.server_reading)
item = self.queue.get()
self.write_message(item, binary=True)
self.queue.task_done()
assert(isinstance(message, bytes))
self.sock.send(message)
return 'binary'
http_header_read = False
print("before select")
ready_to_read, ready_to_write, in_error = select.select([self.sock], [], [])
print("after select")
data = self.sock.recv(1024)
http_header_read = True
data = data[78:]
self.queue.put(data)
traceback.print_exc()
pass
self.sock.close()
It just hangs on select call - does not happen when I use
self.write_message directly... (with both tornado and python queue)
What I am doing wrong?
Post by Ben Darnell
`self.write_message` may only be called from the IOLoop thread. You're
calling it from `server_reading`, which is run from the executor's thread.
You need to refactor things so that data is passed back to the IOLoop
thread to be written to the websocket.
--
You received this message because you are subscribed to the Google Groups
"Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornado+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Loading...