#! /bin/bash
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at 
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.

# Require root privileges unless the caller opted out by setting NOROOT to a
# non-empty value. The uid lookup is skipped entirely when NOROOT is set.
if [[ -z "$NOROOT" && "$(id -u)" -ne 0 ]]; then
  echo "This script must be run using sudo!"
  exit 1
fi

# Print the command-line help text to stdout and terminate the script.
#
# Arguments: none
# Outputs:   help text on stdout ($0 is expanded to the invoked script name)
# Returns:   never returns; always exits the script with status 1
usage() {
  # The heredoc body is indented with spaces only so every operation entry
  # lines up the same way regardless of the terminal's tab width.
  cat <<USAGE_EOF

  Usage: $0 <operation> [parameters]

  Where <operation> is one of the following:

    topic-partitions: Increase the number of partitions for a Kafka topic
      Required arguments: topic-partitions <topic name> <# partitions>
      Example: $0 topic-partitions suricata-topic 6

    list-topics: List of Kafka topics
      Example: $0 list-topics

USAGE_EOF
  exit 1
}

# Show the help text when no operation was supplied at all...
if (( $# == 0 )); then
  usage
fi
# ...or when help was requested explicitly as the first argument.
case "$1" in
  --help | -h) usage ;;
esac

# Client properties file handed to kafka-topics.sh through --command-config.
kafka_client_config='/opt/kafka/config/kraft/client.properties'

# Tell the user that an operation was invoked without all of its required
# parameters, then hand off to usage to print the help text.
too_few_arguments() {
  printf '\n%s\n\n' "Missing one or more required arguments!"
  usage
}

# Populate the global $brokers with a comma-separated list of "<ip>:<port>"
# Kafka broker endpoints.
#
# The list is built from the kafka:nodes Salt pillar (entries whose role
# contains "broker", paired with port 9092) and cached in a file. The cache is
# reused unless it is missing, empty, or was last modified more than 120
# minutes ago. A lookup that yields no brokers is never written to the cache,
# so a transient Salt/jq failure cannot be served back as an empty list later.
#
# Globals:   brokers (written; empty on failure)
#            KAFKA_BROKERS_CACHE (read, optional): overrides the cache file
#            location, which defaults to /opt/so/state/kafka_brokers
# Arguments: none
# Outputs:   one progress line on stdout; error/warning messages on stderr
# Returns:   0 when $brokers holds a non-empty list, 1 otherwise
get_kafka_brokers() {
  local brokers_cache="${KAFKA_BROKERS_CACHE:-/opt/so/state/kafka_brokers}"
  local broker_port="9092"
  local refreshed

  brokers=""

  # find prints the path only when the file is older than the 120 minute
  # window, so empty output means the cache is still fresh.
  if [[ -f "$brokers_cache" && -z "$(find "$brokers_cache" -mmin +120)" ]]; then
    # Command substitution drops trailing newlines, so a cache file that only
    # contains a newline is treated the same as a missing one.
    brokers=$(cat "$brokers_cache")
  fi

  if [[ -n "$brokers" ]]; then
    echo "Using cached Kafka brokers list"
    return 0
  fi

  echo "Refreshing Kafka brokers list"
  # Capture the result first instead of redirecting straight into the cache:
  # paste emits a bare newline even when the upstream commands fail.
  refreshed=$(salt-call pillar.get kafka:nodes --out=json \
    | jq -r --arg broker_port "$broker_port" '.local | to_entries[] | select(.value.role | contains("broker")) | "\(.value.ip):\($broker_port)"' \
    | paste -sd "," -)

  if [[ -z "$refreshed" ]]; then
    echo "Unable to build the Kafka brokers list from the kafka:nodes pillar" >&2
    return 1
  fi

  # A cache write failure is not fatal: the freshly built list is still usable
  # for this invocation.
  if ! printf '%s\n' "$refreshed" > "$brokers_cache"; then
    echo "Warning: unable to write Kafka brokers cache $brokers_cache" >&2
  fi

  brokers=$refreshed
}

# Increase the number of partitions of a Kafka topic and, on success, print
# the topic's description.
#
# The alter command is run directly in the `if` so its exit status decides
# the outcome; its output is not captured or re-executed.
#
# Globals:   topic, partition_count, kafka_client_config (read)
#            brokers (read; populated by get_kafka_brokers)
# Arguments: none
# Outputs:   kafka-topics.sh output, a success message and the topic
#            description on stdout; error messages on stderr
# Returns:   1 when the partition count is not a positive integer, no brokers
#            are available, or the alter command fails; otherwise the exit
#            status of the describe command
increase_topic_partitions() {
  if [[ ! "$partition_count" =~ ^[1-9][0-9]*$ ]]; then
    echo "The number of partitions must be a positive integer (got '$partition_count')" >&2
    return 1
  fi

  get_kafka_brokers || return 1
  if [[ -z "$brokers" ]]; then
    echo "No Kafka brokers available" >&2
    return 1
  fi

  if so-kafka-cli kafka-topics.sh --bootstrap-server "$brokers" \
    --command-config "$kafka_client_config" \
    --alter --topic "$topic" --partitions "$partition_count"; then
    echo -e "Successfully increased the number of partitions for topic $topic to $partition_count\n"
    so-kafka-cli kafka-topics.sh --bootstrap-server "$brokers" \
      --command-config "$kafka_client_config" \
      --describe --topic "$topic"
  else
    echo "Failed to increase the number of partitions for topic $topic" >&2
    return 1
  fi
}

# Print the non-internal Kafka topics, sorted, one per line.
#
# The listing is captured before sorting so that a failing kafka-topics.sh
# call is reported instead of being hidden behind sort's exit status.
#
# Globals:   kafka_client_config (read)
#            brokers (read; populated by get_kafka_brokers)
# Arguments: none
# Outputs:   sorted topic names on stdout; error messages on stderr
# Returns:   0 on success, 1 when no brokers are available or the listing
#            command fails
get_kafka_topics_list() {
  local topics

  get_kafka_brokers || return 1
  if [[ -z "$brokers" ]]; then
    echo "No Kafka brokers available" >&2
    return 1
  fi

  if ! topics=$(so-kafka-cli kafka-topics.sh --bootstrap-server "$brokers" \
    --command-config "$kafka_client_config" --exclude-internal --list); then
    echo "Failed to list Kafka topics" >&2
    return 1
  fi

  # Nothing to print when there are no topics.
  if [[ -n "$topics" ]]; then
    printf '%s\n' "$topics" | sort
  fi
}

# Dispatch on the requested operation (first positional argument). Anything
# that is not a known operation falls through to the help text.
operation=$1
if [[ "${operation}" == "topic-partitions" ]]; then
  # Needs the operation name plus <topic name> and <# partitions>.
  if (( $# < 3 )); then
    too_few_arguments
  fi
  topic=$2
  partition_count=$3
  increase_topic_partitions
elif [[ "${operation}" == "list-topics" ]]; then
  get_kafka_topics_list
else
  usage
fi

