diff --git a/README.rst b/README.rst index 1c6f7ad..90565e9 100644 --- a/README.rst +++ b/README.rst @@ -52,7 +52,7 @@ The consumer works by launching a process per shard in the stream and then imple consumer = KinesisConsumer(stream_name='my-stream') for message in consumer: - print "Received message: {0}".format(message) + print("Received message: {0}".format(message)) Messages received from each of the shard processes are passed back to the main process through a Python Queue where they are yielded for processing. Messages are not strictly ordered, but this is a property of Kinesis and not this @@ -78,11 +78,32 @@ where in the stream we are currently reading from. consumer = KinesisConsumer(stream_name='my-stream', state=DynamoDB(table_name='my-kinesis-state')) for message in consumer: - print "Received message: {0}".format(message) + print("Received message: {0}".format(message)) The DynamoDB table must already exist and must have a ``HASH`` key of ``shard``, with type ``S`` (string). +A consumer state can also checkpointed with the stream offset, allowing stateful consumers to pickup where they left +off. Please note that use of this saved state is somewhat expensive, and its size and use should be minimized. + +.. code-block:: python + + from kinesis.consumer import KinesisConsumer + from kinesis.state import DynamoDB + + consumer = KinesisConsumer(stream_name='my-stream', state=DynamoDB(table_name='my-kinesis-state')) + + messages = consumer.items_with_state() + state, shard_id, message = next(messages) + while True: + if state is None: + state = {'count': 0} + state['count'] += 1 + + print("Received message: {0}, #{1}".format(message, state['count'])) + + state, shard_id, message = messages.send(state) + Producer ~~~~~~~~ diff --git a/src/kinesis/consumer.py b/src/kinesis/consumer.py index d2a3d1b..423dc22 100644 --- a/src/kinesis/consumer.py +++ b/src/kinesis/consumer.py @@ -80,8 +80,9 @@ class KinesisConsumer(object): """ LOCK_DURATION = 30 - def __init__(self, stream_name, boto3_session=None, state=None, reader_sleep_time=None): + def __init__(self, stream_name, consumer_name=None, boto3_session=None, state=None, reader_sleep_time=None): self.stream_name = stream_name + self.consumer_name = consumer_name self.error_queue = multiprocessing.Queue() self.record_queue = multiprocessing.Queue() @@ -97,7 +98,10 @@ def __init__(self, stream_name, boto3_session=None, state=None, reader_sleep_tim self.run = True def state_shard_id(self, shard_id): - return '_'.join([self.stream_name, shard_id]) + if self.consumer_name is None: + return '_'.join([self.stream_name, shard_id]) + else: + return '_'.join([self.stream_name, self.consumer_name, shard_id]) def shutdown_shard_reader(self, shard_id): try: @@ -179,7 +183,7 @@ def shutdown(self): self.shards = {} self.run = False - def __iter__(self): + def items_with_state(self): try: # use lock duration - 1 here since we want to renew our lock before it expires lock_duration_check = self.LOCK_DURATION - 1 @@ -194,14 +198,22 @@ def __iter__(self): pass else: state_shard_id = self.state_shard_id(shard_id) + + try: + c_state = self.state.get_consumer_state(state_shard_id) + except AttributeError: + # no self.state + c_state = None + for item in resp['Records']: if not self.run: break log.debug(item) - yield item + c_state = yield c_state, shard_id, item try: + self.state.set_consumer_state(state_shard_id, c_state) self.state.checkpoint(state_shard_id, item['SequenceNumber']) except AttributeError: # no self.state @@ -228,3 +240,7 @@ def __iter__(self): self.run = False finally: self.shutdown() + + def __iter__(self): + for _, _, item in self.items_with_state(): + yield item diff --git a/src/kinesis/state.py b/src/kinesis/state.py index 9808fc4..d296bb1 100644 --- a/src/kinesis/state.py +++ b/src/kinesis/state.py @@ -32,21 +32,36 @@ def get_iterator_args(self, shard_id): ShardIteratorType='LATEST' ) + def get_consumer_state(self, shard_id): + try: + return self.shards[shard_id]['consumer_state'] + except KeyError: + return None + + def set_consumer_state(self, shard_id, consumer_state): + try: + self.shards[shard_id]['consumer_state'] = consumer_state + except KeyError: + pass + def checkpoint(self, shard_id, seq): fqdn = socket.getfqdn() + consumer_state = self.get_consumer_state(shard_id) try: # update the seq attr in our item # ensure our fqdn still holds the lock and the new seq is bigger than what's already there self.dynamo_table.update_item( Key={'shard': shard_id}, - UpdateExpression="set seq = :seq", + UpdateExpression="set seq = :seq, consumer_state = :consumer_state", ConditionExpression="fqdn = :fqdn AND (attribute_not_exists(seq) OR seq < :seq)", ExpressionAttributeValues={ ':fqdn': fqdn, ':seq': seq, + ':consumer_state': consumer_state, } ) + except ClientError as exc: if exc.response['Error']['Code'] in RETRY_EXCEPTIONS: log.warn("Throttled while trying to read lock table in Dynamo: %s", exc)