Simple Chunk Encryption Protocol

From CodeCodex

This article follows on from Simple Chunk Authentication Protocol to describe one way to handle encryption of actual requests and responses once the initial authentication handshake has been completed.

Here is the start_session method of the crypt_channel class, which sets up the session encryption contexts in both directions:

# continuation of class crypt_channel :

    def start_session(self, send_iv, send_key, rcv_iv, rcv_key) :
        """must be called after initial handshake, before encrypt_chunk
        and decrypt_chunk_header/contents routines can be used."""
        self.send_crypt = Crypter.new(send_key, Crypter.MODE_CBC, send_iv)
        self.rcv_crypt = Crypter.new(rcv_key, Crypter.MODE_CBC, rcv_iv)
    #end start_session

As previously explained, it is important not only to encrypt the data being sent back and forth, but to apply a cryptographic hash to guard against malicious modification of the data. Thus, a couple of tweaks need to be made to the original Simple Chunk Protocol Format in order to provide a bit more robust encryption support.

First, immediately following the chunk header and contents, there will be 20 bytes of RIPE-MD160 hash digest (not counted in the chunk length). Since the block cipher wants its data to be an exact multiple of 16 bytes, this is followed by 0 to 15 bytes of padding, also not counted in the chunk length. But since we know the chunk length and digest length, it's easy for the receiver to calculate how much padding will be following, and drop that number of bytes off the end after decryption.

However, there is one further issue to beware of. If an attacker maliciously modifies the encrypted data, the hash digest is of course virtually guaranteed to pick this up. But what if this causes the chunk length field to have some strange, possibly very large value? Then the recipient could end up reading a great deal of data before getting to what it thinks is the hash digest. It could even run out of memory before that point.

Therefore it seems worth while to add some extra protection up front for the chunk length field, so that corruption of this can be picked up fairly quickly, before any more garbage data is read.

