API

Sources

from_redis_lists(keys, str], client_params, …)

Emit items from a number of Redis lists.

from_redis_streams(streams, str, list, …)

Consume and emit messages from one or more Redis streams.

from_redis_consumer_group(streams, …)

Consume messages from one or more Redis streams as a member of a consumer group.

class streamz_redis.sources.from_redis_lists(keys: Union[list, str], client_params: dict = None, timeout: int = 0, left: bool = True, **kwargs)

Emit items from a number of Redis lists. Uses BLPOP, so the lists are used as a volatile queue, FIFO or FILO depending on the combination of how the elements are added to the list and left parameter.

Items are emitted as tuples of (list-name, item).

Note that if there is a crash, there’s no way to retrieve unprocessed items that were popped from the list. If you need durability, consider using from_redis_consumer_group.

__init__(keys: Union[list, str], client_params: dict = None, timeout: int = 0, left: bool = True, **kwargs)
Parameters
  • keys (str or list-like) – One or more Redis lists to read from.

  • client_params (dict) – Parameters the will be passed to redis-py client. Defaults to {}.

  • timeout (int or float) – Number of seconds to wait if the list is empty. If 0, will block until new items are added to the list. Defaults to 0.

  • left (bool) – Use BLPOP if True, BRPOP otherwise. Defaults to True.

  • **kwargs – Will be passed to streamz.Source.

class streamz_redis.sources.from_redis_streams(streams: Union[dict, str, list, tuple], client_params: dict = None, timeout: int = 0, count: int = None, default_start_id: int = '$', convert: bool = True, encoding: str = 'UTF-8', **kwargs)

Consume and emit messages from one or more Redis streams.

__init__(streams: Union[dict, str, list, tuple], client_params: dict = None, timeout: int = 0, count: int = None, default_start_id: int = '$', convert: bool = True, encoding: str = 'UTF-8', **kwargs)
Parameters
  • streams (dict, str, list or tuple) – A dict of stream-name: message-id. message-id is an id to start consuming the messages from. The first call to .consume() will use this value, subsequent calls will only return new messages. Non-existing streams will be created when the class is instantiated. Alternatively, can be str for a single stream, or list/tuple for multiple streams. In these cases message-id is equal to default_start_id.

  • client_params (dict) – Parameters the will be passed to redis-py client. Defaults to {}.

  • timeout (int or float) – Number of seconds to wait if there are no new messages in the stream. If there are none, this is effectively like a polling interval. Defaults to 0 (wait indefinitely).

  • count (int) – Number of items to emit at a time. If None, all available items are emitted. Defaults to None.

  • default_start_id (str) – In cases when streams isn’t a dict, this is the default starting message-id that’s used. Defaults to “$”, so will start reading only new messages. Another possible value is “0” to start reading from the beginning of the stream.

  • convert (bool) – Convert bytes in the messages to str. Defaults to True.

  • encoding (str) – This is the encoding that will be used to convert bytes to str if convert is True. Defaults to “UTF-8”.

  • **kwargs – Will be passed to streamz.Source.

class streamz_redis.sources.from_redis_consumer_group(streams: dict, group_name: str, consumer_name: str, client_params: dict = None, timeout: int = 0, count: int = None, replay_pending: bool = True, heartbeat_interval: int = None, claim_timeout: int = None, **kwargs)

Consume messages from one or more Redis streams as a member of a consumer group.

__init__(streams: dict, group_name: str, consumer_name: str, client_params: dict = None, timeout: int = 0, count: int = None, replay_pending: bool = True, heartbeat_interval: int = None, claim_timeout: int = None, **kwargs)
Parameters
  • streams (dict, str, list or tuple) –

    A dict of stream-name: message-id. message-id is an id to start consuming the messages from. The first call to .consume() will use this value, subsequent calls will only return new messages. Non-existing streams will be created when the class is instantiated.

    Alternatively, can be str for a single stream, or list/tuple for multiple streams. In these cases message-id is presumed to be 0.

  • group_name (str) – Name of Redis consumer group. The group will be created for each stream in streams.

  • consumer_name (str) – Name of Redis consumer in the consumer group.

  • client_params (dict) – Parameters the will be passed to redis-py client. Defaults to {}.

  • timeout (int or float) – Number of seconds to wait if there are no new messages in the stream. Defaults to 0 (wait indefinitely).

  • count (int) – Number of items to emit at a time. If None, all available items are emitted. Defaults to None.

  • replay_pending (bool) – Retrieve messages from PEL (pending entry list) when started. Defaults to True.

  • heartbeat_interval (int) – Interval at which this source will send heartbeats to the group’s pub/sub channel. Defaults to None (heartbeats are turned off).

  • claim_timeout (int) – Number of seconds after which a consumer is considered dead and other consumers are free to steal its unacknowledged messages. The source will not steal messages if it’s not sending heartbeats (the default). Defaults to 10.

  • convert (bool) – Convert bytes in the messages to str. Defaults to True.

  • encoding (str) – This is the encoding that will be used to convert bytes to str if convert is True. Defaults to “UTF-8”.

  • **kwargs – Will be passed to streamz.Source.

Sinks