Friday, January 25, 2008

My First Emacs Lisp Function


(defun how-many-chars ()
(interactive)
(message "We are %d chracters into this buffer."
(- (point)
(save-excursion
(goto-char (point-min))
(point)))))

Wednesday, January 9, 2008

Transporting and putting ssh public keys on remote servers the easy way

I login to a lot of Linux boxes, and I use ssh public keys to login. When I create a new account/server, I have to:

  1. copy over my ssh public key to the remote host
  2. login to the remote host
  3. put the public key in the authorized_keys file, set the permissions etc.
  4. delete the copy of the public key

I figured there ought to be an easier way to do this, and here it is:
cat .ssh/id_rsa.pub | ssh parijat@192.168.1.97 "mkdir -p ~/.ssh ; cat - >> ~/.ssh/authorized_keys ; chmod -R go-rwx ~/.ssh"

Tuesday, January 8, 2008

Reactor vs Proactor

I found a comparison of the Reactor and the Proactor pattern here. Both patterns talk about isses that crop up when building a concurrent network server. Both are related alternatives to thread based concurrency (or could work as a complement to thread based concurrency).
Both revolve around the concept of an IO De-multiplexer, event sources and event handlers. The driver program registers some event sources (e.g., sockets) with an IO de-multiplexer (e.g., select() or poll()). When an event occurs on a socket, a corresponding event handler is called. Of course, there must be some map between events from an event source to event handlers.
I found that these patterns are more or less embodied in the Python asyncore and asynchat modules, and want to discuss how the modules implement these patterns.


The Basics