Rather than compute another 20-byte digest, I simply add another 4-byte header field, which contains the ones-complement of the chunk length. This is included in the encryption, but not in the hash digest computation (it doesn't need to be): this way, as soon as the first 12 bytes of a chunk have been decrypted (actually the first 16 bytes, because of the cipher block size requirement), it is possible to do a quick check that the two versions of the length field agree, before continuing to try to decrypt the rest of the chunk.

Here is the routine that implements the construction and encryption of the chunk, all in one step:

# continuation of class crypt_channel :

    def encrypt_chunk(self, id, contents) :
        """returns an encrypted representation of the chunk with the
        specified ID and contents."""
        header = struct.pack("<4sI", id[0:4], len(contents))
        header2 = struct.pack("<I", 0xFFFFFFFF ^ len(contents))
        pad = \
                "\x00" \
            * \
                pad_len(12 + len(contents) + Hasher.digest_size, Crypter.block_size)
        digester = Hasher.new()
        digester.update(header + contents)
        result = self.send_crypt.encrypt \
          (
                header
            +
                header2
            +
                contents
            +
                digester.digest()
            +
                pad
          )
        sys.stderr.write \
          (
                "chunk id %r, contents %r digest %r encrypts to %r\n"
            %
                (id, contents, digester.digest(), result)
          ) # debug
        return result
    #end encrypt_chunk

The decryption of the chunk by the receiver is slightly more complicated. With unencrypted chunks, you had to read the first eight bytes of chunk header, then extract the length from that to decide how many more bytes to read. With encrypted chunks, you have to read the first 16 bytes, decode that into the regular chunk header, the additional length chunk, and the first four bytes of the rest of the data. (There will always be at least four bytes of the rest of the data, because even if the chunk content is zero-length, there will still be the 20 bytes of hash digest.) This is implemented by the following decrypt_chunk_header routine:

# continuation of class crypt_channel :

    def decrypt_chunk_header(self, header) :
        """tries to decode the specified header, which must be 16
        bytes long, as a chunk header. Returns the ID, length,
        first four bytes of the rest of the chunk, and how many
        bytes remaining to be read (including digest and padding)
        as a tuple on success, or None on failure. The channel
        should probably be abandoned on failure."""
        result = None # initial assumption
        if len(header) == 16 :
            plaintext = self.rcv_crypt.decrypt(header)
            (id, length, notlength, first_contents) = \
                struct.unpack("<4sII4s", plaintext)
            sys.stderr.write \
              (
                    "decrypt_chunk_header: id = %r, length = %u = %u\n"
                %
                    (id, length, 0xFFFFFFFF ^ notlength)
              ) # debug
            if length == 0xFFFFFFFF ^ notlength :
                to_read_rest = (
                        length
                    +
                        Hasher.digest_size
                    -
                        4
                    +
                        pad_len
                          (
                            12 + length + Hasher.digest_size,
                            Crypter.block_size
                          )

                )
                result = (id, length, first_contents, to_read_rest)
            #end if
        #end if
        return result
    #end decrypt_chunk_header

Having successfully decrypted the header and verified the length looks OK, you can read the rest of the chunk and assemble the complete decrypted contents with decrypt_chunk_contents:

# conclusion of class crypt_channel :

    def decrypt_chunk_contents \
      (
        self, id, length, first_contents, rest_contents
      ) :
        """tries to decode rest_contents as the rest of the chunk
        contents, where id, length and first_contents are the
        previously-decrypted values as returned from decrypt_chunk_header.
        Returns the complete decrypted contents (including first_contents),
        sans digest and padding, on success. Returns None if the digest
        failed to verify."""
        sys.stderr.write \
          (
                "decrypt_chunk_contents: id = %r, length = %r, first_contents = %r, rest_contents = %r\n"
            %
                (id, length, first_contents, rest_contents)
          ) # debug
        result = None # initial assumption
        if (
                len(rest_contents) + 4 >= length + Hasher.digest_size
            and
                len(rest_contents) % Crypter.block_size == 0
        ) :
            header = struct.pack("<4sI", id[0:4], length)
            contents = first_contents + self.rcv_crypt.decrypt(rest_contents)
            sys.stderr.write \
              (
                    "decrypt_chunk_contents: decrypted contents = %r\n"
                %
                    contents
              ) # debug
            actual_digest = contents[length:length + Hasher.digest_size]
              # note that padding is ignored
            contents = contents[:length]
            digester = Hasher.new()
            digester.update(header + contents)
            if actual_digest == digester.digest() :
                result = contents
            else :
                sys.stderr.write \
                  (
                        "decrypt_chunk_contents: digest mismatch, expected"
                        " %r, actual %r\n"
                    %
                        (digester.digest(), actual_digest)
                  ) # debug
            #end if
        #end if
        return result
    #end decrypt_chunk_contents

#end crypt_channel

To see how these routines are used in practice, here is the rest of the server_connection class, containing the request method which the client calls to send a request to the server and receive the response:

# conclusion of class server_connection :

    def request(self, id, contents, parse = False) :
        """sends a request to the server and returns the response."""
        if type(contents) == dict :
            contents = "".join \
              (
                chunk.make(id, str(v)) for v in contents.values()
              )
        else :
            contents = str(contents)
        #end if
        self.server.sendall(self.channel.encrypt_chunk(id, contents))
        response = self.channel.decrypt_chunk_header \
          (
            receive_all(self.server, 16)
          )
        if response == None :
            raise IOError("server_connection: failed server response header")
        #end if
        (response_id, response_length, first_response, to_read) = response
        response = self.channel.decrypt_chunk_contents \
          (
            id = response_id,
            length = response_length,
            first_contents = first_response,
            rest_contents = receive_all(self.server, to_read)
          )
        if response == None :
            raise IOError("server_connection: failed server response contents")
        #end if
        if parse :
            response = dict(chunk.extract_sequence(response))
        #end if
        return (response_id, response)
    #end request

    def close(self) :
        if self.server != None :
            self.server.close()
            self.server = None
        #end if
        self.channel = None
    #end close

#end server_connection