Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ The consumer works by launching a process per shard in the stream and then imple

consumer = KinesisConsumer(stream_name='my-stream')
for message in consumer:
print "Received message: {0}".format(message)
print("Received message: {0}".format(message))

Messages received from each of the shard processes are passed back to the main process through a Python Queue where they
are yielded for processing. Messages are not strictly ordered, but this is a property of Kinesis and not this
Expand All @@ -78,11 +78,32 @@ where in the stream we are currently reading from.

consumer = KinesisConsumer(stream_name='my-stream', state=DynamoDB(table_name='my-kinesis-state'))
for message in consumer:
print "Received message: {0}".format(message)
print("Received message: {0}".format(message))


The DynamoDB table must already exist and must have a ``HASH`` key of ``shard``, with type ``S`` (string).

A consumer's state can also be checkpointed along with the stream offset, allowing stateful consumers to pick up where
they left off. Please note that using this saved state is somewhat expensive, so its size and use should be minimized.

.. code-block:: python

from kinesis.consumer import KinesisConsumer
from kinesis.state import DynamoDB

consumer = KinesisConsumer(stream_name='my-stream', state=DynamoDB(table_name='my-kinesis-state'))

messages = consumer.items_with_state()
state, shard_id, message = next(messages)
while True:
if state is None:
state = {'count': 0}
state['count'] += 1

print("Received message: {0}, #{1}".format(message, state['count']))

state, shard_id, message = messages.send(state)


Producer
~~~~~~~~
Expand Down
24 changes: 20 additions & 4 deletions src/kinesis/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ class KinesisConsumer(object):
"""
LOCK_DURATION = 30

def __init__(self, stream_name, boto3_session=None, state=None, reader_sleep_time=None):
def __init__(self, stream_name, consumer_name=None, boto3_session=None, state=None, reader_sleep_time=None):
self.stream_name = stream_name
self.consumer_name = consumer_name
self.error_queue = multiprocessing.Queue()
self.record_queue = multiprocessing.Queue()

Expand All @@ -97,7 +98,10 @@ def __init__(self, stream_name, boto3_session=None, state=None, reader_sleep_tim
self.run = True

def state_shard_id(self, shard_id):
    """Build the key used to store state for one shard of this stream.

    The key is the stream name, the consumer name (when one is set) and the
    shard id, joined with underscores. When ``self.consumer_name`` is
    ``None`` the consumer-name segment is omitted, giving
    ``<stream_name>_<shard_id>``.

    :param shard_id: identifier of the shard, as a string.
    :returns: the underscore-joined state key.
    """
    if self.consumer_name is None:
        return '_'.join([self.stream_name, shard_id])
    return '_'.join([self.stream_name, self.consumer_name, shard_id])

def shutdown_shard_reader(self, shard_id):
try:
Expand Down Expand Up @@ -179,7 +183,7 @@ def shutdown(self):
self.shards = {}
self.run = False

def __iter__(self):
def items_with_state(self):
try:
# use lock duration - 1 here since we want to renew our lock before it expires
lock_duration_check = self.LOCK_DURATION - 1
Expand All @@ -194,14 +198,22 @@ def __iter__(self):
pass
else:
state_shard_id = self.state_shard_id(shard_id)

try:
c_state = self.state.get_consumer_state(state_shard_id)
except AttributeError:
# no self.state
c_state = None

for item in resp['Records']:
if not self.run:
break

log.debug(item)
yield item
c_state = yield c_state, shard_id, item

try:
self.state.set_consumer_state(state_shard_id, c_state)
self.state.checkpoint(state_shard_id, item['SequenceNumber'])
except AttributeError:
# no self.state
Expand All @@ -228,3 +240,7 @@ def __iter__(self):
self.run = False
finally:
self.shutdown()

def __iter__(self):
    """Iterate over records only, hiding the state/shard bookkeeping.

    Wraps ``items_with_state()`` and yields just the third element of each
    ``(state, shard_id, item)`` tuple it produces.

    Plain iteration never sends a value back into the underlying generator,
    so every ``yield`` expression inside ``items_with_state()`` evaluates to
    ``None`` when the consumer is driven through this method.
    """
    for _state, _shard_id, record in self.items_with_state():
        yield record
17 changes: 16 additions & 1 deletion src/kinesis/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,36 @@ def get_iterator_args(self, shard_id):
ShardIteratorType='LATEST'
)

def get_consumer_state(self, shard_id):
    """Return the consumer state cached for ``shard_id``.

    Looks up ``self.shards[shard_id]['consumer_state']``.

    :param shard_id: key of the shard entry in ``self.shards``.
    :returns: the stored consumer state, or ``None`` when either the shard
        entry or its ``'consumer_state'`` key is missing.
    """
    try:
        return self.shards[shard_id]['consumer_state']
    except KeyError:
        # Unknown shard, or a shard that has no saved consumer state yet.
        return None

def set_consumer_state(self, shard_id, consumer_state):
    """Store ``consumer_state`` on the cached entry for ``shard_id``.

    Writes to ``self.shards[shard_id]['consumer_state']``. This is
    best-effort: when ``self.shards`` has no entry for ``shard_id`` the
    value is dropped and no error is raised.

    :param shard_id: key of the shard entry in ``self.shards``.
    :param consumer_state: value to remember for that shard.
    :returns: ``None``.
    """
    try:
        self.shards[shard_id]['consumer_state'] = consumer_state
    except KeyError:
        # No entry for this shard: deliberately ignore rather than create one.
        pass

def checkpoint(self, shard_id, seq):
fqdn = socket.getfqdn()
consumer_state = self.get_consumer_state(shard_id)

try:
# update the seq attr in our item
# ensure our fqdn still holds the lock and the new seq is bigger than what's already there
self.dynamo_table.update_item(
Key={'shard': shard_id},
UpdateExpression="set seq = :seq",
UpdateExpression="set seq = :seq, consumer_state = :consumer_state",
ConditionExpression="fqdn = :fqdn AND (attribute_not_exists(seq) OR seq < :seq)",
ExpressionAttributeValues={
':fqdn': fqdn,
':seq': seq,
':consumer_state': consumer_state,
}
)

except ClientError as exc:
if exc.response['Error']['Code'] in RETRY_EXCEPTIONS:
log.warn("Throttled while trying to read lock table in Dynamo: %s", exc)
Expand Down