This came up while I was helping a developer debug their service. The Kafka shell tools are functional, but not particularly well documented or easy to use. This `tail -n 10`-style use case is insanely difficult to achieve with just the shell tools, especially if you want human-readable timestamps on messages. I realize this code isn't the cleanest, but I had fun writing it and it works like a champ on our 0.10.x Kafka brokers.

Since this script uses the kafka.tools.GetOffsetShell tool, which accepts timestamps as input, it could be adapted fairly easily to display all messages between two given timestamps.

https://gist.github.com/forestjohnsonpeoplenet/d56e6f6a0d1070bcd74362a76c9dafec

#!/bin/bash

# Example invocation shown after every argument error.
USAGE_EXAMPLE="Usage Example:  ./topic-last-messages.sh my-zookeeper-hostname:2181 MY_TOPIC_NAME 10 America/Chicago"

# Print an argument error followed by the usage example on stderr, then abort
# with status 1. Diagnostics go to stderr so they never mix with message output.
#   $1 - description of what is wrong with the arguments
function usage_error () {
  echo "$1" >&2
  echo "$USAGE_EXAMPLE" >&2
  exit 1
}

if [ -z "$1" ]; then
  usage_error "Missing first argument, zookeeper host port like: my-zookeeper-hostname:2181"
fi

if [ -z "$2" ]; then
  usage_error "Missing second argument, topic name like: MY_TOPIC_NAME"
fi

if [ -z "$3" ]; then
  usage_error "Missing third argument, number of messages to consume from each partition, like: 10"
fi

# The count is compared against the partition size with an integer test later
# on. A non-numeric value would make that test fail, the per-partition cap would
# not be applied, and the whole partition would be read - so reject it up front.
case "$3" in
  *[!0-9]*)
    usage_error "Third argument must be a whole number of messages to consume from each partition, like: 10"
    ;;
esac

# Required arguments.
ZK=$1
TOPIC=$2
READ_LAST_COUNT=$3
# Optional: time zone used when rendering message timestamps, like America/Chicago.
TZ=$4
# Optional: pass "raw" to print the messages without partition headers and
# without asking the consumer for timestamps.
RAW=$5


#######################################
# Build the comma-separated broker list (host:port,host:port,...) of a cluster
# from the broker registrations stored in ZooKeeper.
# Arguments:
#   $1 - ZooKeeper connect string, like my-zookeeper-hostname:2181
# Outputs:
#   Writes the broker list to stdout without a trailing comma. The output is
#   empty when no broker registration could be read.
#######################################
function get_broker_list () {
  local zk="$1"
  local broker_ids broker_id broker_json broker_host broker_port
  local broker_list=''

  # "ls /brokers/ids" answers with a bracketed id list such as "[1, 2, 3]".
  # grep keeps only that line and jq splits it into one id per line.
  broker_ids=$(zookeeper-shell "$zk" <<< "ls /brokers/ids" | grep '^\[' | jq -r '.[]')

  while IFS= read -r broker_id; do
    # A here-string always delivers at least one (possibly empty) line, so an
    # empty id list has to be skipped explicitly instead of producing ":".
    [ -n "$broker_id" ] || continue

    # stderr of the shell is discarded; only the line that holds the
    # registration JSON (the one containing "host":) is kept.
    broker_json=$(zookeeper-shell "$zk" <<< "get /brokers/ids/$broker_id" 2>/dev/null | grep '"host":')

    # A registration that could not be read must not add a bogus ":" entry.
    [ -n "$broker_json" ] || continue

    broker_host=$(jq -r '.host' <<< "$broker_json")
    broker_port=$(jq -r '.port' <<< "$broker_json")
    broker_list="$broker_list$broker_host:$broker_port,"
  done <<< "$broker_ids"

  # Drop the trailing comma left behind by the loop.
  printf '%s\n' "${broker_list%,}"
}

# Broker list shared by every Kafka tool invocation below.
BROKER_LIST=$(get_broker_list "$ZK")

if [ -z "$BROKER_LIST" ]; then
  echo "Could not find any brokers via zookeeper at: $ZK" >&2
  exit 1
fi

