Jauhar's Blog


Writing Simple Redis Server With Python

2022/10/25

Let’s write a simple Redis server using Python.

Redis is an in memory key-value database. Key-value database is very simple. At very minimum, there are two kind of queries: SET and GET. You can SET a key to a value, and you can GET a key to return a value. Of course, in practice, we need more capabilities than just SET and GET such as delete, timeout, scan, etc. Although, I can argue that most of those operations technically can be achieved by SET and GET operations. To perform delete, we can just SET the key into null or special value indicating that the key is deleted. To add timeout, we can simply put the expired time during the SET operations and check it during GET operation. But, most key value databases support those operation natively because it’s very commonly used by the users. Redis is a very popular key-value database, and it’s protocol is very simple. Let’s write a Redis server implementation in Python.

RESP

Redis documentation is really good and well-written. The wire protocol is also well documented here. Redis called its protocol RESP, which stands for Redis serialization protocol. RESP was designed to be human readable, which means, you can read the protocol without parsing it. It uses newline characters to split items in the protocol and represents number using text instead of its binary representation. But, even though RESP is human readable, it was also designed to be fast to parse and simple to implement.

Redis server runs on TCP connection. A client can connect to the server by creating a TCP connection and talks using RESP. By default, Redis servers use port 6379 as the TCP port, but we will use a different port. RESP works using request-response model, the client sends a request, and the server send back the response. In the context of redis, the requests consist of command and its argument. The list of Redis commands and their arguments are documented here. We are going to implement 4 redis commands: GET, SET, SELECT, and COMMAND.

Let’s start writing a simple TCP server in Python:

 1import socket
 2import logging
 3import dataclasses
 4import asyncio
 5
 6@dataclasses.dataclass
 7class Config:
 8    bind: str
 9    port: int
10
11async def run_server(config: Config):
12    tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
13    server_address = (config.bind, config.port)
14    tcp_socket.bind(server_address)
15    tcp_socket.listen()
16    tcp_socket.setblocking(False)
17
18    loop = asyncio.get_event_loop()
19
20    logging.info("Server is listening to: %s:%d", config.bind, config.port)
21
22    while True:
23        connection, client = await loop.sock_accept(tcp_socket)
24        logging.info("New client is connected: %s", client)
25
26        loop.create_task(handle_client(connection, client))
27
28
29async def handle_client(connection: socket.socket, client: socket.AddressInfo):
30    loop = asyncio.get_event_loop()
31    while True:
32        ch = await loop.sock_recv(connection, 1)
33        await connection.sendall(ch)
34
35
36if __name__ == "__main__":
37    logging.basicConfig(level=logging.INFO)
38    config = Config(bind='0.0.0.0', port=5000)
39    try:
40        asyncio.run(run_server(config))
41    except KeyboardInterrupt:
42        logging.info("Stopping")

Data Types

There are few data types supported in RESP. Some of them are: String, Array, Integer, Null, and Error. The String data type itself technically can be divided into 2 types: Simple String and Bulk String, but we will just refer them as String. The client requests are represented as an Array, where the first index is a String containing the command to be executed, and the rest of elements are the command arguments. For example, the Array ["SET", "key_example", "value_example"] represents a client request to set a key "key_example" to a String "value_example". The server responses are also represented as one of the Redis data types. Depending on the command, some command returns a simple String “Ok”, some returns an Array.

Encoding

Let’s start writing Python implementation to encode the Redis data types. We are going to represents the Redis data types using Python data types. We are going to use Python list for Redis Array, Python bytes for Redis String, Python int for Redis Integer, Python None for Redis Null and an Error class below to represents Redis Error.

1import dataclasses
2
3@dataclasses.dataclass
4class Error:
5    code: str
6    message: str

The documentation of RESP data type is very clear. To encode a String, we can simply write $ character followed by the length of the string, followed by \r\n (a CRLF), followed by the string itself, and followed by \r\n again. So, a String “hello” can be encoded into $5\r\nhello\r\n, and empty string can be encoded as $0\r\n\r\n. Let’s write a Python implementation for that:

1async def write_string_value(connection: socket.socket, value: bytes):
2    loop = asyncio.get_event_loop()
3    await loop.sock_sendall(connection, b'$')
4    await loop.sock_sendall(connection, bytes(str(len(value)), 'ascii'))
5    await loop.sock_sendall(connection, b'\r\n')
6    await loop.sock_sendall(connection, value)
7    await loop.sock_sendall(connection, b'\r\n')

