Sunday, January 6, 2008

Python's asynchat module

Introduction



As mentioned in the previous post, this time I look at how to write a network server using Python's asynchat module. To make use of asynchat's capabilities, I had to change the semantics of the echo server a little. The asyncore echo server echoed data back as soon as it received it; the asynchat echo server echoes data back line by line, where each line must be terminated by the string "\r\n".



The async_chat interface for server writers


asynchat provides a higher-level interface than asyncore. To write a server with it, you subclass asynchat.async_chat and override two methods:


collect_incoming_data(data)

Unlike with asyncore, you don't have to deal with the handle_read() event yourself. The framework reads the data for you and calls this method with it. You will usually want to save this data somewhere so you can process it later.

found_terminator()

The framework calls this method when it detects a 'terminator' in the incoming data stream. What counts as a terminator is up to you: you tell the framework with the set_terminator() method.
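To make the division of labour concrete, here is a small, socket-free model of what the framework does with each chunk it reads when the terminator is a string. It is a simplified sketch, not asynchat's actual code: TerminatorFramer, feed() and the Recorder subclass are hypothetical names, and only the two callback names come from async_chat. One detail worth noticing is that a terminator can be split across two reads (a lone "\r" at the end of one chunk), so the model holds such a tail back instead of passing it to collect_incoming_data(); the real handle_read() has to deal with this case too.

```python
class TerminatorFramer(object):
    """Simplified model (not asynchat's real code) of how received chunks
    become collect_incoming_data() and found_terminator() calls when the
    terminator is a string."""

    def __init__(self, terminator):
        self.terminator = terminator
        self.pending = ""  # tail that might be the start of a terminator

    def feed(self, chunk):
        """Handle one chunk of received data, as handle_read() would."""
        buf = self.pending + chunk
        self.pending = ""
        while buf:
            index = buf.find(self.terminator)
            if index != -1:
                # Everything before the terminator belongs to the current record.
                if index > 0:
                    self.collect_incoming_data(buf[:index])
                self.found_terminator()
                buf = buf[index + len(self.terminator):]
            else:
                # No full terminator: hold back a trailing partial one (e.g. "\r")
                # and hand the rest to collect_incoming_data().
                keep = self._partial_terminator_length(buf)
                if len(buf) > keep:
                    self.collect_incoming_data(buf[:len(buf) - keep])
                self.pending = buf[len(buf) - keep:]
                buf = ""

    def _partial_terminator_length(self, buf):
        """Length of the longest suffix of buf that is a proper prefix of the terminator."""
        for n in range(min(len(buf), len(self.terminator) - 1), 0, -1):
            if buf.endswith(self.terminator[:n]):
                return n
        return 0

    # The two methods a subclass overrides, as with async_chat.
    def collect_incoming_data(self, data):
        raise NotImplementedError

    def found_terminator(self):
        raise NotImplementedError


class Recorder(TerminatorFramer):
    """Hypothetical subclass that just records the callbacks it receives."""

    def __init__(self):
        TerminatorFramer.__init__(self, "\r\n")
        self.calls = []

    def collect_incoming_data(self, data):
        self.calls.append(("data", data))

    def found_terminator(self):
        self.calls.append(("terminator",))


r = Recorder()
for chunk in ["hel", "lo\r", "\nbye\r\n"]:  # "\r\n" arrives split across two reads
    r.feed(chunk)
# r.calls == [("data", "hel"), ("data", "lo"), ("terminator",),
#             ("data", "bye"), ("terminator",)]
```

Note that collect_incoming_data() may receive one line in several pieces, which is why the echo handler in this post buffers the data instead of acting on it immediately.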



Getting started


So how do you use this module to write a server? Just as with asyncore, you write a server class and instantiate an object of it; this object's socket is the listening server socket. You handle the handle_accept() event and, for each client connection, create an object of class async_chat (or rather, of a subclass of async_chat that you wrote) to handle that connection. So far, the only difference from asyncore is the class of the object that you instantiate to handle a client connection.

Let's get started. First we look at the driver code:

server_main.py:

import asynchat_echo_server
...
server = module.EchoServer((interface, port))
server.serve_forever()
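To exercise the running server you can connect with telnet, which ends each line with "\r\n". For a scripted check, here is a minimal client sketch in Python 3. echo_line() is a hypothetical helper written for illustration; it takes an already-connected socket so it can be tried against anything that speaks this line protocol.

```python
import socket


def echo_line(sock, line, terminator=b"\r\n"):
    """Send one terminated line and read back until the echoed terminator arrives."""
    sock.sendall(line + terminator)
    received = b""
    while not received.endswith(terminator):
        chunk = sock.recv(1024)
        if not chunk:
            raise ConnectionError("server closed the connection early")
        received += chunk
    return received[:-len(terminator)]
```

For example, assuming the server listens on localhost port 8000 (the real interface and port are whatever server_main.py passes in), sock = socket.create_connection(("localhost", 8000)) followed by echo_line(sock, b"hello") should return b"hello".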


The server class



Our 'EchoServer' class looks pretty much like before:

asynchat_echo_server.py:

class EchoServer(asyncore.dispatcher):

    allow_reuse_address = False
    request_queue_size = 5
    address_family = socket.AF_INET
    socket_type = socket.SOCK_STREAM

    def __init__(self, address, handlerClass=EchoHandler):
        self.address = address
        self.handlerClass = handlerClass

        asyncore.dispatcher.__init__(self)
        self.create_socket(self.address_family,
                           self.socket_type)

        if self.allow_reuse_address:
            self.set_reuse_addr()

        self.server_bind()
        self.server_activate()

    def server_bind(self):
        self.bind(self.address)
        log.debug("bind: address=%s:%s" % (self.address[0], self.address[1]))

    def server_activate(self):
        self.listen(self.request_queue_size)
        log.debug("listen: backlog=%d" % self.request_queue_size)

    def fileno(self):
        return self.socket.fileno()

    def serve_forever(self):
        asyncore.loop()
        # TODO: try to implement handle_request()

    # Internal use
    def handle_accept(self):
        pair = self.accept()
        if pair is None:
            # accept() returns None when no connection was actually pending
            return
        (conn_sock, client_address) = pair
        if self.verify_request(conn_sock, client_address):
            self.process_request(conn_sock, client_address)

    def verify_request(self, conn_sock, client_address):
        return True

    def process_request(self, conn_sock, client_address):
        log.info("conn_made: client_address=%s:%s" % \
                 (client_address[0],
                  client_address[1]))
        self.handlerClass(conn_sock, client_address, self)

    def handle_close(self):
        self.close()


The difference is in handlerClass, which defaults to EchoHandler as before, but EchoHandler is now coded differently. When we instantiate a handler, it is added to the global map of sockets that loop() monitors, so loop() now watches for events on the client socket as well as on the server socket. There can be any number of client sockets. This behaviour is the same as with asyncore.
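The "global map" is essentially a dictionary from file descriptor to dispatcher object (asyncore.socket_map). Each dispatcher registers itself there through add_channel(), and loop() by default repeatedly calls select() over the registered descriptors. The sketch below is a stripped-down, hypothetical model of that idea for read events only; RecordingHandler and poll_once() are illustration names, not asyncore's API.

```python
import select
import socket


class RecordingHandler(object):
    """Hypothetical stand-in for an asyncore dispatcher: it owns one socket
    and registers itself in a shared map keyed by file descriptor."""

    def __init__(self, sock, socket_map):
        self.sock = sock
        self.received = []
        socket_map[sock.fileno()] = self  # roughly what dispatcher.add_channel() does

    def handle_read(self):
        self.received.append(self.sock.recv(1024))


def poll_once(socket_map, timeout=1.0):
    """One pass of a select()-based event loop over every registered socket."""
    readable, _, _ = select.select(list(socket_map), [], [], timeout)
    for fd in readable:
        socket_map[fd].handle_read()


# Usage: a connected socket pair stands in for a client connection.
a, b = socket.socketpair()
socket_map = {}
handler = RecordingHandler(b, socket_map)
a.sendall(b"hi")
poll_once(socket_map)  # b is readable, so handler.handle_read() runs
```

Adding more handlers to the same dictionary is all it takes for the loop to watch more sockets, which is why any number of client connections can be served.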

Handling per-client connections



Here is how we start our new EchoHandler:

class EchoHandler(asynchat.async_chat):

    LINE_TERMINATOR = "\r\n"

    def __init__(self, conn_sock, client_address, server):
        asynchat.async_chat.__init__(self, conn_sock)
        self.server = server
        self.client_address = client_address
        self.ibuffer = []

        self.set_terminator(self.LINE_TERMINATOR)


As can be seen, __init__() calls async_chat's set_terminator() method with a string argument, which tells async_chat that a message (or record) ends wherever that string appears in the data. loop() now waits on this client socket and, when data arrives, calls async_chat's handle_read() method. async_chat's handle_read() reads the data, scans it, and calls the collect_incoming_data() method that you define:

    def collect_incoming_data(self, data):
        log.debug("collect_incoming_data: [%s]" % data)
        self.ibuffer.append(data)


As you can see, we just buffer the data here for later processing.

While scanning the data in handle_read(), async_chat looks for the string set by set_terminator(). When it finds it, it calls the found_terminator() method that we define:

    def found_terminator(self):
        log.debug("found_terminator")
        self.send_data()


When we find that we have a complete line (because it was terminated by "\r\n") we just send the data back. After all, we are writing an echo server.
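Because these callbacks never touch the socket directly, the line-echo logic can be checked without any networking. The class below is a hypothetical, socket-free copy of EchoHandler's callbacks in which push() simply records what would have been queued; it also shows why ibuffer must be reset after each line.

```python
class EchoLineLogic(object):
    """Hypothetical socket-free copy of EchoHandler's callbacks.
    push() only records data instead of queueing it for the socket."""

    LINE_TERMINATOR = "\r\n"

    def __init__(self):
        self.ibuffer = []  # pieces of the current, not yet terminated line
        self.sent = []     # stands in for async_chat's outgoing FIFO

    def push(self, data):
        self.sent.append(data)

    def collect_incoming_data(self, data):
        self.ibuffer.append(data)

    def found_terminator(self):
        data = "".join(self.ibuffer)
        self.push(data + self.LINE_TERMINATOR)
        self.ibuffer = []  # start the next line from scratch


h = EchoLineLogic()
h.collect_incoming_data("hel")  # the framework may deliver a line in pieces
h.collect_incoming_data("lo")
h.found_terminator()
h.found_terminator()            # an empty line is echoed as a bare "\r\n"
```

If ibuffer were not cleared in found_terminator(), the second call would echo "hello" again.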

Sending data



Sending data back to peers is a common task. Using asyncore, we would create the data to be sent back and put it in a buffer. Then we'd wait for handle_write() events, writing as much data from the buffer to the socket as possible in each event.

asynchat makes this easier. We create the data, put it in a so-called 'producer' object, and push the producer object onto a FIFO. async_chat then takes each producer in turn, gets data from it and sends it out over the socket, piece by piece, until the producer is exhausted; it then moves on to the next producer.

If it encounters a None object in place of a producer, async_chat will close the connection.
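The FIFO of producers can be modelled in a few lines. This is a simplified, synchronous sketch: ChunkProducer and drain() are names invented here, and the real async_chat sends data as the socket becomes writable rather than in one tight loop. As with asynchat's producers, a producer here is any object whose more() method returns the next piece of data and an empty string once it is exhausted; ChunkProducer is modelled on asynchat's simple_producer, which hands out at most buffer_size characters per call. A None entry means "close the connection".

```python
from collections import deque


class ChunkProducer(object):
    """Hypothetical producer modelled on asynchat's simple_producer:
    more() hands out at most buffer_size characters per call and
    returns "" once everything has been handed out."""

    def __init__(self, data, buffer_size=512):
        self.data = data
        self.buffer_size = buffer_size

    def more(self):
        result = self.data[:self.buffer_size]
        self.data = self.data[self.buffer_size:]
        return result


def drain(fifo, send):
    """Simplified model of walking async_chat's producer FIFO.
    Returns True if a None entry asked for the connection to be closed."""
    while fifo:
        producer = fifo[0]
        if producer is None:
            fifo.popleft()
            return True  # async_chat would close the connection here
        data = producer.more()
        if not data:
            fifo.popleft()  # producer exhausted: move on to the next one
            continue
        send(data)
    return False


fifo = deque([ChunkProducer("abcdef", buffer_size=4), ChunkProducer("gh"), None])
sent = []
closed = drain(fifo, sent.append)
# sent == ["abcd", "ef", "gh"] and closed is True
```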

All this can be accomplished with:

    def send_data(self):
        data = "".join(self.ibuffer)
        log.debug("sending: [%s]" % data)
        self.push(data + self.LINE_TERMINATOR)
        self.ibuffer = []

As you can see, putting the data in a producer object and pushing it onto the FIFO takes just one line of code: self.push(...). We don't have to define a producer class in the normal case, because async_chat does the wrapping for us: in the Python 2.5 version of asynchat, push() creates a simple_producer object, populates it with whatever we supply, and pushes it onto the FIFO. This behaviour can be customized through the async_chat API (for example with push_with_producer()), but we will look at that in another installment.
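When push() is not enough, async_chat.push_with_producer() accepts any object with a more() method. As a hedged example, a hypothetical producer that hands out one terminated line per call could look like the sketch below. It uses str to match the Python 2 code in this post; under Python 3's asynchat, more() would have to return bytes.

```python
class LinesProducer(object):
    """Hypothetical custom producer: each more() call returns one line
    followed by the terminator, and "" once all lines are used up."""

    def __init__(self, lines, terminator="\r\n"):
        self.lines = list(lines)
        self.terminator = terminator

    def more(self):
        if not self.lines:
            return ""  # an empty result tells async_chat this producer is exhausted
        return self.lines.pop(0) + self.terminator
```

Inside a handler it would be queued with self.push_with_producer(LinesProducer(["first", "second"])). Producing data lazily like this avoids building one large string up front.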

We have not bothered to push a None onto the FIFO, because we rely on the client to close the connection. To cope with clients that go away without properly closing the connection, we could have added a timer and, when it expired, closed the connection ourselves (async_chat's close_when_done() method pushes that None for us).
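One way to implement that timer idea, sketched under assumptions: a hypothetical IdleTracker records when each connection last sent data, and the main loop periodically asks it which connections have been idle for too long. Nothing here is part of asyncore or asynchat.

```python
import time


class IdleTracker(object):
    """Hypothetical helper that remembers when each connection was last
    active and reports the ones idle for longer than `timeout` seconds."""

    def __init__(self, timeout):
        self.timeout = timeout
        self.last_seen = {}

    def touch(self, conn, now=None):
        """Record activity on conn (call this whenever data arrives)."""
        self.last_seen[conn] = time.time() if now is None else now

    def forget(self, conn):
        """Stop tracking a connection that has been closed."""
        self.last_seen.pop(conn, None)

    def expired(self, now=None):
        """Return the connections that have been idle for too long."""
        now = time.time() if now is None else now
        return [conn for conn, seen in self.last_seen.items()
                if now - seen > self.timeout]
```

In the server, a handler could call touch() from collect_incoming_data(), and serve_forever() could run asyncore.loop(timeout=1, count=1) inside a while loop, calling close_when_done() on each expired handler and then forget() on the tracker. The explicit now parameter exists only to make the logic easy to test.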

Here is the full code:


import logging
import asyncore
import asynchat
import socket

logging.basicConfig(level=logging.DEBUG, format="%(created)-15s %(levelname)8s %(thread)d %(name)s %(message)s")
log = logging.getLogger(__name__)

BACKLOG = 5
SIZE = 1024

class EchoHandler(asynchat.async_chat):

    LINE_TERMINATOR = "\r\n"

    def __init__(self, conn_sock, client_address, server):
        asynchat.async_chat.__init__(self, conn_sock)
        self.server = server
        self.client_address = client_address
        self.ibuffer = []

        self.set_terminator(self.LINE_TERMINATOR)

    def collect_incoming_data(self, data):
        log.debug("collect_incoming_data: [%s]" % data)
        self.ibuffer.append(data)

    def found_terminator(self):
        log.debug("found_terminator")
        self.send_data()

    def send_data(self):
        data = "".join(self.ibuffer)
        log.debug("sending: [%s]" % data)
        self.push(data + self.LINE_TERMINATOR)
        self.ibuffer = []

    def handle_close(self):
        log.info("conn_closed: client_address=%s:%s" % \
                 (self.client_address[0],
                  self.client_address[1]))

        asynchat.async_chat.handle_close(self)

class EchoServer(asyncore.dispatcher):

    allow_reuse_address = False
    request_queue_size = 5
    address_family = socket.AF_INET
    socket_type = socket.SOCK_STREAM

    def __init__(self, address, handlerClass=EchoHandler):
        self.address = address
        self.handlerClass = handlerClass

        asyncore.dispatcher.__init__(self)
        self.create_socket(self.address_family,
                           self.socket_type)

        if self.allow_reuse_address:
            self.set_reuse_addr()

        self.server_bind()
        self.server_activate()

    def server_bind(self):
        self.bind(self.address)
        log.debug("bind: address=%s:%s" % (self.address[0], self.address[1]))

    def server_activate(self):
        self.listen(self.request_queue_size)
        log.debug("listen: backlog=%d" % self.request_queue_size)

    def fileno(self):
        return self.socket.fileno()

    def serve_forever(self):
        asyncore.loop()
        # TODO: try to implement handle_request()

    # Internal use
    def handle_accept(self):
        pair = self.accept()
        if pair is None:
            # accept() returns None when no connection was actually pending
            return
        (conn_sock, client_address) = pair
        if self.verify_request(conn_sock, client_address):
            self.process_request(conn_sock, client_address)

    def verify_request(self, conn_sock, client_address):
        return True

    def process_request(self, conn_sock, client_address):
        log.info("conn_made: client_address=%s:%s" % \
                 (client_address[0],
                  client_address[1]))
        self.handlerClass(conn_sock, client_address, self)

    def handle_close(self):
        self.close()
