Old versions of Kafka kept consumer offset information in ZooKeeper. Newer versions keep it on the Broker side.
For the newer versions of Kafka, you need to compare the current offset with the latest offset in each partition.
If your topic has only one partition, then this is what you are looking for:
consumer = KafkaConsumer('mytopic') for input in consumer: behind = int(consumer.highwater(consumer.assignment().pop())) - int(input.offset) - 1 print("Behind by " + str(behind) + " messages.")</pre>
input.offset is the offset of the current message you are processing. consumer.highwater() returns the latest offset in each partition. For input, the first partition found in consumer.assignment() is used. The minus one is because the consumer.highwater() returns the next offset that will be assigned.
If you are consuming more than one partition (or topic), you’ll need to keep track of where your are in each topic/partition during normal processing. This will involve recording input.offset into a dict of input.topic and input.partition. Then when you want to figure out how far you are behind, use consumer.highwater() for each partition in the consumer.assignment() set to find out what the latest offsets are in each partition, and do the math.