Interestingly, there is a special value in Redis called Null. Null means nothing. And interestingly, we can encode Null by seeing them as a string with -1 length. So we can encode a Null value as: $-1\r\n. Let’s code that:

1async def write_null_value(connection: socket.socket):
2    loop = asyncio.get_event_loop()
3    connection.sendall(b'$-1\r\n')

To encode a number, we need to use : character prefix, followed by the number in text format, and followed by \r\n characters. So a number 1023 can be encoded into :1023\r\n. We can also encode a negative number as it is. For example, a number -10 can be encoded into :-10\r\n. Let’s write a Python implementation to encode numbers:

1async def write_number_value(connection: socket.socket, value: int):
2    loop = asyncio.get_event_loop()
3    await loop.sock_sendall(connection, b':')
4    await loop.sock_sendall(connection, bytes(str(value), 'ascii'))
5    await loop.sock_sendall(connection, b'\r\n')

Next, let’s encode an error type. Errors are quite similar to Strings, they basically just a bunch of characters. But, unlike string, errors has two things in it: the error code and error message. Both the error code and the error message are just a simple string. To encode an error type, we can use - prefix, followed by the error code, followed by a space character, followed by error message, and followed by a \r\n. Let’s write Python implementation to encode errors:

1async def write_error_value(connection: socket.socket, value: Error):
2    loop = asyncio.get_event_loop()
3    await loop.sock_sendall(connection, b'-')
4    await loop.sock_sendall(connection, bytes(value.code, 'ascii'))
5    await loop.sock_sendall(connection, b' ')
6    await loop.sock_sendall(connection, bytes(value.message, 'ascii'))
7    await loop.sock_sendall(connection, b'\r\n')

Finally, let’s encode an array type. Arrays are basically just a collection of multiple values. An Array can also contains another Arrays inside them. Redis encodes Array by specifying it’s length, followed by the encoded value of its elements. To be precise, to encodes an array, we need to write a * prefix, followed by the length of the Array in text format, followed by a \r\n, and followed by the encoded value of its element. We can implement Array encoding recursively like this:

 1from typing import List, Any
 2
 3
 4async def write_array_value(connection: socket.socket, value: List[Any]):
 5    loop = asyncio.get_event_loop()
 6    await loop.sock_sendall(connection, b'*')
 7    await loop.sock_sendall(connection, bytes(str(len(value)), 'ascii'))
 8    await loop.sock_sendall(connection, b'\r\n')
 9    for element in value:
10        await write_value(connection, element)

Now that we have a way to encode various types of Values, let’s write a single function to encode a Value of any type. But before that, we need to know that Redis has more types of Values. We are only implementing a few types that we need to make this simple server works. In our implementation, we will just encode an error if we don’t recognized the Value type.

 1async def write_value(connection: socket.socket, value: Any):
 2    if value is None:
 3        await write_null_value(connection)
 4    elif isinstance(value, bytes):
 5        await write_string_value(connection, value)
 6    elif isinstance(value, int):
 7        await write_number_value(connection, value)
 8    elif isinstance(value, list):
 9        await write_array_value(connection, value)
10    elif isinstance(value, Error):
11        await write_error_value(connection, value)
12    else:
13        logging.error(
14            "Found unrecognized value with type (%s): %s", type(value), value
15        )
16        value = Error(code="ERR", message="internal server error")
17        await write_error_value(connection, value)

Decoding

Now, we have the ability to write any values to Redis client. But, we also need to read a value from a Redis client. To do that, we need to write the decoding logic to decode Redis Values. The decoding logic basically just the inverse of the encoding logic. Let’s implement decoding logic in Python:

 1from typing import List, Any, Union
 2
 3
 4async def read_value(connection: socket.socket) -> Any:
 5    loop = asyncio.get_event_loop()
 6
 7    marker = await loop.sock_recv(connection, 1)
 8    if marker == b'*':
 9        return await read_array(connection)
