Simple Chunk Authentication Protocol
From CodeCodex
Introduction
You may need to use a protocol based on Simple Chunk Protocol Format over a network connection, for example over the Internet. In such a situation, you will almost certainly want the two machines to authenticate to each other, to be sure that nobody is accessing your application who shouldn't be doing so. Also, you will probably want to encrypt the data you pass back and forth, to prevent eavesdroppers from getting hold of that data, and also perhaps using it to spoof your application requests and responses in a replay attack.
Note that it is not enough for the client to authenticate itself to the server; the server also needs to authenticate to the client, otherwise it may become possible, depending on the application, for the client to be tricked into giving up sensitive information before it discovers it has connected to the wrong server. In particular, it is not a good idea for the client to send a password to the server, since a false server can then use this password to access the real server.
The proper way to implement such authentication is via a challenge-response or shared-secret protocol. That is, each party sends information (the challenge) to the other that can only be decoded if the latter knows what the shared secret (e.g. the password) is; it then sends back a response proving that it was able to decode the challenge. This must be done in such a way that any eavesdropper is unable to determine what the secret is, or subsequently use the captured data to pretend that it knows what the secret is.
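The challenge-response idea can be illustrated in isolation. The following is only a sketch, written for Python 3 with the standard library: it uses an HMAC over a random challenge rather than the encryption-based exchange described in this article, and the function names are hypothetical.

```python
import hashlib
import hmac
import secrets

def make_challenge() -> bytes:
    """Return a fresh, unpredictable challenge."""
    return secrets.token_bytes(16)

def make_response(secret: bytes, challenge: bytes) -> bytes:
    """Prove knowledge of the secret without sending the secret itself."""
    return hmac.new(secret, challenge, hashlib.sha256).digest()

def check_response(secret: bytes, challenge: bytes, response: bytes) -> bool:
    """Recompute the expected response and compare in constant time."""
    expected = make_response(secret, challenge)
    return hmac.compare_digest(expected, response)

# One party issues a challenge; the other answers it.
challenge = make_challenge()
good = make_response(b"correct password", challenge)
bad = make_response(b"wrong password", challenge)
```

Because each challenge is random, a response captured by an eavesdropper is of no use against a later challenge.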
The gold standard for this sort of security is SSL/TLS. However, this requires setting up certificates, which is a level of complication that is not always needed. For some less security-sensitive applications, a simple username/password pair may be sufficient.
The protocol described here is a simplified variant of the Otway-Rees protocol. It is simplified in that full Otway-Rees takes place between three parties: the client, the application server that the client is trying to connect to, and the authentication server, which alone knows everybody's passwords. Here I'm combining the application and authentication servers into one.
The following Python code makes use of the PyCrypto toolkit. This is included as standard in some current Linux distros; check to see if it's present in yours.
The protocol depends on the following pieces of information:
- A client username--this identifies who the client is claiming to be, and is not considered to be a secret.
- A client password--this is associated with the client username. Both the client and the server have to prove to each other that they know this password, so it is important that it is kept secret.
- A server username--this can be used, for example, to identify the service that the client is attempting to access. It is not considered to be a secret. Its main purpose, provided the client and server usernames are different, is to try to make it harder to commit a reflection attack.
- A random 64-bit integer (nonce) generated by the client. This is only sent to the server in an encrypted form, and the response from the server must contain the same number; this helps to guard against a replay attack.
- Randomly-generated session keys for encrypting all the data exchanges that follow the authentication handshake. Why not simply use the client password for all encryption? Because the more material that an attacker can collect that is encrypted with the same key, the more likely it is that they will be able to break the encryption. Thus, the password is used only for the bare minimum, that is to encrypt the initial authentication handshake (including the exchange of the session keys); these session keys are then used for all subsequent encryption. That way, if the session keys are somehow compromised, that destroys the security of that particular session, but it doesn't necessarily mean the password is compromised, or that other sessions are now insecure.
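As a rough illustration of the random values listed above, the following Python 3 sketch generates them with the standard `secrets` module instead of PyCrypto. The function name and dictionary keys are hypothetical; the sizes assume a 16-byte AES block and a 16-byte session key, matching the code later in this article.

```python
import secrets

BLOCK_SIZE = 16  # AES block size in bytes (assumed)

def new_handshake_values() -> dict:
    """Generate the per-connection random values used by the handshake."""
    return {
        "temp_iv": secrets.token_bytes(BLOCK_SIZE),      # IV for the password-encrypted chunk
        "nonce": secrets.token_bytes(8),                 # 64-bit nonce echoed by the server
        "session_iv": secrets.token_bytes(BLOCK_SIZE),   # IV contribution for the session
        "session_key": secrets.token_bytes(BLOCK_SIZE),  # key for subsequent traffic
    }

values = new_handshake_values()
```

Every connection attempt should call this afresh, so that no nonce or session key is ever reused.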
Notes on encryption:
- Probably the easiest way to use a cipher is what's called "ECB" mode, where the same block of data (plaintext) always has the same encrypted representation. However, this is not a very good way to use encryption: the protocol described here will use CBC mode, where in addition to the encryption key, the algorithm also uses a randomly-generated initialization vector (IV), so that the same data encrypted twice will not look the same.
- Note the heavy use of randomly-generated numbers all over the place. It is important that, as far as possible, these numbers be truly random--that is, it must be as hard as possible for an attacker to predict what they might be. PyCrypto includes a built-in source of such cryptographically strong random numbers, and it is important to use this, or something at least as good.
- What happens if an attacker tries randomly changing a few bits in a block of encrypted data as it goes between client and server? For most protocols, it seems very likely that the entire decrypted packet will turn into complete gibberish. Or it may accidentally look like a valid, or almost valid, request or response, albeit a completely different one. In general, it is wise to guard against this possibility (certainly make it extremely unlikely), by the addition of a cryptographic hash to the encrypted data; this can be checked as part of the decryption process, and if the hash doesn't match, then you know the packet has been tampered with, and it can be discarded, an error returned, etc.
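The tamper check in the last note can be sketched on its own. This Python 3 example uses SHA-256 from the standard library and leaves out the encryption step entirely, so it only shows the digest comparison; in the real protocol the digest is encrypted along with the data, which is what stops an attacker from simply recomputing it. The names `seal` and `unseal` are hypothetical.

```python
import hashlib
import hmac

DIGEST_SIZE = hashlib.sha256().digest_size  # 32 bytes

def seal(payload: bytes) -> bytes:
    """Append a digest of the payload."""
    return payload + hashlib.sha256(payload).digest()

def unseal(packet: bytes):
    """Return the payload if the digest matches, else None."""
    if len(packet) < DIGEST_SIZE:
        return None
    payload, digest = packet[:-DIGEST_SIZE], packet[-DIGEST_SIZE:]
    if not hmac.compare_digest(hashlib.sha256(payload).digest(), digest):
        return None
    return payload

packet = seal(b"transfer 10 units")
# Flip a single bit in the first byte, as an attacker might.
tampered = bytes([packet[0] ^ 0x01]) + packet[1:]
```

Here `unseal(packet)` recovers the payload, while `unseal(tampered)` returns `None`, so the receiver can discard the packet.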
Python
PyCrypto offers a range of encryption and hashing algorithms to choose from. If in doubt, a good choice for encryption is AES. For hashing, commonly-used algorithms like MD5 and SHA-1 seem to be showing weaknesses, but no clear successor has as yet emerged. For illustration I will use RIPEMD-160.
I will refer to the algorithm modules using the generic names Crypter and Hasher; this way, it should be easy to change to other algorithms just by changing the following import statements:
import Crypto.Cipher.AES as Crypter
import Crypto.Hash.RIPEMD as Hasher
Chunk IDs
All the following chunk ID codes are deemed to be defined in a class called ID, just for namespace-management purposes:
class ID :
For details about the structure and manipulation of chunks, see Simple Chunk Protocol Format.
The client sends the initial authentication request to the server as a chunk with the following ID:
    request_auth_id = 'HEL1'
This chunk contains the following sub-chunks:
    username_id = 'UNAM' # client username
    username2_id = 'UNA2' # server username
    randnum_1_id = 'NUM1' # initialization vector for handshake encryption
    encrypted_id = 'CRYP' # contents encrypted with user password
The encrypted_id chunk is encrypted using the client password as key, and the contents of the randnum_1_id chunk as initialization vector. It contains copies of the client and server usernames and randnum_1_id value for cross-checking purposes, together with the following additional data:
    randnum_2_id = 'NUM2' # nonce generated by client
    session_key_id = 'SKEY' # random session key to be used after initial handshake
    init_vector_id = 'IV  ' # initialization vector to be used with session key
The response from the server to the client is a chunk with the following ID:
    reply_auth_id = 'HEL2'
This will contain sub-chunks with ID randnum_1_id and encrypted_id. As before, the contents of the latter are encrypted with the client password as key, and the contents of the randnum_1_id chunk as initialization vector (there is no connection between the randnum_1_id value used by the server and that used by the client!). The encrypted_id chunk returned by the server contains username_id and username2_id with the values swapped relative to those passed by the client--that is, username_id is the server username, and username2_id is the client username. It also contains a randnum_2_id chunk, returning the same value that was passed from the client, and session_key_id and init_vector_id chunks containing the key and initialization vector to be used for the server's end of the encryption (these bear no relation to the values passed by the client).
#end class ID
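To make the nesting of the request concrete, here is a Python 3 sketch that assembles and re-parses an outer authentication chunk. The helpers `make_chunk` and `extract_sequence` are hypothetical stand-ins for the routines in Simple Chunk Protocol Format; the header layout (4-byte ID followed by a little-endian 32-bit content length) is an assumption inferred from the `"<4sI"` format used by the client code in this article, and the usernames and ciphertext are placeholders.

```python
import struct

HEADER = struct.Struct("<4sI")  # assumed: 4-byte ID, little-endian 32-bit length

def make_chunk(chunk_id: bytes, contents: bytes) -> bytes:
    """Hypothetical stand-in for chunk.make."""
    return HEADER.pack(chunk_id, len(contents)) + contents

def extract_sequence(data: bytes) -> list:
    """Hypothetical stand-in for chunk.extract_sequence: list of (ID, contents)."""
    chunks = []
    pos = 0
    while pos + HEADER.size <= len(data):
        chunk_id, length = HEADER.unpack_from(data, pos)
        pos += HEADER.size
        if pos + length > len(data):
            break  # truncated chunk: stop parsing
        chunks.append((chunk_id, data[pos:pos + length]))
        pos += length
    return chunks

# Outer request chunk with its four sub-chunks (ciphertext is a placeholder).
request = make_chunk(
    b"HEL1",
    make_chunk(b"UNAM", b"alice")
    + make_chunk(b"UNA2", b"fileservice")
    + make_chunk(b"NUM1", bytes(16))
    + make_chunk(b"CRYP", b"<ciphertext goes here>"),
)
```

Parsing `request` with `extract_sequence` yields a single `HEL1` chunk, whose contents parse in turn into the four sub-chunks.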
Encryption Routines
The encrypted_id chunk has a special format. Because AES is a block cipher, the data it operates on must be a multiple of 16 bytes in size. Thus, following the sub-chunks in the encrypted_id chunk, there must be 0 to 15 bytes of padding, followed by the RIPEMD-160 digest, which is 20 bytes. The digest is computed over the sub-chunks plus the padding. It is also encrypted, so its length must be taken into account when calculating the padding: the sub-chunks, padding and digest together must total a multiple of 16 bytes.
The following utility function calculates how many bytes of padding are needed to add to length bytes of data to ensure it's a multiple of blocksize:
def pad_len(length, blocksize) :
    """returns smallest n >= 0 such that (length + n) % blocksize = 0."""
    return \
        (blocksize - length % blocksize) % blocksize
#end pad_len
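A few worked values may help show how the padding interacts with the 20-byte digest. This Python 3 sketch repeats the padding formula and adds a hypothetical helper, `sealed_length`, giving the total size of the encrypted block for a given content length; the block and digest sizes are those of AES and RIPEMD-160.

```python
BLOCK_SIZE = 16   # AES block size
DIGEST_SIZE = 20  # RIPEMD-160 digest size

def pad_len(length: int, blocksize: int) -> int:
    """Smallest n >= 0 such that (length + n) % blocksize == 0."""
    return (blocksize - length % blocksize) % blocksize

def sealed_length(content_length: int) -> int:
    """Total bytes to encrypt: contents + padding + digest."""
    pad = pad_len(content_length + DIGEST_SIZE, BLOCK_SIZE)
    return content_length + pad + DIGEST_SIZE

for n in (0, 12, 28, 100):
    print(n, pad_len(n + DIGEST_SIZE, BLOCK_SIZE), sealed_length(n))
# prints:
# 0 12 32
# 12 0 32
# 28 0 48
# 100 8 128
```

Note that the padding is computed from the content length plus the digest length, not from the content length alone.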
Also, for convenient generation of random numbers:
import Crypto.Util.randpool as randpool
random = randpool.RandomPool()
The actual encryption/decryption routines needed for the protocol will be collected into the class crypt_channel:
class crypt_channel :
The following routine takes an initialization vector, encryption key and plaintext contents, and returns the encrypted representation, complete with padding and digest, in the appropriate form for use as the contents of the ID.encrypted_id chunk:
    @staticmethod
    def make_crypt_chunk(iv, key, contents) :
        """makes a one-off encrypted chunk content block used during
        the initial handshake. Note that the length of the data returned
        will be longer than contents, because it includes padding and a
        digest."""
        crypt = Crypter.new(key, Crypter.MODE_CBC, iv)
        digester = Hasher.new()
        pad = "\x00" * pad_len(len(contents) + Hasher.digest_size, Crypter.block_size)
        digester.update(contents + pad)
        return crypt.encrypt \
          (
            contents + pad + digester.digest()
          )
    #end make_crypt_chunk
The following routine is the inverse of the above: given the encrypted chunk, and the same initialization vector and key that was used to encrypt it, it will return the decrypted representation, or None if decryption failed.
    @staticmethod
    def decode_crypt_chunk(iv, key, contents) :
        """tries to decrypt a one-off encrypted chunk content block used
        during the initial handshake. Returns the plaintext plus pad bytes
        on success, or None on failure."""
        result = None # to begin with
        if (
                len(contents) % Crypter.block_size == 0
            and
                len(contents) >= Hasher.digest_size
        ) :
            plaintext = \
                Crypter.new(key, Crypter.MODE_CBC, iv).decrypt(contents)
            actual_digest = plaintext[-Hasher.digest_size:]
            plaintext = plaintext[:-Hasher.digest_size]
            digester = Hasher.new()
            digester.update(plaintext)
            if actual_digest == digester.digest() :
                result = plaintext
            else :
                sys.stderr.write \
                  (
                        "decode_crypt_chunk: digest mismatch, expected"
                        " %r, actual %r\n"
                    %
                        (digester.digest(), actual_digest)
                  ) # debug
            #end if
        #end if
        return result
    #end decode_crypt_chunk
Note that decode_crypt_chunk will return the contents plus the padding bytes, since it doesn't actually know how many padding bytes there are. But since the ID.encrypted_id chunk consists of a series of sub-chunks, the padding bytes will be discarded when these are extracted, since they do not make up a valid chunk header.
Putting It All Together--Client
A simple implementation of the client side of the above protocol, suitable for use in single-threaded clients, is the following server_connection class:
import socket
import struct
import sys

class server_connection :
    """convenient encapsulation of all communication with server."""

    def __init__(self) :
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.connect((Host, Port))
        temp_iv = random.get_bytes(Crypter.block_size)
        nonce = random.get_bytes(8)
        session_iv = random.get_bytes(Crypter.block_size)
        session_key = random.get_bytes(Crypter.block_size)
        self.channel = crypt_channel()
        self.server.sendall \
          (
            chunk.make
              (
                ID.request_auth_id,
                    chunk.make(ID.username_id, Username)
                +
                    chunk.make(ID.username2_id, Username2)
                +
                    chunk.make(ID.randnum_1_id, temp_iv)
                +
                    chunk.make
                      (
                        ID.encrypted_id,
                        self.channel.make_crypt_chunk
                          (
                            iv = temp_iv,
                            key = Password,
                            contents =
                                # make sure this starts with something attacker
                                # will find harder to predict, just for extra
                                # paranoia
                                    chunk.make(ID.randnum_2_id, nonce)
                                +
                                    chunk.make(ID.username_id, Username)
                                +
                                    chunk.make(ID.username2_id, Username2)
                                +
                                    chunk.make(ID.randnum_1_id, temp_iv)
                                +
                                    chunk.make(ID.init_vector_id, session_iv)
                                +
                                    chunk.make(ID.session_key_id, session_key)
                          )
                      )
              )
          )
        # read the 8-byte chunk header, then the number of content bytes it announces
        header = receive_all(self.server, 8)
        response = header + receive_all \
          (
            self.server,
            struct.unpack("<4sI", header)[1]
          )
        sys.stderr.write \
          (
            "server_connection response: %r\n" % response
          ) # debug
        response = chunk.extract(response)
        if response[0] != ID.reply_auth_id :
            raise IOError("unexpected server response ID")
        #end if
        response = dict(chunk.extract_sequence(response[1]))
        if (
                not response.has_key(ID.randnum_1_id)
            or
                len(response[ID.randnum_1_id]) != Crypter.block_size
            or
                not response.has_key(ID.encrypted_id)
        ) :
            raise IOError("incomplete server response")
        #end if
        response = self.channel.decode_crypt_chunk \
          (
            iv = response[ID.randnum_1_id],
            key = Password,
            contents = response[ID.encrypted_id]
          )
        if response == None :
            # decode_crypt_chunk returns None on a bad length or digest
            raise IOError("server response decryption failure")
        #end if
        response = dict(chunk.extract_sequence(response))
        if (
                not response.has_key(ID.username_id)
            or
                not response.has_key(ID.username2_id)
            or
                not response.has_key(ID.randnum_2_id)
            or
                not response.has_key(ID.session_key_id)
            or
                not response.has_key(ID.init_vector_id)
            or
                len(response[ID.init_vector_id]) != Crypter.block_size
        ) :
            raise IOError("incomplete server encrypted response")
        #end if
        if (
                response[ID.username_id] != Username2
            or
                response[ID.username2_id] != Username
            or
                response[ID.randnum_2_id] != nonce
        ) :
            raise IOError("server response consistency mismatch")
        #end if
        common_init_vector = "".join \
          (
            [chr(ord(session_iv[i]) ^ ord(response[ID.init_vector_id][i]))
             for i in range(0, Crypter.block_size)]
          )
        sys.stderr.write \
          (
                "server_connection init: %r, my sess = %r, their sess = %r\n"
            %
                (common_init_vector, session_key, response[ID.session_key_id])
          ) # debug
        self.channel.start_session \
          (
            send_iv = common_init_vector,
            send_key = session_key,
            rcv_iv = common_init_vector,
            rcv_key = response[ID.session_key_id]
          )
        sys.stderr.write \
          (
            "server_connection: server successfully authenticated.\n"
          ) # debug
    #end __init__
This assumes that global variables Username, Username2 and Password have been set to the client username, server username and client password respectively. It also assumes that globals Host and Port have been set to the host name/address and TCP port number to use to connect to the server.
Note that AES requires the key (Password, in this case) to be 16, 24 or 32 bytes long.
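A human-chosen password will rarely be exactly 16, 24 or 32 bytes long. One possible way to obtain a key of a valid length, which is not part of the protocol as described here, is to derive it from the password with a key-derivation function. The Python 3 sketch below uses PBKDF2 from the standard library; the function name, the use of the client username as salt and the iteration count are all assumptions, and both client and server would have to apply the same derivation.

```python
import hashlib

def derive_key(password: str, salt: bytes, length: int = 32) -> bytes:
    """Derive an AES-sized key from a password (hypothetical helper)."""
    if length not in (16, 24, 32):
        raise ValueError("AES keys must be 16, 24 or 32 bytes long")
    return hashlib.pbkdf2_hmac(
        "sha256",
        password.encode("utf-8"),
        salt,
        100_000,       # iteration count (assumed; tune to taste)
        dklen=length,
    )

# Example: the client username serves as the salt (an assumption).
key = derive_key("correct horse battery staple", b"alice")
```

The derivation is deterministic, so both ends arrive at the same key as long as they agree on the password, salt and parameters.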
The start_session routine, along with details of what happens after the handshake is successful, is described in Simple Chunk Encryption Protocol. Note that the client and server initialization vectors are XORed together before being used for the session encryption; this is a further safeguard against replay attacks, since neither side alone determines the resulting value.
Putting It All Together--Server
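The XOR combination of the two initialization vectors is written in the code above using chr and ord on Python 2 strings. As a sketch, the same operation on Python 3 bytes objects might look like this; the name `xor_bytes` and the sample values are hypothetical.

```python
def xor_bytes(a: bytes, b: bytes) -> bytes:
    """XOR two equal-length byte strings together."""
    if len(a) != len(b):
        raise ValueError("inputs must be the same length")
    return bytes(x ^ y for x, y in zip(a, b))

# Fixed sample values for illustration; real IVs must be random.
client_iv = bytes(range(16))
server_iv = bytes([0xFF] * 16)
common_iv = xor_bytes(client_iv, server_iv)
```

Both sides compute the same `common_iv` regardless of the order of the arguments, and XORing it with either contribution recovers the other.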
This example code will not address the multithreading requirements that a realistic server would have to deal with. But assume that an ID.request_auth_id request has been received over a client connection, and parsed into sub-chunks with a sequence equivalent to
args = dict(chunk.extract_sequence(chunk_contents))
and a per-client crypt_channel object created, for example by
client_crypt = crypt_channel()
and a very simple Failure class used to manage error recovery:
class Failure :

    def __init__(self, Msg) :
        self.Msg = Msg
    #end __init__

#end Failure
then the server side of the authentication exchange would look like
try :
    if not (
            args.has_key(ID.username_id)
        and
            args.has_key(ID.username2_id)
        and
            args.has_key(ID.randnum_1_id)
        and
            len(args[ID.randnum_1_id]) == Crypter.block_size
        and
            args.has_key(ID.encrypted_id)
    ) :
        raise Failure("missing required args")
    #end if
    if not (
            args[ID.username_id] == Username
        and
            args[ID.username2_id] == Username2
    ) :
        raise Failure("incorrect username(s)")
    #end if
    encrypted = client_crypt.decode_crypt_chunk \
      (
        iv = args[ID.randnum_1_id],
        key = Password,
        contents = args[ID.encrypted_id]
      )
    if encrypted == None :
        raise Failure("initial decryption failure")
    #end if
    encrypted = dict(chunk.extract_sequence(encrypted))
    # the encrypted copies must be present and must match the cleartext ones
    if not (
            encrypted.has_key(ID.username_id)
        and
            encrypted.has_key(ID.username2_id)
        and
            encrypted.has_key(ID.randnum_1_id)
        and
            encrypted.has_key(ID.randnum_2_id)
        and
            encrypted[ID.username_id] == args[ID.username_id]
        and
            encrypted[ID.username2_id] == args[ID.username2_id]
        and
            encrypted[ID.randnum_1_id] == args[ID.randnum_1_id]
        and
            encrypted.has_key(ID.session_key_id)
        and
            encrypted.has_key(ID.init_vector_id)
    ) :
        raise Failure("missing/mismatched required args")
    #end if
    temp_iv = random.get_bytes(Crypter.block_size)
    session_iv = random.get_bytes(Crypter.block_size)
    session_key = random.get_bytes(Crypter.block_size)
    common_init_vector = "".join \
      (
        [chr(ord(session_iv[i]) ^ ord(encrypted[ID.init_vector_id][i]))
         for i in range(0, Crypter.block_size)]
      )
    sys.stderr.write \
      (
            "init: %r, my sess = %r, their sess = %r\n"
        %
            (common_init_vector, session_key, encrypted[ID.session_key_id])
      ) # debug
    client_crypt.start_session \
      (
        send_iv = common_init_vector,
        send_key = session_key,
        rcv_iv = common_init_vector,
        rcv_key = encrypted[ID.session_key_id]
      )
    response_chunk = chunk.make \
      (
        ID.reply_auth_id,
            chunk.make(ID.randnum_1_id, temp_iv)
        +
            chunk.make
              (
                ID.encrypted_id,
                client_crypt.make_crypt_chunk
                  (
                    iv = temp_iv,
                    key = Password,
                    contents =
                        # make sure this starts with something attacker
                        # will find harder to predict, just for extra
                        # paranoia
                            chunk.make(ID.session_key_id, session_key)
                        +
                            chunk.make
                              (
                                ID.username_id,
                                args[ID.username2_id]
                              )
                        +
                            chunk.make
                              (
                                ID.username2_id,
                                args[ID.username_id]
                              )
                        +
                            chunk.make
                              (
                                ID.randnum_2_id,
                                encrypted[ID.randnum_2_id]
                              )
                        +
                            chunk.make(ID.init_vector_id, session_iv)
                  )
              )
      )
    ... send response_chunk back to client ...
    ... further processing to indicate client successfully authenticated ...
except Failure, Msg :
    ... report failure to authenticate, disconnect client connection etc ...
#end try