Getting started

Commands Pipelining

Command pipelining is built in.

Every command is sent to the transport immediately (provided, of course, that no TypeError or ValueError was raised).

When you call a command with await / yield from, you wait for its result before continuing. To pipeline, issue several commands first and then gather their results.

A simple example shows both cases:

# No pipelining;
async def wait_each_command():
    val = await redis.get('foo')    # wait until `val` is available
    cnt = await redis.incr('bar')   # wait until `cnt` is available
    return val, cnt

# Sending multiple commands and then gathering results
async def pipelined():
    fut1 = redis.get('foo')      # issue command and return future
    fut2 = redis.incr('bar')     # issue command and return future
    # block until results are available
    val, cnt = await asyncio.gather(fut1, fut2)
    return val, cnt
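To make the timing visible without a running server, here is a minimal sketch built on a stand-in client. FakeRedis and its methods are hypothetical and exist only for this illustration; they model the behaviour described above (a command is issued when the method is called, and its future resolves later) and are not aioredis code.

```python
import asyncio


class FakeRedis:
    """Hypothetical stand-in for a client; it is not part of aioredis."""

    def __init__(self):
        self.sent = []                          # commands in the order they were issued
        self._data = {'foo': 'value', 'bar': 0}

    def _issue(self, name, key, compute):
        loop = asyncio.get_running_loop()
        # The command is "written to the transport" immediately...
        self.sent.append((name, key))
        fut = loop.create_future()
        # ...and the reply arrives later, on a subsequent loop iteration.
        loop.call_soon(lambda: fut.set_result(compute()))
        return fut

    def get(self, key):
        return self._issue('GET', key, lambda: self._data[key])

    def incr(self, key):
        def compute():
            self._data[key] += 1
            return self._data[key]
        return self._issue('INCR', key, compute)


async def pipelined_demo():
    redis = FakeRedis()
    fut1 = redis.get('foo')
    fut2 = redis.incr('bar')
    # Both commands are already issued although nothing has been awaited yet.
    issued_before_await = list(redis.sent)
    val, cnt = await asyncio.gather(fut1, fut2)
    return issued_before_await, val, cnt


if __name__ == '__main__':
    print(asyncio.run(pipelined_demo()))
```

The list captured before the await already contains both commands, which is the point of pipelining: issuing and waiting are separate steps.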

Note

For convenience, aioredis provides a pipeline() method that executes a batch of commands as one:

# Explicit pipeline
async def explicit_pipeline():
    pipe = redis.pipeline()
    fut1 = pipe.get('foo')
    fut2 = pipe.incr('bar')
    result = await pipe.execute()
    val, cnt = await asyncio.gather(fut1, fut2)
    assert result == [val, cnt]
    return val, cnt

Multi/Exec transactions

aioredis provides several ways to execute transactions:

  • when using a raw connection, you can issue MULTI/EXEC commands manually;
  • when using an aioredis.Redis instance, you can use the multi_exec() transaction pipeline.

The multi_exec() method creates and returns a new MultiExec object, which buffers commands and then executes them inside a MULTI/EXEC block.

Here is a simple example:

async def transaction():
    tr = redis.multi_exec()
    future1 = tr.set('foo', '123')
    future2 = tr.set('bar', '321')
    result = await tr.execute()
    assert result == await asyncio.gather(future1, future2)
    return result

Note that await is used only on the tr.execute() line, not with the tr.set(...) calls.

Warning

It is very important not to await a buffered command (i.e. tr.set('foo', '123')), as it will block forever.

The following code will block forever:

tr = redis.multi_exec()
await tr.incr('foo')   # that's all, we're stuck!
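The reason for the hang can be sketched with a toy buffering object. ToyMultiExec is hypothetical and only models the idea that a buffered command's future stays pending until execute() runs; it is not the library's implementation. A short timeout stands in for "forever" so the example terminates.

```python
import asyncio


class ToyMultiExec:
    """Hypothetical model of a buffering transaction object (not aioredis code)."""

    def __init__(self):
        self._buffer = []   # (future, command) pairs waiting for execute()

    def set(self, key, value):
        fut = asyncio.get_running_loop().create_future()
        self._buffer.append((fut, ('SET', key, value)))
        return fut          # stays pending until execute() runs

    async def execute(self):
        results = []
        for fut, _command in self._buffer:
            fut.set_result(True)    # pretend the server answered OK
            results.append(True)
        self._buffer.clear()
        return results


async def demo():
    tr = ToyMultiExec()
    fut = tr.set('foo', '123')
    try:
        # Awaiting before execute() never completes; the timeout makes that
        # visible. shield() keeps the timeout from cancelling the future itself.
        await asyncio.wait_for(asyncio.shield(fut), timeout=0.05)
        blocked = False
    except asyncio.TimeoutError:
        blocked = True
    result = await tr.execute()
    return blocked, result, await fut


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

Once execute() has run, awaiting the same future returns at once, which is why the futures are gathered only after tr.execute().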

Pub/Sub mode

aioredis provides support for Redis Publish/Subscribe messaging.

To switch a connection to subscribe mode, you must execute the subscribe command by awaiting (or yield'ing from) subscribe(). It returns a list of Channel objects representing the subscribed channels.

As soon as the connection is switched to subscribed mode, each channel receives and stores messages (the Channel object is basically a wrapper around asyncio.Queue). To read messages from a channel, use the get() or get_json() coroutines.
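As a rough illustration of the "wrapper around asyncio.Queue" idea, the following sketch defines a hypothetical ToyChannel. Its put_nowait() method and internal layout are assumptions made for this example; only the names get() and get_json() are taken from the text above.

```python
import asyncio
import json


class ToyChannel:
    """Hypothetical queue-backed channel; a model, not aioredis.Channel itself."""

    def __init__(self, name):
        self.name = name
        self._queue = asyncio.Queue()

    def put_nowait(self, raw):
        # In a real client, the connection's reader would store messages here.
        self._queue.put_nowait(raw)

    async def get(self, encoding=None):
        raw = await self._queue.get()       # waits until a message is stored
        return raw.decode(encoding) if encoding else raw

    async def get_json(self, encoding='utf-8'):
        return json.loads(await self.get(encoding=encoding))


async def demo():
    ch = ToyChannel('channel:1')
    ch.put_nowait(b'hello')
    ch.put_nowait(b'{"n": 1}')
    text = await ch.get(encoding='utf-8')
    data = await ch.get_json()
    return text, data


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

Because messages are queued as they arrive, a reader that falls behind does not lose them; it picks them up on its next get().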

Note

In Pub/Sub mode, a Redis connection can only receive messages or issue (P)SUBSCRIBE / (P)UNSUBSCRIBE commands.

Pub/Sub example:

sub = await aioredis.create_redis(
     ('localhost', 6379))

ch1, ch2 = await sub.subscribe('channel:1', 'channel:2')
assert isinstance(ch1, aioredis.Channel)
assert isinstance(ch2, aioredis.Channel)

async def async_reader(channel):
    while await channel.wait_message():
        msg = await channel.get(encoding='utf-8')
        # ... process message ...
        print("message in {}: {}".format(channel.name, msg))

tsk1 = asyncio.ensure_future(async_reader(ch1))

# Or alternatively:

async def async_reader2(channel):
    while True:
        msg = await channel.get(encoding='utf-8')
        if msg is None:
            break
        # ... process message ...
        print("message in {}: {}".format(channel.name, msg))

tsk2 = asyncio.ensure_future(async_reader2(ch2))

Pub/Sub example:

# Sentinel message that ends the reader (value assumed for illustration)
STOPWORD = 'STOP'

async def reader(channel):
    while (await channel.wait_message()):
        msg = await channel.get(encoding='utf-8')
        # ... process message ...
        print("message in {}: {}".format(channel.name, msg))

        if msg == STOPWORD:
            return

with await pool as redis:
    channel, = await redis.subscribe('channel:1')
    await reader(channel)  # wait for reader to complete
    await redis.unsubscribe('channel:1')

# Explicit redis usage
redis = await pool.acquire()
try:
    channel, = await redis.subscribe('channel:1')
    await reader(channel)  # wait for reader to complete
    await redis.unsubscribe('channel:1')
finally:
    pool.release(redis)

Python 3.5 async with / async for support

aioredis is compatible with PEP 492.

A pool can be used with async with:

pool = await aioredis.create_pool(
    ('localhost', 6379))
async with pool.get() as conn:
    value = await conn.get('my-key')
    print('raw value:', value)

It can also be used with await:

pool = await aioredis.create_pool(
    ('localhost', 6379))
# This is exactly the same as:
#   with (yield from pool) as conn:
with await pool as conn:
    value = await conn.get('my-key')
    print('raw value:', value)
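What the context-manager form buys you can be sketched with a toy pool. ToyPool is hypothetical and hands out plain strings instead of connections; it only models the acquire/release pairing that async with guarantees and is not the library's pool.

```python
import asyncio
from contextlib import asynccontextmanager


class ToyPool:
    """Hypothetical pool of string 'connections' (not aioredis code)."""

    def __init__(self, size):
        self._free = asyncio.Queue()
        for i in range(size):
            self._free.put_nowait('conn-{}'.format(i))

    async def acquire(self):
        return await self._free.get()       # waits if every connection is busy

    def release(self, conn):
        self._free.put_nowait(conn)

    def free_count(self):
        return self._free.qsize()

    @asynccontextmanager
    async def get(self):
        conn = await self.acquire()
        try:
            yield conn
        finally:
            self.release(conn)              # runs even if the body raises


async def demo():
    pool = ToyPool(size=1)
    async with pool.get() as conn:
        free_inside = pool.free_count()     # the only connection is in use
    return conn, free_inside, pool.free_count()


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

The connection goes back to the pool when the block exits, so no explicit try/finally with release() is needed at the call site.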

New scan-family commands support async for:

redis = await aioredis.create_redis(
    ('localhost', 6379))

async for key in redis.iscan(match='something*'):
    print('Matched:', key)

async for name, val in redis.ihscan(key, match='something*'):
    print('Matched:', name, '->', val)

async for val in redis.isscan(key, match='something*'):
    print('Matched:', val)

async for val, score in redis.izscan(key, match='something*'):
    print('Matched:', val, ':', score)
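Conceptually, such an iterator hides the SCAN cursor loop from the caller. The sketch below shows the idea with an in-memory stand-in: fake_scan, KEYS, and this iscan function are hypothetical and written for illustration only (prefix matching replaces glob patterns to keep it short); they are not the library's implementation.

```python
import asyncio

# Hypothetical key space used instead of a real server.
KEYS = ['something:1', 'other:1', 'something:2', 'something:3', 'other:2']


async def fake_scan(cursor, match_prefix, count=2):
    """Hypothetical SCAN stand-in: returns (next_cursor, matching keys)."""
    await asyncio.sleep(0)                      # pretend to do network I/O
    page = KEYS[cursor:cursor + count]
    next_cursor = cursor + count if cursor + count < len(KEYS) else 0
    return next_cursor, [k for k in page if k.startswith(match_prefix)]


async def iscan(match_prefix):
    """Async generator that hides the cursor loop from the caller."""
    cursor = 0
    while True:
        cursor, keys = await fake_scan(cursor, match_prefix)
        for key in keys:
            yield key
        if cursor == 0:                         # cursor 0 means the scan is complete
            break


async def demo():
    return [key async for key in iscan('something')]


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

A page may contain no matching keys at all, so the loop must continue until the cursor returns to 0 rather than stop at the first empty page.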

SSL/TLS support

Though the Redis server does not support data encryption, it is still possible to set up a Redis server behind an SSL proxy. For such cases, the aioredis library supports secure connections through asyncio's SSL support. See BaseEventLoop.create_connection for details.
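A minimal sketch of preparing the SSL side with the standard library follows. The host name, port, and CA path in the comments are hypothetical, and the sketch stops at building the context, since how it is passed on depends on the connection call you use.

```python
import ssl

# Default client-side context: verifies the server certificate and host name.
ssl_context = ssl.create_default_context()

# Assumption: the proxy in front of Redis uses a certificate signed by a
# private CA. In that case, load the CA bundle (hypothetical path):
# ssl_context.load_verify_locations('/path/to/ca.pem')

# The context is then handed to the event loop, for example:
#   await loop.create_connection(protocol_factory, 'redis.example.com', 6380,
#                                ssl=ssl_context)
```

Keeping certificate and host name verification enabled is the safer default; disable it only for local testing against a self-signed proxy.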