10    elif marker == b'$':
11        return await read_bulk_string(connection)
12    elif marker == b'+':
13        return await read_simple_string(connection)
14    elif marker == b':':
15        return await read_number(connection)
16    elif marker == b'':
17        raise EOFError
18
19
20async def read_array(connection: socket.socket) -> List[Any]:
21    array_len = await read_number(connection)
22    array_values = [None] * array_len
23    for i in range(array_len):
24        array_values[i] = await read_value(connection)
25    return array_values
26
27
28async def read_bulk_string(connection: socket.socket) -> Union[None, bytes]:
29    loop = asyncio.get_event_loop()
30
31    str_len = await read_number(connection)
32    if str_len < 0:
33        return None
34    str_value = await loop.sock_recv(connection, str_len)
35
36    b = await loop.sock_recv(connection, 2)
37    assert (b == b'\r\n')
38
39    return str_value
40
41
42async def read_simple_string(connection: socket.socket) -> bytes:
43    return await read_until_newline(connection)
44
45
46async def read_number(connection: socket.socket) -> int:
47    return int(await read_until_newline(connection))
48
49
50async def read_until_newline(connection: socket.socket) -> bytes:
51    loop = asyncio.get_event_loop()
52
53    result = bytes()
54    while True:
55        chr = await loop.sock_recv(connection, 1)
56        if chr == b'\r':
57            break
58        result += chr
59
60    b = await loop.sock_recv(connection, 1)
61    assert (b == b'\n')
62
63    return result

Command Processing

Now that we have the encode and decode logics, let’s write the actual client handler. Currently, our handler is very simple, we just read any bytes from client, and write back the same bytes back to the client. The Redis server works in request-response pattern, the client send a Redis Value as a command, and the server should write back a Redis Value as the response. Let’s write a simple client handler to read the command from the client. We won’t be processing the command yet, we just want to see what’s the command looks like.

 1async def handle_client(connection: socket.socket, client: socket.AddressInfo):
 2    while True:
 3        try:
 4            await handle_client_request(connection)
 5        except EOFError:
 6            logging.info("Client is disconnected: %s", client)
 7            return
 8
 9
10async def handle_client_request(connection: socket.socket):
11    request = await read_value(connection)
12    print(request)

Let’s test our server implementation. Redis has a CLI tool called redis-cli that we can use to talk with our server. Let’s run our sever and redis-cli to test it:

1$ redis-cli -h 127.0.0.1 -p 5000

If you look at your server, you will see something like this:

1INFO:root:Server is listening to: 0.0.0.0:5000
2INFO:root:New client is connected: ('127.0.0.1', 45184)
3[b'COMMAND']

In the third line, you’ll see an array with "COMMAND" string in it. When Redis client connect to a Redis server, they will send a command called COMMAND. Yes, there is a command called COMMAND. This COMMAND command is used to fetch all the supported commands from the server. As you can see, Redis uses Arrays to represents a comamnd. The first element of the Array is the command name, and the rest are the command arguments. Next, let’s write the implementation for COMMAND command.

COMMAND Command

The details of COMMAND command are documented here in the Redis documentation. We need to return a list of all commands we supported, in this case we only supports 4 types of commands: COMMAND, GET, SET, and SELECT. A command is represented as an array with this specification:

  1. Name
  2. Arity
  3. Flags
  4. First key
  5. Last key
  6. Step
  7. ACL categories (as of Redis 6.0)
  8. Tips (as of Redis 7.0)
  9. Key specifications (as of Redis 7.0)
  10. Subcommands (as of Redis 7.0)

The first element of the command is the command name. The second element is the number of command arguments. Some commands can have infinite number of arguments and need special handling, but we are not going to implement that. The third element is the command flags. Command flags tells us various information of the command such as: whether the command is slow, whether the command is readonly, whether the command is for processing string, etc. The 4-th, 5-th and 6-th elements are related to each other. They represent the argument index of the first key, the argument index of the last key, and the number of step to get into the next key.

For simplicity reasons, we are going to use only six of them. We are not going to implement the ACL, Tips, Key specifications and Subcommands. And we are not going to support multiple keys.

Since the return values of the COMMAND command is constant, we can just represent it as a single global variable for the response.

 1@dataclasses.dataclass
 2class Command:
 3    name: str
 4    arguments_len: int
 5    flags: List[str]
 6    first_key: int
 7    last_key: int
 8    key_step: int
 9
10commands = {
11    b"COMMAND": Command(
12        name=b"COMMAND",
13        arguments_len=1,
14        flags=[b"readonly", b"random"],
15        first_key=1,
16        last_key=1,
17        key_step=1,
18    ),
19    b"GET": Command(
20        name=b"GET",
21        arguments_len=2,
22        flags=[b"readonly", b"random"],
23        first_key=1,
24        last_key=1,
25        key_step=1,
26    ),
27    b"SET": Command(
28        name=b"SET",
29        arguments_len=1,
30        flags=[b"write", b"string", b"slow"],
31        first_key=1,
32        last_key=1,
33        key_step=1,
34    ),
35    b"SELECT": Command(
36        name=b"SELECT",
37        arguments_len=2,
38        flags=[b"write", b"string", b"slow"],
39        first_key=0,
40        last_key=0,
41        key_step=0,
42    )
43}
44
45
46async def handle_command_command() -> Any:
47    global commands
48
49    return [
50        [command.name, command.arguments_len, command.flags,
51            command.first_key, command.last_key, command.key_step]
52        for command in commands.values()
53    ]