We'll first compare the terminology of the patterns with that of the Python modules.

  • blocking IO: this would translate to a read()/write() on a blocking socket. The call would block until there was some data available to read or the socket was closed. The thread making the call cannot do anything else.
  • non-blocking, synchronous IO: this would translate to a read()/write() on a non-blocking socket. The call would return immediately, either with the data read/written, or with a signal that the IO operation could not complete (e.g., read() returns with -1, and errno set to EWOULBLOCK/EAGAIN. It is then the caller's responsibility to keep calling repeatedly until the operation succeeds.
  • non-blocking, asynchronous IO: this would translate to Unix SIGIO mechanisms (unfortunately, I am not familiar with this), or posix aio_* functions (not familiar with these either). Essentially, these IO calls return immediately, and the OS starts doing the operation in a separate (kernel level) thread; when the operation is ready, the user code is given some notification.

The Reactor Pattern: asyncore

According to the authors, here is how the Reactor pattern, which usually would use non-blocking synchronous IO, would work:
Here's a read in Reactor:
  1. An event handler declares interest in I/O events that indicate readiness for read on a particular socket
  2. The event de-multiplexer waits for events
  3. An event comes in and wakes-up the demultiplexor, and the demultiplexor calls the appropriate handler
  4. The event handler performs the actual read operation, handles the data read, declares renewed interest in I/O events, and returns control to the dispatcher
How does this work in Python? Its done using the asyncore module.
  1. The IO demux is the asyncore.loop() function; it listens for events on sockets using either the select() or poll() OS call. It uses a global or user supplied dictionary to map sockets to event handlers (see below). Event handlers are instances of asyncore.dispatcher (or its subclasses). A dispatcher contains a socket and registers itself in the global map, letting loop() know that its methods should be called in response to events on its sockets. It also, through its readable() and writable() methods, lets loop() know what events it is interested in handling.
  2. loop() uses select() or poll() to wait for events on the sockets it knows about.
  3. select()/poll() returns; loop() goes through each socket that has an event, find the corresponding dispatcher object, determines the type of event, and calls a method corresponding to the event on the dispatcher object. In fact, loop() translates raw readable/writable events on sockets to slightly higher-level events using state information about the socket.
  4. The dispatcher object's method is supposed to perform the actual IO: for example, in handle_read() we would read() the data off the socket and process it. Control then returns to loop(). Of course, one problem is that we should not do lengthy tasks in our handler, because then our server would not behave very concurrently and be unable to process other events in time. But what if we did need to do time-taking tasks in response to the event? Thats a subject for another post. For now we assume that our handlers can return quickly enough that as a whole the server behaves pretty concurrently.


The Proactor pattern: a psuedo-implementation in asynchat


According to the authors, here is how the Proactor pattern, which would usually use true asynchronous IO operations provided by the OS, would work:
Here is a read operation in Proactor (true async):
  1. A handler initiates an asynchronous read operation (note: the OS must support asynchronous I/O). In this case, the handler does not care about I/O readiness events, but instead registers interest in receiving completion events.
  2. The event demultiplexor waits until the operation is completed
  3. While the event demultiplexor waits, the OS executes the read operation in a parallel kernel thread, puts data into a user-defined buffer, and notifies the event demultiplexor that the read is complete
  4. The event demultiplexor calls the appropriate handler;
  5. The event handler handles the data from user defined buffer, starts a new asynchronous operation, and returns control to the event demultiplexor.
How does this work in Python? Using the asynchat module.
  1. Event handlers are instances of asynchat.async_chat (or rather, its subclasses). Taking read as an example, the handler would register interest in reading data by providing a readable() method that returns True.
  2. loop() would then use it to wait on its socket until the socket was readable. When the socket become readable, instead of calling some OS function to read the data, async_chat.handle_read() is called.
  3. This method will slurp up all available data.
  4. Then, handle_read() would call the collect_incoming_data() method of the subclass. From the subclass's point of view, someone else has done the job of doing the actual IO, and it is being signaled that the IO operation is complete.
  5. collect_incoming_data() processes the data, and by returning, implicitly starts a new async IO cycle.
The similarity between asynchat and Proactor is that from the application writer's point of view, he only has to write code to collect_incoming_data(). The difference is that, with asynchat, user level code is doing the IO, instead of true async facilities provided by the OS. The difference is greater when considering write operations. In a true Proactor, the event handler would initiate the write, and the event demultiplexer would wait for the completion event. However, in asynchat, the event handler (the subclass of async_chat) does not initiate the write per-se: it creates the data and pushes it onto a fifo, and loop(), indirectly through async_chat, writes it to the socket using synchronous non-blocking IO.


A Unified API

Basically, Python's asynchat is providing an emulated Proactor interface to application writers. It would be good if asynchat could be redone so that it could use true async IO operations on OSes that support them, and fall back to synchronous IO when it is not available.



Sunday, January 6, 2008

Python's asynchat module

Introduction



As mentioned in the previous post, I was going to look at how to write a network server using Python's asynchat module. To utilize the asynchat module's capabilities, I had to change the semantics of the echo server a little bit. The echo server using asyncore would echo back the data as soon as it got it. The echo server using asynchat will echo data back line by line, where each line should be terminated by the string "\r\n".



The async_chat interface for server writers


asynchat provides a higher level interface than asyncore. In this interface, to write a server, you subclass asynchat.async_chat, and override two methods:


collect_incoming_data(data)

Unlinks asyncore, you don't have to bother with handle_read() event. The framework will read the data for you and call this method with the data. You probably want to save this data somewhere, in preparation for processing it later

found_terminator()

The framework calls this method when it detects that a 'terminator' has been found in the incoming data stream. The framework decides this based on information you give to the framework using the set_terminator() method.



Getting started


So how do you use this module to write a server? Just as with asyncore, you write a server class and instantiate and object; this object's socket is the server socket; you handle the event handle_accept() and create objects of class async_chat (or, rather, a subclass of async_chat that you created) to handle the client connection. The only difference between asyncore and asynchat is, so far, the object that you instantiate to handle the client connection.

Let's get started. First we look at the driver code:

server_main.py:

import asynchat_echo_server
...
server = module.EchoServer((interface, port))
server.serve_forever()


The server class



Our 'EchoServer' class looks pretty much like before:

asynchat_echo_server.py

class EchoServer(asyncore.dispatcher):

allow_reuse_address = False
request_queue_size = 5
address_family = socket.AF_INET
socket_type = socket.SOCK_STREAM


def __init__(self, address, handlerClass=EchoHandler):
self.address = address
self.handlerClass = handlerClass

asyncore.dispatcher.__init__(self)
self.create_socket(self.address_family,
self.socket_type)

if self.allow_reuse_address:
self.set_resue_addr()

self.server_bind()
self.server_activate()


def server_bind(self):
self.bind(self.address)
log.debug("bind: address=%s:%s" % (self.address[0], self.address[1]))


def server_activate(self):
self.listen(self.request_queue_size)
log.debug("listen: backlog=%d" % self.request_queue_size)


def fileno(self):
return self.socket.fileno()


def serve_forever(self):
asyncore.loop()
# TODO: try to implement handle_request()

# Internal use
def handle_accept(self):
(conn_sock, client_address) = self.accept()
if self.verify_request(conn_sock, client_address):
self.process_request(conn_sock, client_address)


def verify_request(self, conn_sock, client_address):
return True


def process_request(self, conn_sock, client_address):
log.info("conn_made: client_address=%s:%s" % \
(client_address[0],
client_address[1]))
self.handlerClass(conn_sock, client_address, self)


def handle_close(self):
self.close()


The difference is in the handlerClass, which is defined to be EchoHandler as before, but is coded differently. When we instantiate this object, it gets added to the global map of sockets that loop() is monitoring, and now loop() will monitor events on the client socket as well as the server socket. There can be any number of sockets. This behaviour is the same as that of asyncore.

handling per-client connections



Here is how we start our new EchoHandler:

class EchoHandler(asynchat.async_chat):

LINE_TERMINATOR = "\r\n"

def __init__(self, conn_sock, client_address, server):
asynchat.async_chat.__init__(self, conn_sock)
self.server = server
self.client_address = client_address
self.ibuffer = []

self.set_terminator(self.LINE_TERMINATOR)


As can be seen, the init method calls async_chat.set_terminator() method with a string argument. The string argument tells async_chat that a message or record is terminated when it encounters the string in the data. Now, loop() will wait on this client socket and call async_chat's handle_read() method. async_chat's handle_read() will read the data, look at it, and call the collect_incoming_data() method that you define:

def collect_incoming_data(self, data):
log.debug("collect_incoming_data: [%s]" % data)
self.ibuffer.append(data)


As you can see, we just buffer the data here for later processing.

Now, in the handle_read() method, async_chat will look for the string set by set_terminator(). If it finds it, then it will call the found_terminator() method that we define:

def found_terminator(self):
log.debug("found_terminator")
self.send_data()


When we find that we have a complete line (because it was terminated by "\r\n") we just send the data back. After all, we are writing an echo server.

Sending data



Sending data back to peers is a common task. Using asyncore, we would create the data to be sent back and put it in a buffer. Then we'd wait for handle_write() events, writing as much data from the buffer to the socket as possible in each event.

asynchat makes this easier. We create the data, put it in a so called 'producer' object, and push the producer object to a FIFO. async_chat will then call each producer in turn, get data from it, send it out over the socket, piece by piece, until the producer is exhausted; it will then move on to the next producer.

If it encounters a None object in place of a producer, async_chat will close the connection.

All this can be accomplished with:

def send_data(self):
data = "".join(self.ibuffer)
log.debug("sending: [%s]" % data)
self.push(data+self.LINE_TERMINATOR)
self.ibuffer = []

As you can see, putting the data in a producer object and pushing it on to the FIFO takes just one line of code: self.push(...). We dont have to define a producer class in the normal case because async_chat provides a simple_producer class for us, and the push() method creates an object of that class, populates it with whatever we supply, and then pushes it on to the FIFO. This behaviour can be over-ridden, using the async_chat module API, but we will look at that in another installment.

We have not bothered to push a None onto the FIFO, because we depend on the client closing the connection. We might have put a timer and when the timer expired, close the connection ourselves, to handle clients that go away without properly closing the connection.

Here is the full code:


import logging
import asyncore
import asynchat
import socket

logging.basicConfig(level=logging.DEBUG, format="%(created)-15s %(levelname)8s %(thread)d %(name)s %(message)s")
log = logging.getLogger(__name__)

BACKLOG = 5
SIZE = 1024

class EchoHandler(asynchat.async_chat):

LINE_TERMINATOR = "\r\n"

def __init__(self, conn_sock, client_address, server):
asynchat.async_chat.__init__(self, conn_sock)
self.server = server
self.client_address = client_address
self.ibuffer = []

self.set_terminator(self.LINE_TERMINATOR)


def collect_incoming_data(self, data):
log.debug("collect_incoming_data: [%s]" % data)
self.ibuffer.append(data)


def found_terminator(self):
log.debug("found_terminator")
self.send_data()


def send_data(self):
data = "".join(self.ibuffer)
log.debug("sending: [%s]" % data)
self.push(data+self.LINE_TERMINATOR)
self.ibuffer = []


def handle_close(self):
log.info("conn_closed: client_address=%s:%s" % \
(self.client_address[0],
self.client_address[1]))

asynchat.async_chat.handle_close(self)

class EchoServer(asyncore.dispatcher):

allow_reuse_address = False
request_queue_size = 5
address_family = socket.AF_INET
socket_type = socket.SOCK_STREAM


def __init__(self, address, handlerClass=EchoHandler):
self.address = address
self.handlerClass = handlerClass

asyncore.dispatcher.__init__(self)
self.create_socket(self.address_family,
self.socket_type)

if self.allow_reuse_address:
self.set_resue_addr()

self.server_bind()
self.server_activate()


def server_bind(self):
self.bind(self.address)
log.debug("bind: address=%s:%s" % (self.address[0], self.address[1]))


def server_activate(self):
self.listen(self.request_queue_size)
log.debug("listen: backlog=%d" % self.request_queue_size)


def fileno(self):
return self.socket.fileno()


def serve_forever(self):
asyncore.loop()
# TODO: try to implement handle_request()

# Internal use
def handle_accept(self):
(conn_sock, client_address) = self.accept()
if self.verify_request(conn_sock, client_address):
self.process_request(conn_sock, client_address)


def verify_request(self, conn_sock, client_address):
return True


def process_request(self, conn_sock, client_address):
log.info("conn_made: client_address=%s:%s" % \
(client_address[0],
client_address[1]))
self.handlerClass(conn_sock, client_address, self)


def handle_close(self):
self.close()

Thursday, January 3, 2008

Writing a server with Python's asyncore module

The Python asyncore and aynchat modules

The Python standard library provides two modules---asyncore and
asynchat---to help in writing concurrent network servers using
event-based designs. The documentation does not give good examples,
so I am making some notes.

Overview

The basic idea behind the asyncore module is that:

  • there is a function, asyncore.loop() that does select() on a bunch of 'channels'. Channels are thin wrappers around sockets.
  • when select reports an event on any socket, loop() examines the event and the socket's state to create a higher level event;
  • it then calls a method on the channel corresponding to the higher level event.
asyncore provides a low-level, but flexible API to build network
servers. asynchat builds upon asyncore and provides an API that is
more suitable for request/response type of protocols.

aysncore

The asyncore module's API consists of:

  • the loop method, to be called by a driver program;
  • the dispatcher class, to be subclassed to do useful stuff. The dispatcher class is what is called 'channel' elsewhere.


+-------------+ +--------+
| driver code |---------> | loop() |
+-------------+ +--------+
| |
| | loop-dispatcher API (a)
| |
| +--------------+
| | dispatcher |
+----------------->| subclass |
+--------------+
|
| dispatcher-logic API (b)
|
+--------------+
| server logic |
+--------------+


This is all packaged nicely in an object oriented way. So, we have
the dispatcher class, that extends/wraps around the socket class (from
the socket module in the Python standard library). It provides all
the socket class' methods, as well as methods to handle the higher
level events. You are supposed to subclass dispatcher and implement
the event handling methods to do something useful.


The loop-dispatcher API

The loop function looks like this:

loop( [timeout[, use_poll[, map[,count]]]])

What is the map? It is a dictionary whose keys are the
file-descriptors, or fds, of the socket (i.e., socket.fileno()), and
whose values are the dispatcher objects.

When we create a dispatcher object, it automatically gets added to a
global list of sockets. The loop() function does a select() on this
list unless we provide an explicit map. (Hmm... we might always want
to use explicit maps; then our loop calls will be thread safe and we
will be able to launch multiple threads, each calling loop on
different maps.)

Methods a dispatcher subclass should implement

loop() needs some methods from the dispatcher object:

  • readable(): should return True, if you want the fd to be observed for read events;
  • writable(): should return True, if you want the fd to be observed for write events;

If either read or write is true, the corresponding fd will be examined
for errors also. Obviously, it makes no sense to have a dispatcher
which returns False for both readable() and writable().

  • handle_read: socket is readable; dispatcher.recv() can be used to actually get the data
  • handle_write: socket is writable; dispatcher.send(data) can be used to actually send the data
  • handle_error: socket encountered an error
  • handle_expt: socket received OOB data (not really used in practice)
  • handle_close: socket was closed remotely or locally
Server sockets get one more event.

  • handle_accept: a new incoming connection can be accept()ed. Call the accept() method really accept the connection. To create a server socket, call the bind() and listen() methods on it first.
Client sockets get this event:

  • handle_connect: connection to remote endpoint has been made. To initiate the connection, first call the connect() method on it.

Other socket methods are available in dispatch: create_socket(),
close(), set_resue_addr().


How to write a server using asyncore

The standard library documentation gives a client example, but not a
server example. Here are some notes on the latter.

  1. Subclass dispatched to create a listening socket
  2. In its handle_accept method, create new dispatchers. They'll get added to the global socket map.

Note: the handlers must not block or take too much time... or the
server won't be concurrent.

These socket-like functions that dispatcher extends should not be bypassed. They do funky things to detect higher level events. For e.g., how does asyncore figure out that the socket is closed? If I remember correctly, there are two ways to detect whether a non-blocking socket is closed:

  • select() returns a read event, but when you call recv()/read() you get zero bytes;
  • you call send()/write() and it fails with an error (sending zero bytes is not an error).

(I wish I had a copy of Unix Network Programming by Stevens handy
right now.)

Will look at asynchat in another post.

The code for the server is below:


asyncore_echo_server.py

import logging
import asyncore
import socket

logging.basicConfig(level=logging.DEBUG, format="%(created)-15s %(msecs)d %(levelname)8s %(thread)d %(name)s %(message)s")
log = logging.getLogger(__name__)

BACKLOG = 5
SIZE = 1024

class EchoHandler(asyncore.dispatcher):


def __init__(self, conn_sock, client_address, server):
self.server = server
self.client_address = client_address
self.buffer = ""

# We dont have anything to write, to start with
self.is_writable = False

# Create ourselves, but with an already provided socket
asyncore.dispatcher.__init__(self, conn_sock)
log.debug("created handler; waiting for loop")

def readable(self):
return True # We are always happy to read


def writable(self):
return self.is_writable # But we might not have
# anything to send all the time


def handle_read(self):
log.debug("handle_read")
data = self.recv(SIZE)
log.debug("after recv")
if data:
log.debug("got data")
self.buffer += data
self.is_writable = True # sth to send back now
else:
log.debug("got null data")

def handle_write(self):
log.debug("handle_write")
if self.buffer:
sent = self.send(self.buffer)
log.debug("sent data")
self.buffer = self.buffer[sent:]
else:
log.debug("nothing to send")
if len(self.buffer) == 0:
self.is_writable = False


# Will this ever get called? Does loop() call
# handle_close() if we called close, to start with?
def handle_close(self):
log.debug("handle_close")
log.info("conn_closed: client_address=%s:%s" % \
(self.client_address[0],
self.client_address[1]))
self.close()
#pass


class EchoServer(asyncore.dispatcher):

allow_reuse_address = False
request_queue_size = 5
address_family = socket.AF_INET
socket_type = socket.SOCK_STREAM


def __init__(self, address, handlerClass=EchoHandler):
self.address = address
self.handlerClass = handlerClass

asyncore.dispatcher.__init__(self)
self.create_socket(self.address_family,
self.socket_type)

if self.allow_reuse_address:
self.set_resue_addr()

self.server_bind()
self.server_activate()


def server_bind(self):
self.bind(self.address)
log.debug("bind: address=%s:%s" % (self.address[0], self.address[1]))


def server_activate(self):
self.listen(self.request_queue_size)
log.debug("listen: backlog=%d" % self.request_queue_size)


def fileno(self):
return self.socket.fileno()


def serve_forever(self):
asyncore.loop()


# TODO: try to implement handle_request()

# Internal use
def handle_accept(self):
(conn_sock, client_address) = self.accept()
if self.verify_request(conn_sock, client_address):
self.process_request(conn_sock, client_address)


def verify_request(self, conn_sock, client_address):
return True


def process_request(self, conn_sock, client_address):
log.info("conn_made: client_address=%s:%s" % \
(client_address[0],
client_address[1]))
self.handlerClass(conn_sock, client_address, self)


def handle_close(self):
self.close()


and to use it:

server = asyncore_echo_server.EchoServer((interface, port))
server.serve_forever()