#######################################
# Print the "topic:partition:offset" lines that GetOffsetShell reports for
# $TOPIC, ordered numerically by partition id.
# Globals:
#   BROKER_LIST, TOPIC (read)
# Arguments:
#   $1 - value for --time: -2 for the earliest offsets, -1 for the latest
# Outputs:
#   One "topic:partition:offset" line per partition on stdout.
#######################################
function get_partition_offsets () {
  # grep drops anything that is not a complete topic:partition:offset line.
  # The sort key is the numeric partition field: a plain lexicographic sort
  # would put partition 10 before partition 2 and scramble topics that have
  # more than ten partitions.
  kafka-run-class kafka.tools.GetOffsetShell --broker-list "$BROKER_LIST" --time "$1" --topic "$TOPIC" \
    | grep -E '^[^:]+:[0-9]+:[0-9]+$' \
    | sort -t: -k2,2n
}

PARTITION_OFFSET_START_LIST=$(get_partition_offsets -2)
PARTITION_OFFSET_END_LIST=$(get_partition_offsets -1)

if [ -z "$PARTITION_OFFSET_START_LIST" ] || [ -z "$PARTITION_OFFSET_END_LIST" ]; then
  echo "Could not read partition offsets for topic $TOPIC from brokers: $BROKER_LIST" >&2
  exit 1
fi

# Consumer options are kept in an array so each one reaches the tool as its own
# argument without relying on word splitting.
CONSUMER_PROPERTIES=(--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer)
if [ "$RAW" != "raw" ]; then
  CONSUMER_PROPERTIES=(--property print.timestamp=true "${CONSUMER_PROPERTIES[@]}")
fi

# Shape of a consumer line when print.timestamp is on:
# "CreateTime:<epoch millis><TAB><message>".
TIMESTAMP_PATTERN=$'^CreateTime:([0-9]+)\t(.*)$'

# Walk both offset lists in lockstep (fd 3 = earliest, fd 4 = latest). The
# partition id is taken from the line itself rather than from its position.
while IFS=: read -r _ PARTITION_ID OFFSET_START <&3 \
  && IFS=: read -r _ END_PARTITION_ID OFFSET_END <&4; do

  if [ "$PARTITION_ID" != "$END_PARTITION_ID" ]; then
    echo "Earliest and latest offset lists for $TOPIC do not line up (partition $PARTITION_ID vs $END_PARTITION_ID)" >&2
    exit 1
  fi

  # Never ask for more messages than the partition currently holds; when it
  # holds more than requested, start READ_LAST_COUNT messages before the end.
  MAX_MESSAGES=$(( OFFSET_END - OFFSET_START ))
  if [ "$MAX_MESSAGES" -gt "$READ_LAST_COUNT" ]; then
    MAX_MESSAGES="$READ_LAST_COUNT"
    OFFSET_START=$(( OFFSET_END - MAX_MESSAGES ))
  fi

  if [ "$RAW" != "raw" ]; then
    echo "$TOPIC partition $PARTITION_ID last $MAX_MESSAGES messages:"
  fi

  # Nothing to fetch: skip starting the consumer for an empty partition.
  if [ "$MAX_MESSAGES" -le 0 ]; then
    continue
  fi

  MESSAGES_WITH_TIMESTAMPS=$(kafka-simple-consumer-shell "${CONSUMER_PROPERTIES[@]}" --broker-list "$BROKER_LIST" --topic "$TOPIC" --offset "$OFFSET_START" --max-messages "$MAX_MESSAGES" --partition "$PARTITION_ID" --no-wait-at-logend)

  # A here-string would turn empty output into one blank line; skip it instead.
  if [ -z "$MESSAGES_WITH_TIMESTAMPS" ]; then
    continue
  fi

  while IFS= read -r LINE; do
    if [[ "$LINE" =~ $TIMESTAMP_PATTERN ]]; then
      TIMESTAMP=${BASH_REMATCH[1]}
      # Escaped bytes (\xNN) in the payload are replaced by a space.
      BYTES=$(sed 's/\\x[A-F0-9]\{2\}/ /g' <<< "${BASH_REMATCH[2]}")
      # The timestamp is in milliseconds; date expects whole seconds.
      # 10# forces base 10 so a leading zero is never read as octal.
      DATETIME_READABLE=$(TZ="$TZ" date -d "@$(( 10#$TIMESTAMP / 1000 ))")
      printf '%s %s\n' "$DATETIME_READABLE" "$BYTES"
    else
      # Lines without a CreateTime prefix (raw mode, or other tool output) are
      # passed through unchanged.
      printf '%s\n' "$LINE"
    fi
  done <<< "$MESSAGES_WITH_TIMESTAMPS"
done 3<<< "$PARTITION_OFFSET_START_LIST" 4<<< "$PARTITION_OFFSET_END_LIST"

Comments