API
Sources

from_redis_lists – Emit items from a number of Redis lists.
from_redis_streams – Consume and emit messages from one or more Redis streams.
from_redis_consumer_group – Consume messages from one or more Redis streams as a member of a consumer group.

class streamz_redis.sources.from_redis_lists(keys: Union[list, str], client_params: dict = None, timeout: int = 0, left: bool = True, **kwargs)

Emit items from a number of Redis lists. Uses BLPOP (or BRPOP when left is False), so the lists are used as a volatile queue, FIFO or FILO depending on how the elements are added to the list and on the left parameter.

Items are emitted as tuples of (list-name, item).

Note that if there is a crash, there is no way to retrieve unprocessed items that were already popped from the list. If you need durability, consider using from_redis_consumer_group.

__init__(keys: Union[list, str], client_params: dict = None, timeout: int = 0, left: bool = True, **kwargs)

Parameters
keys (str or list-like) – One or more Redis lists to read from.
client_params (dict) – Parameters that will be passed to the redis-py client. Defaults to {}.
timeout (int or float) – Number of seconds to wait if the list is empty. If 0, will block until new items are added to the list. Defaults to 0.
left (bool) – Use BLPOP if True, BRPOP otherwise. Defaults to True.
**kwargs – Will be passed to streamz.Source.
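
The interaction between the push side and the left parameter can be easier to see in a small simulation. The sketch below does not use Redis or streamz_redis at all: it models a Redis list with a collections.deque, and the helper name simulate_pops and the list name "my-list" are hypothetical. It only illustrates the ordering described above, under the assumption that the producer uses LPUSH or RPUSH.

```python
from collections import deque


def simulate_pops(pushes, left=True, list_name="my-list"):
    """Model a Redis list as a deque and drain it the way a blocking pop would.

    pushes: iterable of (side, item); side is "L" for LPUSH, "R" for RPUSH.
    left:   pop from the head (like BLPOP) if True, from the tail (like BRPOP) otherwise.
    Returns the emitted (list-name, item) tuples in emission order.
    """
    redis_list = deque()
    for side, item in pushes:
        if side == "L":
            redis_list.appendleft(item)  # LPUSH inserts at the head
        else:
            redis_list.append(item)      # RPUSH appends at the tail

    emitted = []
    while redis_list:
        item = redis_list.popleft() if left else redis_list.pop()
        emitted.append((list_name, item))
    return emitted


# RPUSH + left=True behaves as a FIFO queue.
fifo = simulate_pops([("R", "a"), ("R", "b"), ("R", "c")], left=True)

# LPUSH + left=True behaves as a FILO stack.
filo = simulate_pops([("L", "a"), ("L", "b"), ("L", "c")], left=True)
```

In this model, pushing and popping on opposite ends gives FIFO order, while pushing and popping on the same end gives FILO order.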
class streamz_redis.sources.from_redis_streams(streams: Union[dict, str, list, tuple], client_params: dict = None, timeout: int = 0, count: int = None, default_start_id: int = '$', convert: bool = True, encoding: str = 'UTF-8', **kwargs)

Consume and emit messages from one or more Redis streams.

__init__(streams: Union[dict, str, list, tuple], client_params: dict = None, timeout: int = 0, count: int = None, default_start_id: int = '$', convert: bool = True, encoding: str = 'UTF-8', **kwargs)

Parameters
streams (dict, str, list or tuple) – A dict of stream-name: message-id, where message-id is the id to start consuming messages from. The first call to .consume() will use this value; subsequent calls will only return new messages. Non-existing streams will be created when the class is instantiated. Alternatively, can be a str for a single stream, or a list/tuple for multiple streams. In these cases message-id is equal to default_start_id.
client_params (dict) – Parameters that will be passed to the redis-py client. Defaults to {}.
timeout (int or float) – Number of seconds to wait if there are no new messages in the stream. When no messages arrive, this effectively acts as a polling interval. Defaults to 0 (wait indefinitely).
count (int) – Number of items to emit at a time. If None, all available items are emitted. Defaults to None.
default_start_id (str) – When streams isn’t a dict, this is the default starting message-id that’s used. Defaults to “$”, so only new messages are read. Another possible value is “0”, to start reading from the beginning of the stream.
convert (bool) – Convert bytes in the messages to str. Defaults to True.
encoding (str) – The encoding used to convert bytes to str if convert is True. Defaults to “UTF-8”.
**kwargs – Will be passed to streamz.Source.
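
The accepted shapes of streams and the effect of convert and encoding can be sketched in plain Python. The two helpers below, normalize_streams and decode_fields, are hypothetical names written for illustration only; they are not part of streamz_redis and may differ from how the library handles these arguments internally.

```python
def normalize_streams(streams, default_start_id="$"):
    """Turn a str, list/tuple, or dict into a {stream-name: message-id} dict.

    Illustrative only: mirrors the documented rule that non-dict inputs
    fall back to default_start_id for every stream.
    """
    if isinstance(streams, dict):
        return dict(streams)
    if isinstance(streams, str):
        return {streams: default_start_id}
    return {name: default_start_id for name in streams}


def decode_fields(fields, convert=True, encoding="UTF-8"):
    """Decode bytes keys and values of one message to str when convert is True."""
    if not convert:
        return fields
    return {
        (k.decode(encoding) if isinstance(k, bytes) else k):
        (v.decode(encoding) if isinstance(v, bytes) else v)
        for k, v in fields.items()
    }


single = normalize_streams("events")
several = normalize_streams(["events", "audit"], default_start_id="0")
explicit = normalize_streams({"events": "0", "audit": "$"})
message = decode_fields({b"user": b"alice", b"action": b"login"})
```

With a dict, each stream keeps its own starting id; with a str, list, or tuple, every stream starts from default_start_id.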
class streamz_redis.sources.from_redis_consumer_group(streams: dict, group_name: str, consumer_name: str, client_params: dict = None, timeout: int = 0, count: int = None, replay_pending: bool = True, heartbeat_interval: int = None, claim_timeout: int = None, **kwargs)

Consume messages from one or more Redis streams as a member of a consumer group.

__init__(streams: dict, group_name: str, consumer_name: str, client_params: dict = None, timeout: int = 0, count: int = None, replay_pending: bool = True, heartbeat_interval: int = None, claim_timeout: int = None, **kwargs)

Parameters
streams (dict, str, list or tuple) – A dict of stream-name: message-id, where message-id is the id to start consuming messages from. The first call to .consume() will use this value; subsequent calls will only return new messages. Non-existing streams will be created when the class is instantiated. Alternatively, can be a str for a single stream, or a list/tuple for multiple streams. In these cases message-id is presumed to be 0.
group_name (str) – Name of the Redis consumer group. The group will be created for each stream in streams.
consumer_name (str) – Name of the Redis consumer in the consumer group.
client_params (dict) – Parameters that will be passed to the redis-py client. Defaults to {}.
timeout (int or float) – Number of seconds to wait if there are no new messages in the stream. Defaults to 0 (wait indefinitely).
count (int) – Number of items to emit at a time. If None, all available items are emitted. Defaults to None.
replay_pending (bool) – Retrieve messages from the PEL (pending entries list) when started. Defaults to True.
heartbeat_interval (int) – Interval at which this source will send heartbeats to the group’s pub/sub channel. Defaults to None (heartbeats are turned off).
claim_timeout (int) – Number of seconds after which a consumer is considered dead and other consumers are free to steal its unacknowledged messages. The source will not steal messages if it’s not sending heartbeats (the default). Defaults to 10.
convert (bool) – Convert bytes in the messages to str. Defaults to True.
encoding (str) – The encoding used to convert bytes to str if convert is True. Defaults to “UTF-8”.
**kwargs – Will be passed to streamz.Source.
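
The relationship between heartbeat_interval and claim_timeout can be illustrated with a small, self-contained sketch. It does not talk to Redis and is not the library's implementation: the helper stale_consumers, the consumer names, and the timestamps are all hypothetical. It assumes each consumer's last heartbeat time is known and treats a consumer as dead once it has been silent for longer than claim_timeout seconds.

```python
def stale_consumers(last_heartbeat, now, claim_timeout, heartbeat_interval=None):
    """Return names of consumers whose messages could be claimed by others.

    last_heartbeat:     {consumer-name: timestamp of its last heartbeat, in seconds}
    now:                current timestamp, in seconds
    claim_timeout:      silence (in seconds) after which a consumer counts as dead
    heartbeat_interval: None models a source with heartbeats turned off,
                        which per the description above never steals messages.
    """
    if heartbeat_interval is None:
        return []
    return sorted(
        name
        for name, seen in last_heartbeat.items()
        if now - seen > claim_timeout
    )


heartbeats = {"worker-a": 100.0, "worker-b": 95.0, "worker-c": 80.0}

# With heartbeats enabled, only the consumer silent for more than 10 s is stale.
dead = stale_consumers(heartbeats, now=100.0, claim_timeout=10, heartbeat_interval=5)

# With heartbeats off (the default), nothing is ever claimed.
none_claimed = stale_consumers(heartbeats, now=100.0, claim_timeout=10)
```

A claim_timeout that is several times larger than heartbeat_interval leaves room for a few missed heartbeats before a consumer is treated as dead.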