Simple Chunk Authentication Protocol

From CodeCodex

Introduction

You may need to use a protocol based on Simple Chunk Protocol Format over a network connection, for example over the Internet. In that situation you will almost certainly want the two machines to authenticate to each other, so that nobody can access your application who shouldn't. You will probably also want to encrypt the data you pass back and forth, both to keep eavesdroppers from reading it and to stop them from reusing captured requests and responses in a replay attack.

Note that it is not enough for the client to authenticate itself to the server; the server also needs to authenticate to the client, otherwise it may become possible, depending on the application, for the client to be tricked into giving up sensitive information before it discovers it has connected to the wrong server. In particular, it is not a good idea for the client to send a password to the server, since a false server can then use this password to access the real server.

The proper way to implement such authentication is via a challenge-response or shared-secret protocol. That is, each party sends information (the challenge) to the other that can only be decoded if the latter knows what the shared secret (e.g. the password) is; it then sends back a response proving that it was able to decode the challenge. This must be done in such a way that any eavesdropper is unable to determine what the secret is, or subsequently use the captured data to pretend that it knows what the secret is.
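As a rough illustration of the challenge-response idea only (this is not the protocol described in this article, which instead encrypts the handshake with the password), here is a minimal Python 3 sketch using just the standard library. It swaps in HMAC-SHA256 as the proof that a party knows the secret; the function names make_challenge, respond and verify are hypothetical.

```python
import hashlib
import hmac
import secrets

def make_challenge():
    """Returns a fresh random 16-byte challenge."""
    return secrets.token_bytes(16)

def respond(secret, challenge):
    """Proves knowledge of secret without sending the secret itself."""
    return hmac.new(secret, challenge, hashlib.sha256).digest()

def verify(secret, challenge, response):
    """Checks a response, comparing in constant time."""
    return hmac.compare_digest(respond(secret, challenge), response)

shared_secret = b"correct horse battery staple"
challenge = make_challenge()
good = respond(shared_secret, challenge)
bad = respond(b"wrong guess", challenge)
print(verify(shared_secret, challenge, good))  # True
print(verify(shared_secret, challenge, bad))   # False
```

Because the challenge is random and different every time, a captured response is of no use against a later challenge.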

The gold standard for this sort of security is SSL/TLS. However, this requires setting up certificates, which is a level of complication that is not always needed. For some less security-sensitive applications, a simple username/password pair may be sufficient.

The protocol described here is a simplified variant of the Otway-Rees protocol. It is simplified in that full Otway-Rees takes place between three parties: the client, the application server that the client is trying to connect to, and the authentication server, which alone knows everybody's passwords. Here I'm combining the application and authentication servers into one.

The following Python code makes use of the PyCrypto toolkit. This is included as standard in some current Linux distros; check to see if it's present in yours.

The protocol depends on the following pieces of information:

  • A client username--this identifies who the client is claiming to be, and is not considered to be a secret.
  • A client password--this is associated with the client username. Both the client and the server have to prove to each other that they know this password, so it is important that it is kept secret.
  • A server username--this can be used, for example, to identify the service that the client is attempting to access. It is not considered to be a secret. Its main purpose, provided the client and server usernames are different, is to try to make it harder to commit a reflection attack.
  • A random 64-bit integer (nonce) generated by the client. This is only sent to the server in an encrypted form, and the response from the server must contain the same number; this helps to guard against a replay attack.
  • Randomly-generated session keys for encrypting all the data exchanges that follow the authentication handshake. Why not simply use the client password for all encryption? Because the more material that an attacker can collect that is encrypted with the same key, the more likely it is that they will be able to break the encryption. Thus, the password is used only for the bare minimum, that is to encrypt the initial authentication handshake (including the exchange of the session keys); these session keys are then used for all subsequent encryption. That way, if the session keys are somehow compromised, that destroys the security of that particular session, but it doesn't necessarily mean the password is compromised, or that other sessions are now insecure.
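The random values in the list above could be produced along the following lines. This is a sketch in Python 3 using the standard secrets module rather than PyCrypto; the 16-byte session key size and the function name new_handshake_values are assumptions made for illustration.

```python
import secrets

NONCE_SIZE = 8    # 64-bit nonce, as described above
KEY_SIZE = 16     # assumption: a 128-bit AES session key
BLOCK_SIZE = 16   # AES block size, used for initialization vectors

def new_handshake_values():
    """Returns fresh random values for one authentication handshake."""
    return {
        "nonce": secrets.token_bytes(NONCE_SIZE),
        "session_key": secrets.token_bytes(KEY_SIZE),
        "session_iv": secrets.token_bytes(BLOCK_SIZE),
    }

values = new_handshake_values()
```

New values must be generated for every connection attempt; reusing a nonce or session key would undo the protection they are meant to provide.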

Notes on encryption:

  • Probably the easiest way to use a cipher is what's called "ECB" mode, where the same block of data (plaintext) always has the same encrypted representation. However, this is not a very good way to use encryption: the protocol described here will use CBC mode, where in addition to the encryption key, the algorithm also uses a randomly-generated initialization vector (IV), so that the same data encrypted twice will not look the same.
  • Note the heavy use of randomly-generated numbers all over the place. It is important that, as far as possible, these numbers be truly random--that is, it must be as hard as possible for an attacker to predict what they might be. PyCrypto includes a built-in source of such cryptographically strong random numbers, and it is important to use this, or something at least as good.
  • What happens if an attacker tries randomly changing a few bits in a block of encrypted data as it goes between client and server? For most protocols, it seems very likely that the entire decrypted packet will turn into complete gibberish. Or it may accidentally look like a valid, or almost valid, request or response, albeit a completely different one. In general, it is wise to guard against this possibility (certainly make it extremely unlikely), by the addition of a cryptographic hash to the encrypted data; this can be checked as part of the decryption process, and if the hash doesn't match, then you know the packet has been tampered with, and it can be discarded, an error returned, etc.
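The tamper check in the last point can be sketched as follows, in Python 3 with the standard library only. Two things differ from the protocol in this article and are assumptions of the sketch: SHA-256 stands in for RIPEMD-160 (whose availability in hashlib depends on the underlying OpenSSL build), and the encryption step is left out so that only the digest handling is shown. The names add_digest and check_digest are hypothetical.

```python
import hashlib
import hmac

DIGEST_SIZE = hashlib.sha256().digest_size  # 32 bytes

def add_digest(data):
    """Appends a digest of data to data."""
    return data + hashlib.sha256(data).digest()

def check_digest(framed):
    """Returns the data if the trailing digest matches, else None."""
    if len(framed) < DIGEST_SIZE:
        return None
    data, digest = framed[:-DIGEST_SIZE], framed[-DIGEST_SIZE:]
    if hmac.compare_digest(hashlib.sha256(data).digest(), digest):
        return data
    return None

framed = add_digest(b"transfer 10 units")
tampered = bytes([framed[0] ^ 0x01]) + framed[1:]  # flip a single bit
```

Here check_digest(framed) gives back the original data, while check_digest(tampered) returns None, so the receiver can discard the packet.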

Python

PyCrypto offers a range of encryption and hashing algorithms to choose from. If in doubt, a good choice for encryption is AES. For hashing, the commonly-used MD5 and SHA-1 have known weaknesses and are best avoided in new designs. For illustration I will use RIPEMD-160.

I will refer to the algorithm modules using the generic names Crypter and Hasher; this way, it should be easy to change to other algorithms just by changing the following import statements:

import Crypto.Cipher.AES as Crypter
import Crypto.Hash.RIPEMD as Hasher

Chunk IDs

All the following chunk ID codes are deemed to be defined in a class called ID, just for namespace-management purposes:

class ID :

For details about the structure and manipulation of chunks, see Simple Chunk Protocol Format.

The client sends the initial authentication request to the server as a chunk with the following ID:

    request_auth_id = 'HEL1'

This chunk contains the following sub-chunks:

    username_id = 'UNAM' # client username
    username2_id = 'UNA2' # server username
    randnum_1_id = 'NUM1' # initialization vector for handshake encryption
    encrypted_id = 'CRYP' # contents encrypted with user password

The encrypted_id chunk is encrypted using the client password as key, and the contents of the randnum_1_id chunk as initialization vector. It contains copies of the client and server usernames and randnum_1_id value for cross-checking purposes, together with the following additional data:


    randnum_2_id = 'NUM2' # nonce generated by client
    session_key_id = 'SKEY' # random session key to be used after initial handshake
    init_vector_id = 'IV  ' # initialization vector to be used with session key

The response from the server to the client is a chunk with the following ID:

    reply_auth_id = 'HEL2'

This will contain sub-chunks with ID randnum_1_id and encrypted_id. As before, the contents of the latter are encrypted with the client password as key, and the contents of the randnum_1_id chunk as initialization vector (there is no connection between the randnum_1_id value used by the server and that used by the client!). The encrypted_id chunk returned by the server contains username_id and username2_id with their values swapped relative to what the client sent--that is, username_id is the server username, and username2_id is the client username. It also contains a randnum_2_id chunk, returning the same value that was passed from the client, and session_key_id and init_vector_id chunks containing the key and initialization vector to be used for the server's end of the encryption (these bear no relation to the values passed by the client).

#end class ID
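To make the nesting of the request concrete, here is a Python 3 sketch that builds the outer layout of a 'HEL1' request and reads the sub-chunk IDs back. The chunk layout used (4-byte ID, little-endian 32-bit content length, then the contents) is an assumption inferred from the "<4sI" header format that the client code in this article unpacks; make_chunk and split_chunks are hypothetical stand-ins for the real routines in Simple Chunk Protocol Format, and the usernames are made up.

```python
import struct

def make_chunk(chunk_id, contents):
    """Hypothetical stand-in for chunk.make: 4-byte ID, then a
    little-endian 32-bit content length, then the contents."""
    return struct.pack("<4sI", chunk_id, len(contents)) + contents

def split_chunks(data):
    """Returns a list of (ID, contents) pairs parsed from data."""
    result = []
    pos = 0
    while pos + 8 <= len(data):
        chunk_id, length = struct.unpack_from("<4sI", data, pos)
        result.append((chunk_id, data[pos + 8 : pos + 8 + length]))
        pos += 8 + length
    return result

temp_iv = bytes(16)    # placeholder only; must be random in real use
encrypted = bytes(64)  # placeholder for the encrypted payload
request = make_chunk(
    b"HEL1",
    make_chunk(b"UNAM", b"alice")
    + make_chunk(b"UNA2", b"fileserver")
    + make_chunk(b"NUM1", temp_iv)
    + make_chunk(b"CRYP", encrypted),
)
outer_id, outer_contents = split_chunks(request)[0]
sub_ids = [sub_id for sub_id, _ in split_chunks(outer_contents)]
```

The usernames and the handshake initialization vector travel in the clear; only the contents of the 'CRYP' sub-chunk are encrypted.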


Encryption Routines

The encrypted_id chunk has a special format. Because AES is a block cipher, the data it operates on must be a multiple of 16 bytes in size. The sub-chunks in the encrypted_id chunk are therefore followed by 0 to 15 bytes of padding, and then by the RIPEMD-160 digest, which is 20 bytes and is computed over the sub-chunks plus padding. The digest is encrypted along with everything else, so the padding is chosen to make the total length of sub-chunks, padding and digest together a multiple of 16 bytes.

The following utility function calculates how many bytes of padding are needed to add to length bytes of data to ensure it's a multiple of blocksize:

def pad_len(length, blocksize) :
    """returns smallest n >= 0 such that (length + n) % blocksize == 0."""
    return \
        (blocksize - length % blocksize) % blocksize
#end pad_len
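As a worked example of the padding arithmetic, the following Python 3 snippet repeats pad_len and applies it to a hypothetical 50 bytes of sub-chunk data, using the AES block size and RIPEMD-160 digest size mentioned above.

```python
def pad_len(length, blocksize):
    """Returns smallest n >= 0 such that (length + n) % blocksize == 0."""
    return (blocksize - length % blocksize) % blocksize

BLOCK_SIZE = 16   # AES block size
DIGEST_SIZE = 20  # RIPEMD-160 digest size

contents_len = 50  # hypothetical size of the sub-chunks
padding = pad_len(contents_len + DIGEST_SIZE, BLOCK_SIZE)
total = contents_len + padding + DIGEST_SIZE
print(padding, total)  # 10 80
```

So 50 bytes of contents plus 10 bytes of padding plus the 20-byte digest come to 80 bytes, which is exactly five AES blocks.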

Also, for convenient generation of random numbers:

import Crypto.Util.randpool as randpool

random = randpool.RandomPool()

The actual encryption/decryption routines needed for the protocol will be collected into the class crypt_channel:

import sys # for the debug output in decode_crypt_chunk

class crypt_channel :

The following routine takes an initialization vector, encryption key and plaintext contents, and returns the encrypted representation, complete with padding and digest, in the appropriate form for use as the contents of the ID.encrypted_id chunk:

    @staticmethod
    def make_crypt_chunk(iv, key, contents) :
        """makes a one-off encrypted chunk content block used
        during the initial handshake. Note that the length of the
        data returned will be longer than contents, because it
        includes padding and a digest."""
        crypt = Crypter.new(key, Crypter.MODE_CBC, iv)
        digester = Hasher.new()
        pad = "\x00" * pad_len(len(contents) + Hasher.digest_size, Crypter.block_size)
        digester.update(contents + pad)
        return crypt.encrypt \
          (
                contents
            +
                pad
            +
                digester.digest()
          )
    #end make_crypt_chunk

The following routine is the inverse of the above: given the encrypted chunk, and the same initialization vector and key that were used to encrypt it, it will return the decrypted representation, or None if decryption failed.

    @staticmethod
    def decode_crypt_chunk(iv, key, contents) :
        """tries to decrypt a one-off encrypted chunk content
        block used during the initial handshake. Returns the
        plaintext plus pad bytes on success, or None on failure."""
        result = None # to begin with
        if (
                len(contents) % Crypter.block_size == 0
            and
                len(contents) >= Hasher.digest_size
        ) :
            plaintext = \
                Crypter.new(key, Crypter.MODE_CBC, iv).decrypt(contents)
            actual_digest = plaintext[-Hasher.digest_size:]
            plaintext = plaintext[:-Hasher.digest_size]
            digester = Hasher.new()
            digester.update(plaintext)
            if actual_digest == digester.digest() :
                result = plaintext
            else :
                sys.stderr.write \
                  (
                        "decode_crypt_chunk: digest mismatch, expected"
                        " %r, actual %r\n"
                    %
                        (digester.digest(), actual_digest)
                  ) # debug
            #end if
        #end if
        return result
    #end decode_crypt_chunk

Note that decode_crypt_chunk will return the contents plus the padding bytes, since it doesn't actually know how many padding bytes there are. But since the ID.encrypted_id chunk consists of a series of sub-chunks, the padding bytes will be discarded when these are extracted, since they do not make up a valid chunk header.


Putting It All Together--Client

A simple implementation of the client side of the above protocol, suitable for use in single-threaded clients, is the following server_connection class:

import socket
import struct

class server_connection :
    """convenient encapsulation of all communication with server."""

    def __init__(self) :
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.connect((Host, Port))
        temp_iv = random.get_bytes(Crypter.block_size)
        nonce = random.get_bytes(8)
        session_iv = random.get_bytes(Crypter.block_size)
        session_key = random.get_bytes(Crypter.block_size)
        self.channel = crypt_channel()
        self.server.sendall \
          (
            chunk.make
              (
                ID.request_auth_id,
                    chunk.make(ID.username_id, Username)
                +
                    chunk.make(ID.username2_id, Username2)
                +
                    chunk.make(ID.randnum_1_id, temp_iv)
                +
                    chunk.make
                      (
                        ID.encrypted_id,
                        self.channel.make_crypt_chunk
                          (
                            iv = temp_iv,
                            key = Password,
                            contents =
                              # make sure this starts with something attacker
                              # will find harder to predict, just for extra
                              # paranoia
                                    chunk.make(ID.randnum_2_id, nonce)
                                +
                                    chunk.make(ID.username_id, Username)
                                +
                                    chunk.make(ID.username2_id, Username2)
                                +
                                    chunk.make(ID.randnum_1_id, temp_iv)
                                +
                                    chunk.make(ID.init_vector_id, session_iv)
                                +
                                    chunk.make(ID.session_key_id, session_key)
                          )
                      )
              )
          )
        header = receive_all(self.server, 8)
        response = header + receive_all \
          (
            self.server,
            struct.unpack("<4sI", header)[1]
          )
        sys.stderr.write \
          (
                "server_connection response: %r\n"
            %
                response
          ) # debug
        response = chunk.extract(response)
        if response[0] != ID.reply_auth_id :
            raise IOError("unexpected server response ID")
        #end if
        response = dict(chunk.extract_sequence(response[1]))
        if (
                not response.has_key(ID.randnum_1_id)
            or
                len(response[ID.randnum_1_id]) != Crypter.block_size
            or
                not response.has_key(ID.encrypted_id)
        ) :
            raise IOError("incomplete server response")
        #end if
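        response = self.channel.decode_crypt_chunk \
          (
            iv = response[ID.randnum_1_id],
            key = Password,
            contents = response[ID.encrypted_id]
          )
        if response is None :
            # wrong password at the other end, or the data was tampered with
            raise IOError("server response decryption failure")
        #end if
        response = dict(chunk.extract_sequence(response))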
        if (
                not response.has_key(ID.username_id)
            or
                not response.has_key(ID.username2_id)
            or
                not response.has_key(ID.randnum_2_id)
            or
                not response.has_key(ID.session_key_id)
            or
                not response.has_key(ID.init_vector_id)
            or
                len(response[ID.init_vector_id]) != Crypter.block_size
        ) :
            raise IOError("incomplete server encrypted response")
        #end if
        if (
                response[ID.username_id] != Username2
            or
                response[ID.username2_id] != Username
            or
                response[ID.randnum_2_id] != nonce
        ) :
            raise IOError("server response consistency mismatch")
        #end if
        common_init_vector = "".join \
          (
            [chr(ord(session_iv[i]) ^ ord(response[ID.init_vector_id][i]))
                for i in range(0, Crypter.block_size)]
          )
        sys.stderr.write \
          (
                "server_connection init: %r, my sess = %r, their sess = %r\n"
            %
                (common_init_vector, session_key, response[ID.session_key_id])
          ) # debug
        self.channel.start_session \
          (
            send_iv = common_init_vector,
            send_key = session_key,
            rcv_iv = common_init_vector,
            rcv_key = response[ID.session_key_id]
          )
        sys.stderr.write \
          (
            "server_connection: server successfully authenticated.\n"
          ) # debug
    #end __init__

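This assumes that global variables Username, Username2 and Password have been set to the client username, server username and client password respectively, and that globals Host and Port have been set to the host name/address and TCP port number to use to connect to the server. It also relies on the chunk module (see Simple Chunk Protocol Format) and on a receive_all routine which, judging from its use here, is expected to read exactly the requested number of bytes from the connection; neither is defined in this article.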
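The receive_all routine called by the client is not shown in this article. A minimal Python 3 sketch of what such a helper might look like follows; its behaviour (read exactly nbytes, raise IOError if the connection closes early) is an assumption based on how the client code calls it.

```python
import socket

def receive_all(sock, nbytes):
    """Hypothetical helper: reads exactly nbytes from sock, raising
    IOError if the connection closes before that many arrive."""
    pieces = []
    remaining = nbytes
    while remaining > 0:
        piece = sock.recv(remaining)
        if not piece:
            raise IOError(
                "connection closed with %d bytes still expected" % remaining
            )
        pieces.append(piece)
        remaining -= len(piece)
    return b"".join(pieces)

# Demonstration over a local socket pair.
left, right = socket.socketpair()
left.sendall(b"HEL2\x04\x00\x00\x00body")
header = receive_all(right, 8)
body = receive_all(right, 4)
left.close()
right.close()
```

The loop is needed because a single recv call on a TCP socket may return fewer bytes than were asked for.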

Note that AES requires the key (Password, in this case) to be 16, 24 or 32 bytes long.
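One way to satisfy this length requirement for an arbitrary text password is to run it through a key-derivation function first. The original code does not do this; the sketch below, in Python 3 with the standard library, is only a suggestion. The function name password_to_key, the choice of salt and the iteration count are all assumptions, and both client and server would have to use the same values.

```python
import hashlib

def password_to_key(password, salt, key_size=16):
    """Derives a key of key_size bytes (16, 24 or 32 for AES) from a
    text password using PBKDF2-HMAC-SHA256. The salt and the iteration
    count are assumptions; both ends must agree on them."""
    if key_size not in (16, 24, 32):
        raise ValueError("AES keys must be 16, 24 or 32 bytes long")
    return hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, 100000, dklen=key_size
    )

key = password_to_key("my secret password", salt=b"alice")
```

The resulting key would then take the place of Password in the calls to make_crypt_chunk and decode_crypt_chunk.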

The start_session routine, along with details of what happens after the handshake is successful, is described in Simple Chunk Encryption Protocol. Note that the client and server initialization vectors are XORed together before being used for the session encryption; this is a further safeguard against replay attacks.
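The XOR of the two initialization vectors is written in the Python 2 string idiom in the code above. For comparison, here is how the same operation might look on Python 3 bytes objects; the helper name xor_bytes and the sample values are made up for illustration.

```python
def xor_bytes(a, b):
    """Returns the bytewise XOR of two equal-length byte strings."""
    if len(a) != len(b):
        raise ValueError("inputs must be the same length")
    return bytes(x ^ y for x, y in zip(a, b))

client_iv = bytes(range(16))    # sample value; random in real use
server_iv = bytes([0xFF] * 16)  # sample value; random in real use
common_iv = xor_bytes(client_iv, server_iv)
```

Since each side contributes its own random half, neither side alone determines the combined vector, so a recorded handshake from one side cannot force a repeat of an old session's vector.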

Putting It All Together--Server

This example code will not address the multithreading requirements that a realistic server would have to deal with. But assume that an ID.request_auth_id request has been received over a client connection, and parsed into sub-chunks with a sequence equivalent to

args = dict(chunk.extract_sequence(chunk_contents))

and a per-client crypt_channel object created, for example by

client_crypt = crypt_channel()

and a very simple Failure class used to manage error recovery:

class Failure(Exception) :
    def __init__(self, Msg) :
        Exception.__init__(self, Msg)
        self.Msg = Msg
    #end __init__
#end Failure

then the server side of the authentication exchange would look like

    try :
        if not (
                args.has_key(ID.username_id)
            and
                args.has_key(ID.username2_id)
            and
                args.has_key(ID.randnum_1_id)
            and
                len(args[ID.randnum_1_id]) == Crypter.block_size
            and
                args.has_key(ID.encrypted_id)
        ) :
            raise Failure("missing required args")
        #end if
        if not (
                args[ID.username_id] == Username
            and
                args[ID.username2_id] == Username2
        ) :
            raise Failure("incorrect username(s)")
        #end if
        encrypted = client_crypt.decode_crypt_chunk \
          (
            iv = args[ID.randnum_1_id],
            key = Password,
            contents = args[ID.encrypted_id]
          )
        if encrypted is None :
            raise Failure("initial decryption failure")
        #end if
        encrypted = dict(chunk.extract_sequence(encrypted))
        if not (
                encrypted.has_key(ID.username_id)
            and
                encrypted.has_key(ID.username2_id)
            and
                encrypted.has_key(ID.randnum_1_id)
            and
                encrypted.has_key(ID.randnum_2_id)
            and
                encrypted[ID.username_id] == args[ID.username_id]
            and
                encrypted[ID.username2_id] == args[ID.username2_id]
            and
                encrypted[ID.randnum_1_id] == args[ID.randnum_1_id]
            and
                encrypted.has_key(ID.session_key_id)
            and
                encrypted.has_key(ID.init_vector_id)
            and
                # needed because the IV is XORed bytewise below
                len(encrypted[ID.init_vector_id]) == Crypter.block_size
        ) :
            raise Failure("missing/mismatched required args")
        #end if
        temp_iv = random.get_bytes(Crypter.block_size)
        session_iv = random.get_bytes(Crypter.block_size)
        session_key = random.get_bytes(Crypter.block_size)
        common_init_vector = "".join \
          (
            [chr(ord(session_iv[i]) ^ ord(encrypted[ID.init_vector_id][i]))
                for i in range(0, Crypter.block_size)]
          )
        sys.stderr.write \
          (
                "init: %r, my sess = %r, their sess = %r\n"
            %
                (common_init_vector, session_key, encrypted[ID.session_key_id])
          ) # debug
        client_crypt.start_session \
          (
            send_iv = common_init_vector,
            send_key = session_key,
            rcv_iv = common_init_vector,
            rcv_key = encrypted[ID.session_key_id]
          )
        response_chunk = chunk.make \
          (
            ID.reply_auth_id,
                chunk.make(ID.randnum_1_id, temp_iv) \
            + \
                chunk.make \
                  (
                    ID.encrypted_id,
                    client_crypt.make_crypt_chunk
                      (
                        iv = temp_iv,
                        key = Password,
                        contents =
                          # make sure this starts with something attacker
                          # will find harder to predict, just for extra
                          # paranoia
                                chunk.make(ID.session_key_id, session_key)
                            +
                                chunk.make
                                  (
                                    ID.username_id,
                                    args[ID.username2_id]
                                  )
                            +
                                chunk.make
                                  (
                                    ID.username2_id,
                                    args[ID.username_id]
                                  )
                            +
                                chunk.make
                                  (
                                    ID.randnum_2_id,
                                    encrypted[ID.randnum_2_id]
                                  )
                            +
                            chunk.make(ID.init_vector_id, session_iv)
                      )
                  )
          )
        ... send response_chunk back to client ...
        ... further processing to indicate client successfully authenticated ...
    except Failure as Msg :
        ... report failure to authenticate, disconnect client connection etc ...
    #end try