Adventures in Bash: Consume the Last N Messages from a Kafka Topic on the Command Line
This came up while I was helping a developer debug their service. The Kafka shell tools are functional, but not particularly well documented or easy to use. This tail -n 10
style of use case is surprisingly difficult to achieve with just the shell tools, especially if you want human-readable timestamps on the messages. I realize this code isn't the cleanest, but I had fun writing it and it works like a champ on our 0.10.x Kafka brokers.
Since this script uses the kafka.tools.GetOffsetShell
tool, which accepts timestamps as input, it could fairly easily be adapted to display all messages between two given timestamps.
https://gist.github.com/forestjohnsonpeoplenet/d56e6f6a0d1070bcd74362a76c9dafec
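As a starting point for that adaptation: GetOffsetShell's --time flag takes a Unix timestamp in milliseconds (it also accepts -1 for the latest offset and -2 for the earliest, which is what the script below uses). A minimal sketch of turning a human-readable time into that form with GNU date might look like this; the START_MS name and the example date are mine, not part of the script:

```shell
# Convert a human-readable UTC time into the epoch-millisecond
# form that GetOffsetShell's --time flag expects (GNU date).
START_MS=$(date -u -d "2018-01-01T00:00:00Z" +%s%3N)
echo "$START_MS"

# That value could then be fed to the offset lookup, roughly:
# kafka-run-class kafka.tools.GetOffsetShell --broker-list "$BROKER_LIST" \
#   --time "$START_MS" --topic "$TOPIC"
```

Doing this lookup twice, once per timestamp, would give you a start and end offset per partition instead of the earliest/latest pair the script fetches.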
#!/bin/bash
USAGE="Usage Example: ./topic-last-messages.sh my-zookeeper-hostname:2181 MY_TOPIC_NAME 10 America/Chicago"
if [ -z "$1" ]; then
  echo "Missing first argument, zookeeper host:port like: my-zookeeper-hostname:2181"
  echo "$USAGE"
  exit 1
fi
if [ -z "$2" ]; then
  echo "Missing second argument, topic name like: MY_TOPIC_NAME"
  echo "$USAGE"
  exit 1
fi
if [ -z "$3" ]; then
  echo "Missing third argument, number of messages to consume from each partition, like: 10"
  echo "$USAGE"
  exit 1
fi
ZK="$1"
TOPIC="$2"
READ_LAST_COUNT="$3"
TZ="$4"   # optional: timezone for human-readable timestamps, like America/Chicago
RAW="$5"  # optional: pass "raw" to suppress headers and timestamps
# Ask zookeeper for the list of broker ids, then fetch each broker's
# metadata to build a host1:port1,host2:port2 connection string.
function get_broker_list () {
  local ZK="$1"
  GET_BROKER_METADATA_COMMANDS=$(zookeeper-shell "$ZK" <<< "ls /brokers/ids" | grep '^\[' | jq -r '.[]' | sed 's/\([0-9][0-9]*\)/ get \/brokers\/ids\/\1 /g')
  BROKER_LIST=''
  while read -r GET_BROKER_METADATA_COMMAND; do
    BROKER_JSON=$(zookeeper-shell "$ZK" <<< "$GET_BROKER_METADATA_COMMAND" 2>/dev/null | grep '"host":')
    BROKER_HOST=$(echo "$BROKER_JSON" | jq -r .host)
    BROKER_PORT=$(echo "$BROKER_JSON" | jq -r .port)
    BROKER_LIST="$BROKER_LIST$BROKER_HOST:$BROKER_PORT,"
  done <<< "$GET_BROKER_METADATA_COMMANDS"
  # Trim the trailing comma.
  echo "$BROKER_LIST" | sed 's/,$//g'
}
BROKER_LIST=$(get_broker_list "$ZK")
# --time -2 asks GetOffsetShell for the earliest available offset, --time -1 for the latest.
PARTITION_OFFSET_START_LIST=$(kafka-run-class kafka.tools.GetOffsetShell --broker-list "$BROKER_LIST" --time -2 --topic "$TOPIC" | sort)
PARTITION_OFFSET_END_LIST=$(kafka-run-class kafka.tools.GetOffsetShell --broker-list "$BROKER_LIST" --time -1 --topic "$TOPIC" | sort)
PARTITION_COUNT=$(echo "$PARTITION_OFFSET_START_LIST" | wc -l)
for LINE_NUMBER in $(seq 1 "$PARTITION_COUNT"); do
  # GetOffsetShell lines look like TOPIC_NAME:PARTITION:OFFSET; keep only the offset.
  OFFSET_START=$(echo "$PARTITION_OFFSET_START_LIST" | head -n "$LINE_NUMBER" | tail -n 1 | sed 's/\([^:][^:]*\):\([0-9]*\):\([0-9]*\)/\3/g')
  OFFSET_END=$(echo "$PARTITION_OFFSET_END_LIST" | head -n "$LINE_NUMBER" | tail -n 1 | sed 's/\([^:][^:]*\):\([0-9]*\):\([0-9]*\)/\3/g')
  PARTITION_ID=$(expr "$LINE_NUMBER" - 1)
  MAX_MESSAGES=$(expr "$OFFSET_END" - "$OFFSET_START")
  # Never read more than READ_LAST_COUNT messages from any one partition.
  if [ "$MAX_MESSAGES" -gt "$READ_LAST_COUNT" ]; then
    MAX_MESSAGES="$READ_LAST_COUNT"
    OFFSET_START=$(expr "$OFFSET_END" - "$MAX_MESSAGES")
  fi
  PROPERTIES="--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer "
  if [ "$RAW" != "raw" ]; then
    echo "$TOPIC partition $PARTITION_ID last $MAX_MESSAGES messages:"
    PROPERTIES="--property print.timestamp=true $PROPERTIES"
  fi
  # PROPERTIES is deliberately left unquoted so it expands into separate arguments.
  MESSAGES_WITH_TIMESTAMPS=$(kafka-simple-consumer-shell $PROPERTIES --broker-list "$BROKER_LIST" --topic "$TOPIC" --offset "$OFFSET_START" --max-messages "$MAX_MESSAGES" --partition "$PARTITION_ID" --no-wait-at-logend)
  LINE_COUNT=$(echo "$MESSAGES_WITH_TIMESTAMPS" | wc -l)
  # Use a differently-named loop variable so the outer LINE_NUMBER isn't clobbered.
  for MESSAGE_LINE_NUMBER in $(seq 1 "$LINE_COUNT"); do
    LINE=$(echo "$MESSAGES_WITH_TIMESTAMPS" | head -n "$MESSAGE_LINE_NUMBER" | tail -n 1)
    # With print.timestamp=true, lines look like: CreateTime:<epoch-millis><TAB><message>
    TIMESTAMP=$(echo "$LINE" | sed -ne 's/CreateTime:\([0-9]*\)\t.*/\1/p')
    # Strip the timestamp prefix and replace escaped non-printable bytes with spaces.
    BYTES=$(echo "$LINE" | sed -ne 's/CreateTime:[0-9]*\t\(.*\)/\1/p' | sed 's/\\x[A-F0-9]\{2\}/ /g')
    if [ ! -z "$TIMESTAMP" ]; then
      # Kafka timestamps are in milliseconds; date -d@ expects seconds.
      DATETIME_READABLE=$(TZ="$TZ" date -d@"$(expr "$TIMESTAMP" / 1000)")
      echo "$DATETIME_READABLE $BYTES"
    else
      echo "$LINE"
    fi
  done
done
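To illustrate the offset parsing the script leans on: GetOffsetShell prints one TOPIC:PARTITION:OFFSET line per partition, and the sed expression keeps only the third field. The sample line below is made up for illustration; assuming the topic name itself contains no colons, cut -d: -f3 would extract the same value:

```shell
# A sample line in the shape GetOffsetShell emits: TOPIC:PARTITION:OFFSET
SAMPLE_LINE='MY_TOPIC_NAME:0:12345'

# The sed expression from the script keeps only the offset field.
OFFSET_SED=$(echo "$SAMPLE_LINE" | sed 's/\([^:][^:]*\):\([0-9]*\):\([0-9]*\)/\3/g')

# cut does the same thing more simply, as long as the topic name has no colons.
OFFSET_CUT=$(echo "$SAMPLE_LINE" | cut -d: -f3)

echo "$OFFSET_SED $OFFSET_CUT"
```

Both print 12345 for this sample line; the sed version has the advantage of leaving non-matching lines (like tool warnings) untouched.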