Getting started

Installation

$ pip install aioredis

This will install aioredis along with its dependencies:

  • hiredis protocol parser;

  • async-timeout — used in Sentinel client.

Without dependencies

In some cases 1 you might need to install aioredis without hiredis, it is achievable with the following command:

$ pip install --no-deps aioredis async-timeout

Installing latest version from Git

$ pip install git+https://github.com/aio-libs/aioredis@master#egg=aioredis

Connecting

get source code

import asyncio
import aioredis


async def main():
    redis = await aioredis.create_redis_pool('redis://localhost')
    await redis.set('my-key', 'value')
    value = await redis.get('my-key', encoding='utf-8')
    print(value)

    redis.close()
    await redis.wait_closed()

asyncio.run(main())

aioredis.create_redis_pool() creates a Redis client backed by a pool of connections. The only required argument is the address of Redis server. Redis server address can be either host and port tuple (ex: ('localhost', 6379)), or a string which will be parsed into TCP or UNIX socket address (ex: 'unix://var/run/redis.sock', '//var/run/redis.sock', redis://redis-host-or-ip:6379/1).

Closing the client. Calling redis.close() and then redis.wait_closed() is strongly encouraged as this will methods will shutdown all open connections and cleanup resources.

See the commands reference for the full list of supported commands.

Connecting to specific DB

There are several ways you can specify database index to select on connection:

  1. explicitly pass db index as db argument:

    redis = await aioredis.create_redis_pool(
     'redis://localhost', db=1)
    
  2. pass db index in URI as path component:

    redis = await aioredis.create_redis_pool(
        'redis://localhost/2')
    

    Note

    DB index specified in URI will take precedence over db keyword argument.

  3. call select() method:

    redis = await aioredis.create_redis_pool(
        'redis://localhost/')
    await redis.select(3)
    

Connecting to password-protected Redis instance

The password can be specified either in keyword argument or in address URI:

redis = await aioredis.create_redis_pool(
    'redis://localhost', password='sEcRet')

redis = await aioredis.create_redis_pool(
    'redis://:sEcRet@localhost/')

redis = await aioredis.create_redis_pool(
    'redis://localhost/?password=sEcRet')

Note

Password specified in URI will take precedence over password keyword.

Also specifying both password as authentication component and query parameter in URI is forbidden.

# This will cause assertion error
await aioredis.create_redis_pool(
    'redis://:sEcRet@localhost/?password=SeCreT')

Result messages decoding

By default aioredis will return bytes for most Redis commands that return string replies. Redis error replies are known to be valid UTF-8 strings so error messages are decoded automatically.

If you know that data in Redis is valid string you can tell aioredis to decode result by passing keyword-only argument encoding in a command call:

get source code

import asyncio
import aioredis


async def main():
    redis = await aioredis.create_redis_pool('redis://localhost')
    await redis.set('key', 'string-value')
    bin_value = await redis.get('key')
    assert bin_value == b'string-value'

    str_value = await redis.get('key', encoding='utf-8')
    assert str_value == 'string-value'

    redis.close()
    await redis.wait_closed()

asyncio.run(main())

aioredis can decode messages for all Redis data types like lists, hashes, sorted sets, etc:

get source code

import asyncio
import aioredis


async def main():
    redis = await aioredis.create_redis_pool('redis://localhost')

    await redis.hmset_dict('hash',
                           key1='value1',
                           key2='value2',
                           key3=123)

    result = await redis.hgetall('hash', encoding='utf-8')
    assert result == {
        'key1': 'value1',
        'key2': 'value2',
        'key3': '123',  # note that Redis returns int as string
    }

    redis.close()
    await redis.wait_closed()

asyncio.run(main())

Multi/Exec transactions

get source code

import asyncio
import aioredis


async def main():
    redis = await aioredis.create_redis_pool('redis://localhost')

    tr = redis.multi_exec()
    tr.set('key1', 'value1')
    tr.set('key2', 'value2')
    ok1, ok2 = await tr.execute()
    assert ok1
    assert ok2

asyncio.run(main())

multi_exec() method creates and returns new MultiExec object which is used for buffering commands and then executing them inside MULTI/EXEC block.

Warning

It is very important not to await buffered command (ie tr.set('foo', '123')) as it will block forever.

The following code will block forever:

tr = redis.multi_exec()
await tr.incr('foo')   # that's all. we've stuck!

Pub/Sub mode

aioredis provides support for Redis Publish/Subscribe messaging.

To start listening for messages you must call either subscribe() or psubscribe() method. Both methods return list of Channel objects representing subscribed channels.

Right after that the channel will receive and store messages (the Channel object is basically a wrapper around asyncio.Queue). To read messages from channel you need to use get() or get_json() coroutines.

Example subscribing and reading channels:

get source code

import asyncio
import aioredis


async def main():
    redis = await aioredis.create_redis_pool('redis://localhost')

    ch1, ch2 = await redis.subscribe('channel:1', 'channel:2')
    assert isinstance(ch1, aioredis.Channel)
    assert isinstance(ch2, aioredis.Channel)

    async def reader(channel):
        async for message in channel.iter():
            print("Got message:", message)
    asyncio.get_running_loop().create_task(reader(ch1))
    asyncio.get_running_loop().create_task(reader(ch2))

    await redis.publish('channel:1', 'Hello')
    await redis.publish('channel:2', 'World')

    redis.close()
    await redis.wait_closed()

asyncio.run(main())

Subscribing and reading patterns:

get source code

import asyncio
import aioredis


async def main():
    redis = await aioredis.create_redis_pool('redis://localhost')

    ch, = await redis.psubscribe('channel:*')
    assert isinstance(ch, aioredis.Channel)

    async def reader(channel):
        async for ch, message in channel.iter():
            print("Got message in channel:", ch, ":", message)
    asyncio.get_running_loop().create_task(reader(ch))

    await redis.publish('channel:1', 'Hello')
    await redis.publish('channel:2', 'World')

    redis.close()
    await redis.wait_closed()

asyncio.run(main())

Sentinel client

get source code

import asyncio
import aioredis


async def main():
    sentinel = await aioredis.create_sentinel(
        ['redis://localhost:26379', 'redis://sentinel2:26379'])
    redis = sentinel.master_for('mymaster')

    ok = await redis.set('key', 'value')
    assert ok
    val = await redis.get('key', encoding='utf-8')
    assert val == 'value'

asyncio.run(main())

Sentinel client requires a list of Redis Sentinel addresses to connect to and start discovering services.

Calling master_for() or slave_for() methods will return Redis clients connected to specified services monitored by Sentinel.

Sentinel client will detect failover and reconnect Redis clients automatically.

See detailed reference here


1

Celery hiredis issues (#197, #317)