Getting started¶
Commands Pipelining¶
Commands pipelining is built-in.
Every command is sent to transport at-once
(ofcourse if no TypeError
/ValueError
was raised)
When you making a call with await
/ yield from
you will be waiting result,
and then gather results.
Simple example show both cases (get source code
):
# No pipelining;
async def wait_each_command():
val = await redis.get('foo') # wait until `val` is available
cnt = await redis.incr('bar') # wait until `cnt` is available
return val, cnt
# Sending multiple commands and then gathering results
async def pipelined():
fut1 = redis.get('foo') # issue command and return future
fut2 = redis.incr('bar') # issue command and return future
# block until results are available
val, cnt = await asyncio.gather(fut1, fut2)
return val, cnt
Note
For convenience aioredis
provides
pipeline()
method allowing to execute bulk of commands as one
(get source code
):
# Explicit pipeline async def explicit_pipeline(): pipe = redis.pipeline() fut1 = pipe.get('foo') fut2 = pipe.incr('bar') result = await pipe.execute() val, cnt = await asyncio.gather(fut1, fut2) assert result == [val, cnt] return val, cnt
Multi/Exec transactions¶
aioredis
provides several ways for executing transactions:
- when using raw connection you can issue
Multi
/Exec
commands manually; - when using
aioredis.Redis
instance you can usemulti_exec()
transaction pipeline.
multi_exec()
method creates and returns new
MultiExec
object which is used for buffering commands and
then executing them inside MULTI/EXEC block.
Here is a simple example
(get source code
):
1 2 3 4 5 6 7 | async def transaction():
tr = redis.multi_exec()
future1 = tr.set('foo', '123')
future2 = tr.set('bar', '321')
result = await tr.execute()
assert result == await asyncio.gather(future1, future2)
return result
|
As you can notice await
is only used at line 5 with tr.execute
and not with tr.set(...)
calls.
Warning
It is very important not to await
buffered command
(ie tr.set('foo', '123')
) as it will block forever.
The following code will block forever:
tr = redis.multi_exec()
await tr.incr('foo') # that's all. we've stuck!
Pub/Sub mode¶
aioredis
provides support for Redis Publish/Subscribe messaging.
To switch connection to subscribe mode you must execute subscribe
command
by yield’ing from subscribe()
it returns a list of
Channel
objects representing subscribed channels.
As soon as connection is switched to subscribed mode the channel will receive
and store messages
(the Channel
object is basically a wrapper around asyncio.Queue
).
To read messages from channel you need to use get()
or get_json()
coroutines.
Note
In Pub/Sub mode redis connection can only receive messages or issue (P)SUBSCRIBE / (P)UNSUBSCRIBE commands.
Pub/Sub example (get source code
):
sub = await aioredis.create_redis(
('localhost', 6379))
ch1, ch2 = await sub.subscribe('channel:1', 'channel:2')
assert isinstance(ch1, aioredis.Channel)
assert isinstance(ch2, aioredis.Channel)
async def async_reader(channel):
while await channel.wait_message():
msg = await channel.get(encoding='utf-8')
# ... process message ...
print("message in {}: {}".format(channel.name, msg))
tsk1 = asyncio.ensure_future(async_reader(ch1))
# Or alternatively:
async def async_reader2(channel):
while True:
msg = await channel.get(encoding='utf-8')
if msg is None:
break
# ... process message ...
print("message in {}: {}".format(channel.name, msg))
tsk2 = asyncio.ensure_future(async_reader2(ch2))
Pub/Sub example (get source code
):
async def reader(channel):
while (await channel.wait_message()):
msg = await channel.get(encoding='utf-8')
# ... process message ...
print("message in {}: {}".format(channel.name, msg))
if msg == STOPWORD:
return
with await pool as redis:
channel, = await redis.subscribe('channel:1')
await reader(channel) # wait for reader to complete
await redis.unsubscribe('channel:1')
# Explicit redis usage
redis = await pool.acquire()
try:
channel, = await redis.subscribe('channel:1')
await reader(channel) # wait for reader to complete
await redis.unsubscribe('channel:1')
finally:
pool.release(redis)
Python 3.5 async with
/ async for
support¶
aioredis
is compatible with PEP 492.
Pool
can be used with async with
(get source code
):
pool = await aioredis.create_pool(
('localhost', 6379))
async with pool.get() as conn:
value = await conn.get('my-key')
print('raw value:', value)
It also can be used with await
:
pool = await aioredis.create_pool(
('localhost', 6379))
# This is exactly the same as:
# with (yield from pool) as conn:
with await pool as conn:
value = await conn.get('my-key')
print('raw value:', value)
New scan
-family commands added with support of async for
(get source code
):
redis = await aioredis.create_redis(
('localhost', 6379))
async for key in redis.iscan(match='something*'):
print('Matched:', key)
async for name, val in redis.ihscan(key, match='something*'):
print('Matched:', name, '->', val)
async for val in redis.isscan(key, match='something*'):
print('Matched:', val)
async for val, score in redis.izscan(key, match='something*'):
print('Matched:', val, ':', score)
SSL/TLS support¶
Though Redis server does not support data encryption
it is still possible to setup Redis server behind SSL proxy. For such cases
aioredis
library support secure connections through asyncio
SSL support. See BaseEventLoop.create_connection for details.