aioredis.pubsub module
This module provides a Pub/Sub listener interface that implements the multi-producer, single-consumer queue pattern.
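The pattern itself can be illustrated without Redis. The following is a minimal sketch using only `asyncio.Queue` from the standard library; the `producer`, `consumer`, and `main` names are hypothetical and this is not how aioredis implements `Receiver`. It only shows several producers feeding one consumer through a shared queue.

```python
import asyncio


async def producer(name, queue, items):
    # Each producer tags its items with its own name, much like a channel would.
    for item in items:
        await queue.put((name, item))


async def consumer(queue, expected):
    # A single consumer drains messages coming from every producer.
    received = []
    for _ in range(expected):
        received.append(await queue.get())
    return received


async def main():
    queue = asyncio.Queue()
    producers = [
        producer("channel:1", queue, ["a", "b"]),
        producer("channel:3", queue, ["c"]),
    ]
    # Run the consumer and all producers concurrently.
    results = await asyncio.gather(consumer(queue, 3), *producers)
    return results[0]


received = asyncio.run(main())
```

The relative order of messages from different producers is not guaranteed, which is also why each message carries the name of its source.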
- class aioredis.pubsub.Receiver(loop=None)
Multi-producer, single-consumer Pub/Sub queue.
Can be used in cases where a single consumer task must read messages from several different channels (where pattern subscriptions may not work well or channels can be added/removed dynamically).
Example use case:
>>> import asyncio
>>> from aioredis.pubsub import Receiver
>>> from aioredis.abc import AbcChannel
>>> mpsc = Receiver(loop=loop)
>>> async def reader(mpsc):
...     async for channel, msg in mpsc.iter():
...         assert isinstance(channel, AbcChannel)
...         print("Got {!r} in channel {!r}".format(msg, channel))
>>> asyncio.ensure_future(reader(mpsc))
>>> await redis.subscribe(mpsc.channel('channel:1'),
...                       mpsc.channel('channel:3'),
...                       mpsc.channel('channel:5'))
>>> await redis.psubscribe(mpsc.pattern('hello*'))
>>> # publishing 'Hello world' into 'hello-channel'
>>> # will print this message:
Got b'Hello world' in channel b'hello-channel'
>>> # when all is done:
>>> await redis.unsubscribe('channel:1', 'channel:3', 'channel:5')
>>> await redis.punsubscribe('hello*')
>>> mpsc.stop()
>>> # any message received after stop() will be ignored.
To do: add a few words regarding exclusive channel usage.
- channel(name)
Create a channel.
Returns a _Sender object implementing AbcChannel.
- channels
Read-only channels dict.
- coroutine get(*, encoding=None, decoder=None)
Wait for and return a Pub/Sub message from one of the channels.
Return value is either:
* a tuple of two elements: channel & message;
* a tuple of three elements: pattern channel, (target channel & message);
* or None in case the Receiver is stopped.
Raises: aioredis.ChannelClosedError if the listener is stopped and all messages have been received.
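A consumer usually has to tell the return shapes listed above apart. The helper below is a hedged sketch, not part of aioredis: `normalize` is a hypothetical name, plain strings and bytes stand in for channel objects, and it assumes the pattern case arrives as a pattern channel paired with a nested (target channel, message) tuple.

```python
def normalize(item):
    """Turn a get()-style result into a dict, or None if there is no message.

    Assumption: a pattern result looks like (pattern, (target, message)),
    while a plain result looks like (channel, message).
    """
    if item is None:
        # The receiver was stopped; there is nothing to process.
        return None
    source, payload = item
    if isinstance(payload, tuple):
        # Pattern subscription: the payload carries the real target channel.
        target, message = payload
        return {"source": source, "target": target,
                "message": message, "from_pattern": True}
    # Plain subscription: the source channel is also the target.
    return {"source": source, "target": source,
            "message": payload, "from_pattern": False}


plain = normalize(("channel:1", b"hi"))
matched = normalize(("hello*", (b"hello-channel", b"Hello world")))
stopped = normalize(None)
```

This sketch would misclassify a plain message if a custom decoder returned a tuple, so real code may prefer to check whether the channel object was created as a pattern.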
- is_active
Returns True if the listener has any active subscription.
- pattern(pattern)
Create a pattern channel.
Returns a _Sender object implementing AbcChannel.
- patterns
Read-only patterns dict.
- stop()
Stop receiving messages.
All new messages after this call will be ignored, so you must call unsubscribe before stopping this listener.
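The reason for unsubscribing first can be seen in a toy model. The class below is a hypothetical stand-in written only for illustration; `ToyReceiver` and its `deliver` method are not aioredis APIs, and the real `Receiver` may handle this differently internally. It only demonstrates that anything arriving after `stop()` is dropped.

```python
from collections import deque


class ToyReceiver:
    """Hypothetical stand-in for a receiver; not aioredis's Receiver."""

    def __init__(self):
        self.messages = deque()
        self._running = True

    def deliver(self, channel, message):
        # Messages that arrive after stop() are silently dropped.
        if not self._running:
            return False
        self.messages.append((channel, message))
        return True

    def stop(self):
        self._running = False


receiver = ToyReceiver()
accepted_before = receiver.deliver("channel:1", b"first")
receiver.stop()
accepted_after = receiver.deliver("channel:1", b"late")
```

If the subscription were still open when `stop()` is called, the server would keep sending messages that nobody reads, which is why the unsubscribe call comes first.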
- coroutine wait_message()
Blocks until a new message appears.
- class aioredis.pubsub._Sender(receiver, name, is_pattern, *, loop)
Write-only channel.
Does not allow direct .get() calls.
Bases: aioredis.abc.AbcChannel
Not to be used directly; returned by Receiver.channel() or Receiver.pattern() calls.