Recipes
Below are some useful patterns and starter scripts to get you familiar with aioredis’s interface.
High-level Client (Commands)
import asyncio
import aioredis
async def main():
    """Store ``my-key`` and read it back through a single-connection client."""
    client = aioredis.from_url(
        "redis://localhost",
        encoding="utf-8",
        decode_responses=True,
    )
    # ``client.client()`` yields a Redis client bound to a single connection
    # (no auto reconnection); it is only used inside this block.
    async with client.client() as single:
        await single.set("my-key", "value")
        fetched = await single.get("my-key")
    print(fetched)
async def redis_pool():
    """Store ``my-key`` and read it back by calling commands on the client itself."""
    # Redis client bound to a pool of connections (auto-reconnecting).
    pooled = aioredis.from_url(
        "redis://localhost",
        encoding="utf-8",
        decode_responses=True,
    )
    await pooled.set("my-key", "value")
    print(await pooled.get("my-key"))
if __name__ == "__main__":
    # Run each demo coroutine to completion, one after the other.
    for demo in (main, redis_pool):
        asyncio.run(demo())
Transactions (Pipeline + Multi/Exec)
import asyncio
import aioredis
async def main():
    """Increment ``foo`` and ``bar`` in one transactional pipeline and print the replies."""
    client = aioredis.from_url("redis://localhost")
    # Remove both keys up front so the increments start from a clean slate.
    await client.delete("foo", "bar")
    async with client.pipeline(transaction=True) as pipe:
        # Queue both commands, then send them with a single execute().
        queued = pipe.incr("foo").incr("bar")
        replies = await queued.execute()
    print(replies)
# Script entry point: run the transaction demo when executed directly.
if __name__ == "__main__":
    asyncio.run(main())
Pub/Sub
import asyncio
import async_timeout
import aioredis
# Message payload that tells the reader to stop; the publisher sends it last.
STOPWORD = "STOP"
async def pubsub():
    """Subscribe to ``channel:1`` and print messages until ``STOPWORD`` arrives."""
    redis = aioredis.Redis.from_url(
        "redis://localhost", max_connections=10, decode_responses=True
    )
    psub = redis.pubsub()

    async def reader(channel: aioredis.client.PubSub):
        """Poll ``channel`` in a loop; return once a message's data equals ``STOPWORD``."""
        while True:
            try:
                # Each polling round is capped at one second; on timeout the
                # loop simply starts another round.
                # NOTE(review): nesting was reconstructed from a flattened
                # listing -- confirm the sleep belongs inside the timeout block.
                async with async_timeout.timeout(1):
                    message = await channel.get_message(ignore_subscribe_messages=True)
                    if message is not None:
                        print(f"(Reader) Message Received: {message}")
                        if message["data"] == STOPWORD:
                            print("(Reader) STOP")
                            break
                    await asyncio.sleep(0.01)
            except asyncio.TimeoutError:
                pass

    async with psub as p:
        await p.subscribe("channel:1")
        await reader(p)  # wait for reader to complete
        await p.unsubscribe("channel:1")

    # closing all open connections
    await psub.close()
async def main():
    """Run the subscriber task and publish messages to it until it finishes.

    The subscriber (``pubsub()``) runs as a background task. The publisher
    waits until exactly one client is subscribed to ``channel:1``, sends
    three messages followed by ``STOPWORD``, and repeats until the
    subscriber task is done.

    Raises:
        Any exception raised inside the subscriber task is re-raised here,
        because the task is awaited before returning.
    """
    tsk = asyncio.create_task(pubsub())

    async def subscriber_ready(pub):
        """Poll NUMSUB until one subscriber is present.

        Returns ``False`` instead of polling forever if the subscriber task
        finishes (or fails) before a subscription is observed.
        """
        while not tsk.done():
            subs = dict(await pub.pubsub_numsub("channel:1"))
            if subs["channel:1"] == 1:
                return True
            # Yield to the event loop so the subscriber task can make progress.
            await asyncio.sleep(0)
        return False

    async def publish():
        pub = aioredis.Redis.from_url("redis://localhost", decode_responses=True)
        try:
            while not tsk.done():
                # wait for clients to subscribe
                if not await subscriber_ready(pub):
                    break
                # publish some messages
                for msg in ["one", "two", "three"]:
                    print(f"(Publisher) Publishing Message: {msg}")
                    await pub.publish("channel:1", msg)
                # send stop word
                await pub.publish("channel:1", STOPWORD)
        finally:
            # Close the publisher even if a publish/poll call raised.
            await pub.close()

    await publish()
    # Surface any exception from the subscriber instead of dropping it.
    await tsk
if __name__ == "__main__":
    import os

    # Skip the demo when REDIS_VERSION contains "redis_version:2.6".
    reported_version = os.environ.get("REDIS_VERSION", "")
    if "redis_version:2.6" not in reported_version:
        asyncio.run(main())