Then, let’s connect our handle_command_command to our handle_client_req:

 1async def handle_client_request(connection: socket.socket):
 2    request = await read_value(connection)
 3
 4    if not isinstance(request, list):
 5        logging.warn("Invalid request from client: %s", request)
 6        return
 7    if len(request) == 0:
 8        logging.warn("Invalid request from client: %s", request)
 9        return
10    if not isinstance(request[0], bytes):
11        logging.warn("Invalid request from client: %s", request)
12        return
13
14    logging.debug("Got request: %s", request)
15
16    command, args = request[0].upper(), request[1:]
17    if command == b'COMMAND':
18        result = await handle_command_command()
19    else:
20        msg = "unknown command `{}`, with args beginning with: {}".format(
21            command, args)
22        result = Error(code="ERR", message=msg)
23
24    await write_value(connection, result)

Now, we have handled our COMMAND command. If you try running redis-cli again, you can try running some commands:

1$ redis-cli -h 127.0.0.1 -p 5000
2127.0.0.1:5000> GET 1
3(error) ERR unknown command `b'GET'`, with args beginning with: [b'1']
4127.0.0.1:5000> SET a b
5(error) ERR unknown command `b'SET'`, with args beginning with: [b'a', b'b']

SELECT Command

Redis server was designed to have multiple databases running at the same time. Normally, there is 16 database numbered from 0 to 15. When a client connects to Redis server, they will use database 0 by default. The client can use SELECT command to change database by passing the database number. The selected database is local to every client, which means two different client can select two different databases. To implement this, we need to save the current selected database for every client. Let’s create a data structure called Session to store any client-scoped information.

 1@dataclasses.dataclass
 2class Config:
 3    bind: str
 4    port: int
 5    databases: int
 6
 7
 8@dataclasses.dataclass
 9class Session:
10    selected_database: int = 0
11
12
13if __name__ == "__main__":
14    ...
15    config = Config(bind='0.0.0.0', port=5000, databases=16)
16    ...

Note that we also added databases field in the Config structure to configure the maximum number of databases.

Now, we have to create and pass the Session to our command handler.

 1async def run_server(config: Config):
 2    ...
 3    loop.create_task(handle_client(connection, client, config))
 4
 5
 6async def handle_client(conn: socket.socket, client: socket.AddressInfo, config: Config):
 7    session = Session()
 8    while True:
 9        ...
10        await handle_client_request(config, session, conn)
11        ...
12
13async def handle_client_request(config: Config, session: Session, connection: socket.socket):
14    ...

Now, let’s implement out SELECT comamnd handler. You can check the detailed specification of SELECT command here. Basically, it will just return a simple “OK” string if the operation succeded and returns error when the operation failed.

 1async def handle_client_request(config: Config, session: Session, connection: socket.socket):
 2    ...
 3    if command == b'COMMAND':
 4        result = await handle_command_command()
 5    elif command == b'SELECT':
 6        result = await handle_select_command(config, session, args)
 7    ...
 8
 9async def handle_select_command(config: Config, session: Session, args: Any) -> Any:
10    if not isinstance(args, list) or len(args) != 1:
11        return Error(code="ERR", message="wrong number of arguments for 'select' command")
12    target_db = args[0]
13    if not isinstance(target_db, bytes):
14        return Error(code="ERR", message="invalid DB index")
15    try:
16        target_db = int(target_db)
17    except:
18        return Error(code="ERR", message="invalid DB index")
19    if target_db >= config.databases:
20        return Error(code="ERR", message="DB index is out of range")
21
22    session.selected_database = target_db
23
24    return b'OK'

Now, if you run the redis-cli again, you can use SELECT command to change the database.

1$ redis-cli -h 127.0.0.1 -p 5000
2127.0.0.1:5000> SELECT 1
3"OK"
4127.0.0.1:5000[1]> SELECT 0
5"OK"
6127.0.0.1:5000> SELECT 16
7(error) ERR DB index is out of range
8127.0.0.1:5000> SELECT 1
9"OK"

