Skip to content

Recipes

Below are some useful patterns and starter scripts to get you familiar with aioredis’s interface.

High-level Client (Commands)

import asyncio

import aioredis


async def main():
    # Redis client bound to single connection (no auto reconnection).
    redis = aioredis.from_url(
        "redis://localhost", encoding="utf-8", decode_responses=True
    )
    async with redis.client() as conn:
        await conn.set("my-key", "value")
        val = await conn.get("my-key")
    print(val)


async def redis_pool():
    # Redis client bound to pool of connections (auto-reconnecting).
    redis = aioredis.from_url(
        "redis://localhost", encoding="utf-8", decode_responses=True
    )
    await redis.set("my-key", "value")
    val = await redis.get("my-key")
    print(val)


if __name__ == "__main__":
    asyncio.run(main())
    asyncio.run(redis_pool())

Transactions (Pipeline + Multi/Exec)

import asyncio

import aioredis


async def main():
    redis = aioredis.from_url("redis://localhost")
    await redis.delete("foo", "bar")
    async with redis.pipeline(transaction=True) as pipe:
        res = await pipe.incr("foo").incr("bar").execute()
    print(res)


if __name__ == "__main__":
    asyncio.run(main())

Pub/Sub

import asyncio

import async_timeout

import aioredis

STOPWORD = "STOP"


async def pubsub():
    redis = aioredis.Redis.from_url(
        "redis://localhost", max_connections=10, decode_responses=True
    )
    psub = redis.pubsub()

    async def reader(channel: aioredis.client.PubSub):
        while True:
            try:
                async with async_timeout.timeout(1):
                    message = await channel.get_message(ignore_subscribe_messages=True)
                    if message is not None:
                        print(f"(Reader) Message Received: {message}")
                        if message["data"] == STOPWORD:
                            print("(Reader) STOP")
                            break
                    await asyncio.sleep(0.01)
            except asyncio.TimeoutError:
                pass

    async with psub as p:
        await p.subscribe("channel:1")
        await reader(p)  # wait for reader to complete
        await p.unsubscribe("channel:1")

    # closing all open connections
    await psub.close()


async def main():
    tsk = asyncio.create_task(pubsub())

    async def publish():
        pub = aioredis.Redis.from_url("redis://localhost", decode_responses=True)
        while not tsk.done():
            # wait for clients to subscribe
            while True:
                subs = dict(await pub.pubsub_numsub("channel:1"))
                if subs["channel:1"] == 1:
                    break
                await asyncio.sleep(0)
            # publish some messages
            for msg in ["one", "two", "three"]:
                print(f"(Publisher) Publishing Message: {msg}")
                await pub.publish("channel:1", msg)
            # send stop word
            await pub.publish("channel:1", STOPWORD)
        await pub.close()

    await publish()


if __name__ == "__main__":
    import os

    if "redis_version:2.6" not in os.environ.get("REDIS_VERSION", ""):
        asyncio.run(main())

SCAN

import asyncio

import aioredis


async def main():
    """Scan command example."""
    redis = aioredis.from_url("redis://localhost")

    await redis.mset({"key:1": "value1", "key:2": "value2"})
    async with redis.client() as conn:
        cur = b"0"  # set initial cursor to 0
        while cur:
            cur, keys = await conn.scan(cur, match="key:*")
            print("Iteration results:", keys)


if __name__ == "__main__":
    import os

    if "redis_version:2.6" not in os.environ.get("REDIS_VERSION", ""):
        asyncio.run(main())

Redis Sentinel Client

import asyncio

import aioredis.sentinel


async def main():
    sentinel_client = aioredis.sentinel.Sentinel([("localhost", 26379)])

    master_redis: aioredis.Redis = sentinel_client.master_for("mymaster")
    info = await master_redis.sentinel_master("mymaster")
    print("Master role:", info)


if __name__ == "__main__":
    asyncio.run(main())

Low-level Connection Usage

import asyncio

import aioredis


async def main():
    # Create a redis client bound to a connection pool.
    redis = aioredis.from_url(
        "redis://localhost", encoding="utf-8", decode_responses=True
    )
    # get a redis client bound to a single connection.
    async with redis.client() as conn:
        ok = await conn.execute_command("set", "my-key", "some value")
        assert ok is True

        str_value = await conn.execute_command("get", "my-key")
        assert str_value == "some value"

        print("str value:", str_value)
    # The connection is automatically release to the pool


async def main_single():
    # Create a redis client with only a single connection.
    redis = aioredis.Redis(
        host="localhost",
        encoding="utf-8",
        decode_responses=True,
        single_connection_client=True,
    )
    ok = await redis.execute_command("set", "my-key", "some value")
    assert ok is True

    str_value = await redis.execute_command("get", "my-key")
    assert str_value == "some value"

    print("str value:", str_value)
    # the connection is automatically closed by GC.


if __name__ == "__main__":
    asyncio.run(main())
    asyncio.run(main_single())

Connection Pooling

import asyncio

import aioredis


async def main():
    redis = aioredis.from_url("redis://localhost", max_connections=10)
    await redis.execute_command("set", "my-key", "value")
    val = await redis.execute_command("get", "my-key")
    print("raw value:", val)


async def main_pool():
    pool = aioredis.ConnectionPool.from_url("redis://localhost", max_connections=10)
    redis = aioredis.Redis(connection_pool=pool)
    await redis.execute_command("set", "my-key", "value")
    val = await redis.execute_command("get", "my-key")
    print("raw value:", val)


if __name__ == "__main__":
    asyncio.run(main())
    asyncio.run(main_pool())

Blocking Commands

import asyncio

import aioredis


async def blocking_commands():
    # Redis client bound to pool of connections (auto-reconnecting).
    redis = aioredis.Redis.from_url("redis://localhost")

    async def get_message():
        # Redis blocking commands block the connection they are on
        # until they complete. For this reason, the connection must
        # not be returned to the connection pool until we've
        # finished waiting on future created by brpop(). To achieve
        # this, 'await redis' acquires a dedicated connection from
        # the connection pool and creates a new Redis command object
        # using it. This object is a context manager and the
        # connection will be released back to the pool at the end of
        # the with block."
        async with redis as r:
            return await r.brpop("my-key")

    future = asyncio.create_task(get_message())
    await redis.lpush("my-key", "value")
    await future
    print(future.result())

    # gracefully closing underlying connection
    await redis.close()


if __name__ == "__main__":
    asyncio.run(blocking_commands())