aioredis.pubsub - Pub/Sub Tools Reference
This module provides a Pub/Sub listener interface implementing the multi-producer, single-consumer queue pattern.
- class aioredis.pubsub.Receiver(loop=None, on_close=None)
Multi-producer, single-consumer Pub/Sub queue.
Can be used in cases where a single consumer task must read messages from several different channels (where pattern subscriptions may not work well or channels can be added/removed dynamically).
Example use case:
>>> import asyncio
>>> from aioredis.pubsub import Receiver
>>> from aioredis.abc import AbcChannel
>>> mpsc = Receiver(loop=loop)
>>> async def reader(mpsc):
...     async for channel, msg in mpsc.iter():
...         assert isinstance(channel, AbcChannel)
...         print("Got {!r} in channel {!r}".format(msg, channel))
>>> asyncio.ensure_future(reader(mpsc))
>>> await redis.subscribe(mpsc.channel('channel:1'),
...                       mpsc.channel('channel:3'),
...                       mpsc.channel('channel:5'))
>>> await redis.psubscribe(mpsc.pattern('hello'))
>>> # publishing 'Hello world' into 'hello-channel'
>>> # will print this message:
Got b'Hello world' in channel b'hello-channel'
>>> # when all is done:
>>> await redis.unsubscribe('channel:1', 'channel:3', 'channel:5')
>>> await redis.punsubscribe('hello')
>>> mpsc.stop()
>>> # any message received after stop() will be ignored.
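The multi-producer, single-consumer idea behind Receiver can be tried out without a Redis server. The following is only a minimal sketch of the pattern built on asyncio.Queue; it is not how aioredis implements Receiver, and the names producer, consume and demo are hypothetical helpers introduced for illustration.

```python
import asyncio


async def producer(queue, channel_name, messages):
    # Each producer stands in for one subscribed channel delivering messages.
    for msg in messages:
        await queue.put((channel_name, msg))


async def consume(queue, expected):
    # The single consumer reads (channel, message) pairs from all producers.
    received = []
    for _ in range(expected):
        received.append(await queue.get())
    return received


async def demo():
    # One shared queue, several producers, exactly one consumer.
    queue = asyncio.Queue()
    producers = [
        producer(queue, "channel:1", [b"a", b"b"]),
        producer(queue, "channel:3", [b"c"]),
    ]
    results = await asyncio.gather(consume(queue, 3), *producers)
    return results[0]
```

Running asyncio.run(demo()) returns the three (channel, message) pairs collected by the single consumer; messages from one producer keep their order, while the interleaving between producers is not guaranteed.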
Warning
The current subscription implementation has a few issues that will be solved eventually; until then, developers must be aware of the following:
- A single Receiver instance cannot be shared between two (or more) connections (or client instances), because any of them can close the _Sender.
- Several Receiver instances cannot subscribe to the same channel or pattern. This is a flaw in the subscription mode implementation: subsequent subscriptions to a given channel always return the first-created Channel object.
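The second limitation can be pictured with a toy model. The sketch below is not aioredis code: ToyConnection and ToyChannel are hypothetical classes that only mimic the described behavior, where a later subscription to an already known channel name hands back the first-created object.

```python
class ToyChannel:
    """Hypothetical channel object tagged with the receiver that created it."""

    def __init__(self, owner, name):
        self.owner = owner
        self.name = name


class ToyConnection:
    """Toy model only: keeps a single channel object per channel name."""

    def __init__(self):
        self._channels = {}

    def subscribe(self, channel_obj):
        # setdefault stores the object only if the name is new; otherwise
        # the first-created object is returned and the new one is dropped.
        return self._channels.setdefault(channel_obj.name, channel_obj)
```

In this model, a channel created by a second receiver for the same name is never registered, so that receiver would not see any messages.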
- channel(name)
Create a channel.
Returns a _Sender object implementing AbcChannel.
- property channels
Read-only channels dict.
- check_stop(channel, exc=None)
TBD
- property is_active
Returns True if the listener has any active subscription.
- iter(*, encoding=None, decoder=None)
Returns an async iterator.
Usage example:
>>> async for ch, msg in mpsc.iter():
...     print(ch, msg)
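This page does not say how the encoding and decoder arguments are applied. One plausible reading, assumed here and not confirmed by this reference, is that the raw bytes are first decoded with encoding and the result is then passed to the decoder callable. The helper decode_message below is hypothetical and only sketches that assumption.

```python
import json


def decode_message(raw, *, encoding=None, decoder=None):
    # Assumption: bytes are decoded first, then handed to the decoder callable.
    msg = raw
    if encoding is not None:
        msg = msg.decode(encoding)
    if decoder is not None:
        msg = decoder(msg)
    return msg
```

Under this assumption, decode_message(b'{"a": 1}', encoding="utf-8", decoder=json.loads) yields a dict, while calling it with no keyword arguments leaves the bytes untouched.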
- pattern(pattern)
Create a pattern channel.
Returns a _Sender object implementing AbcChannel.
- property patterns
Read-only patterns dict.
- stop()
Stop receiving messages.
All new messages after this call will be ignored, so you must call unsubscribe before stopping this listener.
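The required ordering, unsubscribe first and stop afterwards, can be wrapped in a small helper. This is a sketch under stated assumptions: shutdown is a hypothetical function, and FakeRedis and FakeReceiver are stand-ins that only record calls so the ordering can be checked without a server. The only client calls used are the ones shown in the example on this page (unsubscribe, punsubscribe, stop).

```python
import asyncio


async def shutdown(redis, mpsc, channels=(), patterns=()):
    # Unsubscribe first so nothing is delivered after the receiver stops.
    if channels:
        await redis.unsubscribe(*channels)
    if patterns:
        await redis.punsubscribe(*patterns)
    # Only now stop the receiver; later messages would be ignored.
    mpsc.stop()


class FakeRedis:
    """Stand-in client that records calls instead of talking to Redis."""

    def __init__(self, log):
        self.log = log

    async def unsubscribe(self, *names):
        self.log.append(("unsubscribe", names))

    async def punsubscribe(self, *names):
        self.log.append(("punsubscribe", names))


class FakeReceiver:
    """Stand-in receiver that records the stop() call."""

    def __init__(self, log):
        self.log = log

    def stop(self):
        self.log.append(("stop", ()))
```

With a real client and Receiver, the same helper would be awaited as shutdown(redis, mpsc, ['channel:1'], ['hello']) once the consumer task is no longer needed.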
- class aioredis.pubsub._Sender(receiver, name, is_pattern)
Write-Only Channel.
Does not allow direct .get() calls.
Bases: aioredis.abc.AbcChannel
Not to be used directly; returned by Receiver.channel() or Receiver.pattern() calls.