SET Command

Let’s implement SET command. SET command is technically quite complex, but we will simplify it in our implementation. The details of Redis’ SET operation is documented here. Redis supports timeout in their SET command, but we won’t support that. We will just accept 2 arguments for SET, the key and the value, and then store it in our server. To store the key-value pairs sent by the clients, we need to have a hash map. Actually, we need to have multiple hash map, one for every database. Remember Redis supports multiple databases.

Let’s create a global variable to store all of our hashmaps. Additionally, let’s also create a global mutex to guard our hashmap for concurrent execution. We are also going to create a function init_database to initialize the databases and the locks.

 1databases: List[Dict[bytes, Any]] = None
 2locks: List[asyncio.Lock] = None
 3
 4
 5def init_database(config: Config):
 6    global databases
 7    global locks
 8
 9    databases = [{} for _ in range(config.databases)]
10    locks = [asyncio.Lock() for _ in range(config.databases)]
11
12
13if __name__ == "__main__":
14    ...
15    config = Config(...)
16    init_database(config)
17    ...

Now, let’s implement the SET command. Implementing SET command is pretty straightforward. What we need to do is just lock the current selected database and put the key and value to the Python dictionary.

 1async def handle_set_command(session: Session, args: Any) -> Any:
 2    global databases
 3    global locks
 4
 5    if not isinstance(args, list) or len(args) < 2:
 6        return Error(code="ERR", message="wrong number of arguments for 'set' command")
 7
 8    key, value = args[0], args[1]
 9    if not isinstance(key, bytes):
10        return Error(code="ERR", message="invalid key")
11    if not isinstance(value, bytes):
12        return Error(code="ERR", message="invalid value")
13
14    async with locks[session.selected_database]:
15        database = databases[session.selected_database]
16        database[key] = value
17
18    return b'OK'
19
20async def handle_client_request(...):
21    ...
22    if command == b'COMMAND':
23        ...
24    elif command == b'SET':
25        result = await handle_set_command(session, args)
26    ...

Now, if you run the redis-cli again, you can use SET command to put a key-value pair to the database:

 1$ redis-cli -h 127.0.0.1 -p 5000
 2127.0.0.1:5001> SET a b
 3"OK"
 4127.0.0.1:5001> SET a c
 5"OK"
 6127.0.0.1:5001> SET a
 7(error) ERR wrong number of arguments for 'set' command
 8127.0.0.1:5001> SET c
 9(error) ERR wrong number of arguments for 'set' command
10127.0.0.1:5001> SET
11(error) ERR wrong number of arguments for 'set' command

GET Command

Finally, let’s implement GET command. GET is quite simple, it accepts one argument which is the key to be fetched, and returns the value of the key.

 1async def handle_get_command(session: Session, args: Any) -> Any:
 2    global databases
 3    global locks
 4
 5    if not isinstance(args, list) or len(args) != 1:
 6        return Error(code="ERR", message="wrong number of arguments for 'get' command")
 7
 8    key = args[0]
 9    if not isinstance(key, bytes):
10        return Error(code="ERR", message="invalid key")
11
12    async with locks[session.selected_database]:
13        database = databases[session.selected_database]
14        return database.get(key)
15
16
17async def handle_client_request(...):
18    ...
19    if command == b'COMMAND':
20        ...
21    elif command == b'GET':
22        result = await handle_get_command(session, args)
23    ...

Now, you have a full implementation of Redis server that supports SET, GET and SELECT commands. You can test your server by using redis-cli:

 1$ redis-cli -h 127.0.0.1 -p 5000
 2127.0.0.1:5002> GET key1
 3(nil)
 4127.0.0.1:5002> SET key1 value1
 5"OK"
 6127.0.0.1:5002> GET key1
 7"value1"
 8127.0.0.1:5002> SELECT 1
 9"OK"
10127.0.0.1:5002[1]> GET key1
11(nil)
12127.0.0.1:5002[1]> SET key1 "value 1 from db 1"
13"OK"
14127.0.0.1:5002[1]> GET key1
15"value 1 from db 1"
16127.0.0.1:5002[1]> SELECT 0
17"OK"
18127.0.0.1:5002> GET key1
19"value1"
20127.0.0.1:5002> SELECT 1
21"OK"
22127.0.0.1:5002[1]> GET key1
23"value 1 from db 1"
24127.0.0.1:5002[1]>

You can find the full code in this gist. This implementation is no match with the actual Redis server implementation. The Redis server is written in C and very optimized for key-value store.