librelist archives

« back to archive

Problem with event streaming and aborted connections

Problem with event streaming and aborted connections

From:
Steve Brown
Date:
2012-12-16 @ 15:40
Hi

I'm using code like the following:

@app.route('/events')
def events():
    current_event_queue = Queue.Queue()
    events_queue.append(current_event_queue)
    return Response(pop_queue(current_event_queue),
mimetype='text/event-stream')

def pop_queue(current_event_queue):
    while True:
        yield current_event_queue.get()

So when someone calls /events, a new data queue is created for them and it
starts yielding from that queue. In another bit of the code, it's calling:

 for event_queue in events_queue:
        event_queue.put(formatted_json)
which adds to the current list of queues.
However - it's not allowing me to add a decorator/anything after this
function to allow me to kill the event queue for that thread after it
finishes. I was hoping to add a callback to make it remove that event_queue.

It gives the error:
Traceback (most recent call last):
  File "C:\Python27\lib\SocketServer.py", line 582, in
process_request_thread
----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 51031)
    self.finish_request(request, client_address)
  File "C:\Python27\lib\SocketServer.py", line 323, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "C:\Python27\lib\SocketServer.py", line 640, in __init__
    self.finish()
  File "C:\Python27\lib\SocketServer.py", line 693, in finish
    self.wfile.flush()
  File "C:\Python27\lib\socket.py", line 303, in flush
    self._sock.sendall(view[write_offset:write_offset+buffer_size])
error: [Errno 10053] An established connection was aborted by the software
in your host machine

in the console.

Any ideas?

Thanks,
Steve

Re: [flask] Problem with event streaming and aborted connections

From:
Andy D'Arcy Jewell
Date:
2012-12-16 @ 22:39
On 16/12/12 15:40, Steve Brown wrote:
> Hi
>
> I'm using code like the following:
>
> @app.route('/events')
> def events():
>     current_event_queue = Queue.Queue()
>     events_queue.append(current_event_queue)
>
Hi Steve,

Where are you defining events_queue in the fourth line? If it's defined 
elsewhere, is it in scope? If you want to use a global variable, you 
need to put it into the "g" object, like:

def events():
     current_event_queue = Queue.Queue()
     g.events_queue.append(current_event_queue)

But you'd have to make sure you initialised g.events_queue some where 
like within your before_request function.

HTH,
-Andy