SCAN
import asyncio
import aioredis
async def main():
    """Walk the keys matching ``key:*`` with SCAN, printing each batch."""
    client = aioredis.from_url("redis://localhost")
    seed = {"key:1": "value1", "key:2": "value2"}
    await client.mset(seed)
    async with client.client() as conn:
        # Start from cursor 0 and keep scanning while the returned cursor is truthy.
        cursor = b"0"
        while cursor:
            cursor, batch = await conn.scan(cursor, match="key:*")
            print("Iteration results:", batch)
if __name__ == "__main__":
    import os

    # Skip the demo when REDIS_VERSION contains "redis_version:2.6".
    reported_version = os.environ.get("REDIS_VERSION", "")
    if "redis_version:2.6" not in reported_version:
        asyncio.run(main())
Redis Sentinel Client
import asyncio
import aioredis.sentinel
async def main():
    """Obtain a client for the ``mymaster`` service via Sentinel and print its master info."""
    sentinel_nodes = [("localhost", 26379)]
    sentinel = aioredis.sentinel.Sentinel(sentinel_nodes)
    master: aioredis.Redis = sentinel.master_for("mymaster")
    # NOTE(review): the SENTINEL MASTER query is issued through the master
    # client here -- confirm this is intended rather than querying a sentinel node.
    master_info = await master.sentinel_master("mymaster")
    print("Master role:", master_info)
# Script entry point: run the sentinel demo when executed directly.
if __name__ == "__main__":
    asyncio.run(main())
Low-level Connection Usage
import asyncio
import aioredis
async def main():
    """Round-trip ``my-key`` using raw ``execute_command`` calls on a single-connection client."""
    # Redis client bound to a connection pool.
    pool_client = aioredis.from_url(
        "redis://localhost",
        encoding="utf-8",
        decode_responses=True,
    )
    # Redis client bound to a single connection, valid inside this block.
    async with pool_client.client() as single:
        set_reply = await single.execute_command("set", "my-key", "some value")
        assert set_reply is True

        fetched = await single.execute_command("get", "my-key")
        assert fetched == "some value"

        print("str value:", fetched)
    # The connection is automatically released to the pool.
async def main_single():
    """Round-trip ``my-key`` on a client created with ``single_connection_client=True``."""
    # Options for a Redis client with only a single connection.
    options = {
        "host": "localhost",
        "encoding": "utf-8",
        "decode_responses": True,
        "single_connection_client": True,
    }
    client = aioredis.Redis(**options)

    set_reply = await client.execute_command("set", "my-key", "some value")
    assert set_reply is True

    fetched = await client.execute_command("get", "my-key")
    assert fetched == "some value"

    print("str value:", fetched)
    # No explicit close here: the recipe relies on GC to close the connection.
if __name__ == "__main__":
    # Run each demo coroutine to completion, one after the other.
    for demo in (main, main_single):
        asyncio.run(demo())
Connection Pooling
import asyncio
import aioredis
async def main():
    """Round-trip ``my-key`` on a client created from a URL with ``max_connections=10``."""
    client = aioredis.from_url("redis://localhost", max_connections=10)
    await client.execute_command("set", "my-key", "value")
    raw = await client.execute_command("get", "my-key")
    print("raw value:", raw)
async def main_pool():
    """Round-trip ``my-key`` on a client built around an explicitly created pool."""
    # Build the pool first, then hand it to the client.
    shared_pool = aioredis.ConnectionPool.from_url(
        "redis://localhost",
        max_connections=10,
    )
    client = aioredis.Redis(connection_pool=shared_pool)
    await client.execute_command("set", "my-key", "value")
    raw = await client.execute_command("get", "my-key")
    print("raw value:", raw)
if __name__ == "__main__":
    # Run each demo coroutine to completion, one after the other.
    for demo in (main, main_pool):
        asyncio.run(demo())
Blocking Commands
import asyncio
import aioredis
async def blocking_commands():
    """Wait on BRPOP in a background task while pushing a value to the same list."""
    # Redis client bound to a pool of connections (auto-reconnecting).
    client = aioredis.Redis.from_url("redis://localhost")

    async def get_message():
        # A blocking command occupies its connection until it completes, so
        # that connection must not go back to the pool while brpop() is
        # still pending. The pop is therefore issued inside an
        # ``async with`` block on the client.
        # NOTE(review): the original note described 'await redis' acquiring
        # a dedicated connection, but the code uses 'async with redis' --
        # confirm the connection handling matches the description.
        async with client as scoped:
            return await scoped.brpop("my-key")

    waiter = asyncio.create_task(get_message())
    await client.lpush("my-key", "value")
    popped = await waiter
    print(popped)

    # gracefully closing underlying connection
    await client.close()
# Script entry point: run the blocking-command demo when executed directly.
if __name__ == "__main__":
    asyncio.run(blocking_commands())