Getting Started

Installation

pip install aioredis

This will install aioredis and async-timeout.

With hiredis

pip install aioredis[hiredis]

Installing from Git

pip install git+https://github.com/aio-libs/aioredis@master#egg=aioredis

Connecting

import asyncio

import aioredis


async def main():
    redis = aioredis.from_url("redis://localhost")
    await redis.set("my-key", "value")
    value = await redis.get("my-key")
    print(value)


if __name__ == "__main__":
    asyncio.run(main())

aioredis.from_url creates a Redis client backed by a pool of connections. The only required argument is the URL, which should be a string representing a TCP or UNIX socket address.
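
To make the two address forms concrete, here is a minimal sketch that splits a URL with the standard library. It is an illustration only, not aioredis's own parser; the helper name describe_redis_url is hypothetical, and the redis://, rediss:// and unix:// schemes and the default port 6379 are assumed from common Redis client conventions.

```python
from urllib.parse import urlparse


def describe_redis_url(url: str) -> dict:
    """Split a Redis URL into the parts a client would need (illustrative only)."""
    parsed = urlparse(url)
    if parsed.scheme in ("redis", "rediss"):
        # TCP address; 6379 is the conventional Redis port.
        return {
            "transport": "tcp",
            "tls": parsed.scheme == "rediss",
            "host": parsed.hostname or "localhost",
            "port": parsed.port or 6379,
        }
    if parsed.scheme == "unix":
        # UNIX socket address; the path names the socket file.
        return {"transport": "unix", "path": parsed.path}
    raise ValueError(f"unsupported scheme: {parsed.scheme!r}")
```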

See the high-level API reference for a full list of supported commands.

Connecting to a Specific Database

There are two ways to select the database index that the connection pool uses:

  1. Pass the index in as a keyword argument when initializing the client

    import aioredis
    
    redis = aioredis.from_url("redis://localhost", db=1)
    

  2. Pass the index as a path component in the URI

    import aioredis
    
    redis = aioredis.from_url("redis://localhost/1")
    

Note

A DB index specified in the URI takes precedence over the db keyword argument.
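
The precedence rule can be sketched with the standard library. This is a simplified model, not the library's implementation, and resolve_db is a hypothetical helper: the path component of the URI wins when present, otherwise the keyword value is used.

```python
from urllib.parse import urlparse


def resolve_db(url: str, db: int = 0) -> int:
    """Return the database index, letting a URI path component override `db`."""
    path = urlparse(url).path.lstrip("/")
    if path:
        # e.g. "redis://localhost/1" has the path "/1"
        return int(path)
    # No index in the URI: fall back to the keyword argument.
    return db
```

Under this model, resolve_db("redis://localhost/2", db=1) selects database 2, while resolve_db("redis://localhost", db=1) selects database 1.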

Connecting to an ACL-Protected Redis Instance

Similarly, the username/password can be specified via a keyword argument or via the URI. The values in the URI will always take precedence.

  1. Via keyword arguments:

    import aioredis
    
    redis = aioredis.from_url(
        "redis://localhost", username="user", password="sEcRet"
    )
    

  2. Via the AUTH section of the URI:

    import aioredis
    
    redis = aioredis.from_url("redis://user:sEcRet@localhost/")
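
Credentials embedded in a URI generally need percent-encoding when they contain characters such as @, : or /. The sketch below uses only the standard library; build_redis_url and read_credentials are hypothetical helpers, and it is an assumption (not confirmed by this page) that the client decodes percent-escapes the same way. When in doubt, passing username and password as keyword arguments avoids the question.

```python
from urllib.parse import quote, unquote, urlparse


def build_redis_url(host: str, username: str, password: str) -> str:
    """Build a URL whose user-info part is percent-encoded."""
    user = quote(username, safe="")
    secret = quote(password, safe="")
    return f"redis://{user}:{secret}@{host}/"


def read_credentials(url: str) -> tuple:
    """Recover the original username and password from such a URL."""
    parsed = urlparse(url)
    return unquote(parsed.username or ""), unquote(parsed.password or "")
```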
    

Response Decoding

By default aioredis will return bytes for most Redis commands that return string replies. Redis error replies are known to be valid UTF-8 strings so error messages are decoded automatically.

If you know that the data in Redis is valid text, you can tell aioredis to decode replies by passing decode_responses=True when creating the client:

import asyncio

import aioredis


async def main():
    redis = aioredis.from_url("redis://localhost")
    await redis.set("key", "string-value")
    bin_value = await redis.get("key")
    assert bin_value == b"string-value"
    redis = aioredis.from_url("redis://localhost", decode_responses=True)
    str_value = await redis.get("key")
    assert str_value == "string-value"

    await redis.close()


if __name__ == "__main__":
    asyncio.run(main())
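
When only some keys hold text, one option is to keep the default bytes replies and decode selectively. A minimal sketch, assuming UTF-8 text; decode_reply is a hypothetical helper and not part of aioredis:

```python
def decode_reply(raw: bytes, encoding: str = "utf-8"):
    """Decode a reply when possible; return the raw bytes otherwise."""
    try:
        return raw.decode(encoding)
    except UnicodeDecodeError:
        # Binary payloads (images, pickles, ...) are not valid text.
        return raw
```

This also shows why decoding is opt-in: arbitrary binary values cannot be decoded as text.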

When decode_responses=True is set, aioredis also decodes the elements of lists, hashes, sets, etc.:

import asyncio

import aioredis


async def main():
    redis = aioredis.from_url("redis://localhost", decode_responses=True)

    await redis.hset("hash", mapping={"key1": "value1", "key2": "value2", "key3": 123})

    result = await redis.hgetall("hash")
    assert result == {
        "key1": "value1",
        "key2": "value2",
        "key3": "123",  # note that Redis returns int as string
    }

    await redis.close()


if __name__ == "__main__":
    asyncio.run(main())
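
Because every hash value comes back as a string, numeric fields have to be converted by the caller. One possible approach, sketched with a hypothetical cast_fields helper and a per-field schema (both are assumptions for illustration):

```python
def cast_fields(raw: dict, schema: dict) -> dict:
    """Apply a per-field converter to string values read from a hash.

    Fields missing from `schema` are left as strings.
    """
    return {field: schema.get(field, str)(value) for field, value in raw.items()}
```

For the hash above, cast_fields(result, {"key3": int}) turns "123" back into the integer 123 and leaves the other fields untouched.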

Transactions (Multi/Exec)

import asyncio

import aioredis


async def main():
    redis = await aioredis.from_url("redis://localhost")
    async with redis.pipeline(transaction=True) as pipe:
        ok1, ok2 = await (pipe.set("key1", "value1").set("key2", "value2").execute())
    assert ok1
    assert ok2


if __name__ == "__main__":
    asyncio.run(main())

aioredis.Redis.pipeline returns an aioredis.Pipeline object, which buffers all commands in memory and compiles them into batches using the Redis Bulk String protocol. Each command also returns the Pipeline instance, so calls can be chained, e.g., p.set('foo', 1).set('bar', 2).mget('foo', 'bar').

The commands are not reflected in Redis until execute() is called and awaited.

For bulk operations, a transaction (i.e., Multi/Exec) is usually preferable, since it also makes the batch atomic.
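
The buffering and chaining described above can be modelled in a few lines. The ToyPipeline class below is a hypothetical stand-in, not the aioredis implementation: it never talks to a server and only shows how chained calls accumulate and how the batch could be serialized as arrays of bulk strings, optionally wrapped in MULTI/EXEC.

```python
def encode_command(*args) -> bytes:
    """Encode one command as a RESP array of bulk strings."""
    parts = [b"*%d\r\n" % len(args)]
    for arg in args:
        data = arg if isinstance(arg, bytes) else str(arg).encode()
        parts.append(b"$%d\r\n%s\r\n" % (len(data), data))
    return b"".join(parts)


class ToyPipeline:
    """Hypothetical pipeline model: buffers commands, sends nothing."""

    def __init__(self, transaction: bool = True):
        self.transaction = transaction
        self.buffer = []

    def set(self, key, value):
        self.buffer.append(("SET", key, value))
        return self  # returning self is what makes chaining possible

    def mget(self, *keys):
        self.buffer.append(("MGET", *keys))
        return self

    def payload(self) -> bytes:
        """Build the single byte string that would be written on execute()."""
        commands = list(self.buffer)
        if self.transaction:
            # A transaction wraps the batch so the server runs it atomically.
            commands = [("MULTI",), *commands, ("EXEC",)]
        return b"".join(encode_command(*command) for command in commands)
```

In this model, ToyPipeline().set("foo", 1).set("bar", 2).mget("foo", "bar") holds three buffered commands, and nothing would reach the server until the payload is written.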

Pub/Sub Mode

aioredis provides support for Redis Publish/Subscribe messaging.

Subscribing to specific channels:

import asyncio

import async_timeout

import aioredis

STOPWORD = "STOP"


async def reader(channel: aioredis.client.PubSub):
    while True:
        try:
            async with async_timeout.timeout(1):
                message = await channel.get_message(ignore_subscribe_messages=True)
                if message is not None:
                    print(f"(Reader) Message Received: {message}")
                    if message["data"].decode() == STOPWORD:
                        print("(Reader) STOP")
                        break
                await asyncio.sleep(0.01)
        except asyncio.TimeoutError:
            pass


async def main():
    redis = aioredis.from_url("redis://localhost")
    pubsub = redis.pubsub()
    await pubsub.subscribe("channel:1", "channel:2")

    future = asyncio.create_task(reader(pubsub))

    await redis.publish("channel:1", "Hello")
    await redis.publish("channel:2", "World")
    await redis.publish("channel:1", STOPWORD)

    await future


if __name__ == "__main__":
    asyncio.run(main())
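
Each message returned by get_message is a dictionary. The sketch below processes a list of such dictionaries offline, assuming the redis-py style layout with the keys type, pattern, channel and data, and bytes payloads (the default without decode_responses). The drain helper is hypothetical.

```python
def drain(messages, stopword: str = "STOP") -> list:
    """Collect (channel, text) pairs until the stop word arrives."""
    received = []
    for message in messages:
        if message["type"] not in ("message", "pmessage"):
            # Subscribe/unsubscribe confirmations carry a counter, not a payload.
            continue
        text = message["data"].decode()
        if text == stopword:
            break
        received.append((message["channel"].decode(), text))
    return received
```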

Subscribing to channels matching a glob-style pattern:

import asyncio

import async_timeout

import aioredis

STOPWORD = "STOP"


async def reader(channel: aioredis.client.PubSub):
    while True:
        try:
            async with async_timeout.timeout(1):
                message = await channel.get_message(ignore_subscribe_messages=True)
                if message is not None:
                    print(f"(Reader) Message Received: {message}")
                    if message["data"].decode() == STOPWORD:
                        print("(Reader) STOP")
                        break
                await asyncio.sleep(0.01)
        except asyncio.TimeoutError:
            pass


async def main():
    redis = await aioredis.from_url("redis://localhost")
    pubsub = redis.pubsub()
    await pubsub.psubscribe("channel:*")

    future = asyncio.create_task(reader(pubsub))

    await redis.publish("channel:1", "Hello")
    await redis.publish("channel:2", "World")
    await redis.publish("channel:1", STOPWORD)

    await future


if __name__ == "__main__":
    asyncio.run(main())
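
To preview which channel names a simple pattern such as channel:* would cover, the standard library's fnmatch module gives a rough approximation. This is an assumption-laden sketch: Redis implements its own glob matching, which differs from fnmatch in some details (for example the syntax for negated character classes and for escaping), so use it only for simple * and ? patterns. matching_channels is a hypothetical helper.

```python
from fnmatch import fnmatchcase


def matching_channels(pattern: str, channels: list) -> list:
    """Return the channel names that a simple glob pattern would match."""
    # fnmatchcase is case-sensitive on every platform, like Redis channel names.
    return [name for name in channels if fnmatchcase(name, pattern)]
```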

Redis Sentinel Client

import asyncio

import aioredis.sentinel


async def main():
    sentinel = aioredis.sentinel.Sentinel([("localhost", 26379), ("sentinel2", 26379)])
    redis = sentinel.master_for("mymaster")

    ok = await redis.set("key", "value")
    assert ok
    val = await redis.get("key")
    assert val == b"value"


if __name__ == "__main__":
    asyncio.run(main())

The Sentinel client requires a list of Redis Sentinel addresses to connect to and start discovering services.

Calling the aioredis.sentinel.Sentinel.master_for or aioredis.sentinel.Sentinel.slave_for methods returns Redis clients connected to the specified services monitored by Sentinel.

The Sentinel client detects failover and reconnects Redis clients automatically.
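
The discovery step can be pictured with a toy model. Everything below is hypothetical and stands in for network calls: each Sentinel is represented by a plain function that either raises ConnectionError or reports the master address it knows for a service. The real client does considerably more (state checks, replica discovery, reconnection).

```python
def discover_master(sentinels, service_name: str):
    """Ask each Sentinel in turn; return the first master address reported."""
    for ask in sentinels:
        try:
            address = ask(service_name)
        except ConnectionError:
            continue  # this Sentinel is unreachable, try the next one
        if address is not None:
            return address
    raise LookupError(f"no master found for {service_name!r}")


def unreachable_sentinel(service_name: str):
    """Stub for a Sentinel that cannot be contacted."""
    raise ConnectionError("sentinel is down")


def healthy_sentinel(service_name: str):
    """Stub for a Sentinel that monitors only the service 'mymaster'."""
    known = {"mymaster": ("10.0.0.5", 6379)}  # made-up address for illustration
    return known.get(service_name)
```

In this model, re-running discover_master after a failover would return whatever address the Sentinels report at that moment, which is the idea behind reconnecting clients to the new master.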