aioredis.pubsub module

This module provides a Pub/Sub listener interface that implements the multi-producer, single-consumer queue pattern.

class aioredis.pubsub.Receiver(loop=None)

Multi-producer, single-consumer Pub/Sub queue.

Use it when a single consumer task must read messages from several different channels, for example where pattern subscriptions do not fit well or where channels are added and removed dynamically.
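As an illustration of the multi-producer, single-consumer idea only, the following is a minimal sketch built on plain asyncio.Queue rather than on aioredis. The names producer, consume and demo are hypothetical, and the sketch makes no claim about how Receiver works internally.

```python
import asyncio


async def producer(name, messages, queue):
    # Each producer tags its messages with its own name, much like each
    # channel registered with a receiver identifies where a message came from.
    for msg in messages:
        await queue.put((name, msg))


async def consume(queue, expected):
    # The single consumer reads (source, message) pairs from one shared queue.
    received = []
    for _ in range(expected):
        received.append(await queue.get())
    return received


async def demo():
    queue = asyncio.Queue()
    # Several producers feed the same queue.
    await asyncio.gather(
        producer("channel:1", [b"a", b"b"], queue),
        producer("channel:3", [b"c"], queue),
    )
    return await consume(queue, expected=3)


received = asyncio.run(demo())
```

The consumer never needs to know how many producers exist; it only sees tagged messages arriving on one queue.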

Example use case:

>>> from aioredis.pubsub import Receiver
>>> from aioredis.abc import AbcChannel
>>> mpsc = Receiver(loop=loop)
>>> async def reader(mpsc):
...     async for channel, msg in mpsc.iter():
...         assert isinstance(channel, AbcChannel)
...         print("Got {!r} in channel {!r}".format(msg, channel))
>>> asyncio.ensure_future(reader(mpsc))
>>> await redis.subscribe(mpsc.channel('channel:1'),
...                       mpsc.channel('channel:3'),
...                       mpsc.channel('channel:5'))
>>> await redis.psubscribe(mpsc.pattern('hello*'))
>>> # publishing 'Hello world' into 'hello-channel'
>>> # will print this message:
Got b'Hello world' in channel b'hello-channel'
>>> # when all is done:
>>> await redis.unsubscribe('channel:1', 'channel:3', 'channel:5')
>>> await redis.punsubscribe('hello*')
>>> mpsc.stop()
>>> # any message received after stop() will be ignored.

To do: add a few words regarding exclusive channel usage.

channel(name)

Create a channel.

Returns _Sender object implementing AbcChannel.

channels

Read-only channels dict.

coroutine get(*, encoding=None, decoder=None)

Wait for and return a pub/sub message from one of the channels.

Return value is either:

  • tuple of two elements: channel & message;
  • tuple of three elements: pattern channel, (target channel & message);
  • or None in case Receiver is stopped.

Raises: aioredis.ChannelClosedError – If listener is stopped and all messages have been received.
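The return shapes of get() could be normalized with a small helper such as the one below. This is a sketch, not part of aioredis: unpack is a hypothetical name, the pattern case is assumed to arrive as a pair whose second element is itself a (target channel, message) pair, and plain strings and bytes stand in for channel objects.

```python
def unpack(result):
    """Normalize a get() result to (subscription, target, message).

    Hypothetical helper. Assumes a pattern result looks like
    (pattern_channel, (target_channel, message)).
    """
    if result is None:
        # The receiver was stopped.
        return None
    sender, payload = result
    if isinstance(payload, tuple):
        # Pattern subscription: payload carries the concrete target channel.
        target, message = payload
        return sender, target, message
    # Plain channel subscription: there is no separate target.
    return sender, None, payload


plain = unpack(("channel:1", b"hi"))
matched = unpack(("hello*", (b"hello-channel", b"Hello world")))
stopped = unpack(None)
```

The tuple check is a simplification: if a custom decoder returns tuples for ordinary messages, the helper would need another way to tell the two cases apart.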

is_active

Returns True if listener has any active subscription.

iter(*, encoding=None, decoder=None)

Returns async iterator.

Usage example:

>>> async for ch, msg in mpsc.iter():
...     print(ch, msg)

pattern(pattern)

Create a pattern channel.

Returns _Sender object implementing AbcChannel.

patterns

Read-only patterns dict.

stop()

Stop receiving messages.

All new messages after this call will be ignored, so you must call unsubscribe before stopping this listener.
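The ordering described here (unsubscribe first, then stop) might be wrapped in a helper like the following sketch. The shutdown function and the Recorder stub are hypothetical; the stub only records call order so the example runs without a Redis server, and the method names unsubscribe, punsubscribe and stop are the ones used in the Receiver example in this document.

```python
import asyncio


async def shutdown(redis, mpsc, channels=(), patterns=()):
    # Unsubscribe first so no further messages are routed to the receiver,
    # then stop the receiver itself.
    if channels:
        await redis.unsubscribe(*channels)
    if patterns:
        await redis.punsubscribe(*patterns)
    mpsc.stop()


class Recorder:
    """Stub standing in for both the redis client and the receiver."""

    def __init__(self):
        self.calls = []

    async def unsubscribe(self, *names):
        self.calls.append(("unsubscribe", names))

    async def punsubscribe(self, *names):
        self.calls.append(("punsubscribe", names))

    def stop(self):
        self.calls.append(("stop", ()))


rec = Recorder()
asyncio.run(shutdown(rec, rec, channels=["channel:1"], patterns=["hello*"]))
calls = rec.calls
```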

coroutine wait_message()

Blocks until a new message appears.
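One way wait_message() and get() might be combined is a polling loop like the sketch below. It assumes wait_message() returns a truthy value while a message is available and a falsy value once the receiver is finished; this section does not state the return value, so treat that as an assumption. StubReceiver and drain are hypothetical names, and the stub is backed by a list so the example runs without Redis.

```python
import asyncio


class StubReceiver:
    """Stand-in exposing the same two coroutines; not aioredis code."""

    def __init__(self, items):
        self._items = list(items)

    async def wait_message(self):
        # Assumption: truthy while a message is available, falsy when done.
        return bool(self._items)

    async def get(self):
        return self._items.pop(0)


async def drain(receiver):
    # Check for availability first, then fetch, until the receiver
    # reports that nothing more will arrive.
    seen = []
    while await receiver.wait_message():
        seen.append(await receiver.get())
    return seen


seen = asyncio.run(drain(StubReceiver([("channel:1", b"a"), ("channel:3", b"b")])))
```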

class aioredis.pubsub._Sender(receiver, name, is_pattern, *, loop)

Write-Only Channel.

Does not allow direct .get() calls.

Bases: aioredis.abc.AbcChannel

Not to be used directly, returned by Receiver.channel() or Receiver.pattern() calls.