#!/bin/bash

# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.

# Helper for common Kafka topic operations. Supported operations:
#   topic-partitions <topic name> <# partitions>   increase a topic's partition count
#   list-topics                                    list non-internal topics
#
# Environment:
#   NOROOT - when set to a non-empty value, the root check below is skipped.

if [ -z "$NOROOT" ]; then
  # Check for prerequisites
  if [ "$(id -u)" -ne 0 ]; then
    echo "This script must be run using sudo!"
    exit 1
  fi
fi

# Print the help text and exit with status 1.
usage() {
  cat <<USAGE_EOF

  Usage: $0 <operation> [parameters]

  Where <operation> is one of the following:

      topic-partitions: Increase the number of partitions for a Kafka topic
          Required arguments: topic-partitions <topic name> <# partitions>
          Example: $0 topic-partitions suricata-topic 6

      list-topics: List of Kafka topics
          Example: $0 list-topics

USAGE_EOF
  exit 1
}

if [[ $# -lt 1 || $1 == --help || $1 == -h ]]; then
  usage
fi

# Client properties file handed to kafka-topics.sh via --command-config.
kafka_client_config="/opt/kafka/config/kraft/client.properties"

# Report missing arguments, then show usage (which exits with status 1).
too_few_arguments() {
  echo -e "\nMissing one or more required arguments!\n"
  usage
}

# Populate the global "brokers" with a comma-separated ip:port list.
#
# The list is cached in /opt/so/state/kafka_brokers and rebuilt from the
# kafka:nodes pillar when the cache file is missing or was last modified
# more than 120 minutes ago.
#
# Globals:  brokers (written)
# Outputs:  a status line on stdout; errors on stderr
# Exits:    1 when no broker list can be obtained
get_kafka_brokers() {
  local brokers_cache="/opt/so/state/kafka_brokers"
  local broker_port="9092"
  local refreshed

  if [[ ! -f "$brokers_cache" ]] || [[ -n "$(find "$brokers_cache" -mmin +120)" ]]; then
    echo "Refreshing Kafka brokers list"
    refreshed=$(salt-call pillar.get kafka:nodes --out=json \
      | jq -r --arg broker_port "$broker_port" '.local | to_entries[] | select(.value.role | contains("broker")) | "\(.value.ip):\($broker_port)"' \
      | paste -sd "," -)
    # Only replace the cache with a usable result, so a failed pillar lookup
    # does not wipe out a previously cached broker list.
    if [[ -n "$refreshed" ]]; then
      printf '%s\n' "$refreshed" > "$brokers_cache"
    else
      echo "Unable to retrieve Kafka brokers from the kafka:nodes pillar" >&2
    fi
  else
    echo "Using cached Kafka brokers list"
  fi

  brokers=""
  if [[ -s "$brokers_cache" ]]; then
    brokers=$(cat "$brokers_cache")
  fi

  # An empty value would make --bootstrap-server swallow the next option.
  if [[ -z "$brokers" ]]; then
    echo "No Kafka brokers available" >&2
    exit 1
  fi
}

# Raise the partition count of "$topic" to "$partition_count" and, on
# success, print the topic description.
#
# Globals:  topic, partition_count, kafka_client_config (read)
# Returns:  0 on success, 1 when the alter command fails
increase_topic_partitions() {
  get_kafka_brokers

  # Test the exit status of the alter command itself. Capturing its output
  # and then executing that output as a command reports success whenever the
  # output happens to be empty.
  if so-kafka-cli kafka-topics.sh --bootstrap-server "$brokers" --command-config "$kafka_client_config" --alter --topic "$topic" --partitions "$partition_count"; then
    echo -e "Successfully increased the number of partitions for topic $topic to $partition_count\n"
    so-kafka-cli kafka-topics.sh --bootstrap-server "$brokers" --command-config "$kafka_client_config" --describe --topic "$topic"
  else
    echo "Failed to increase the number of partitions for topic $topic" >&2
    return 1
  fi
}

# Print the sorted list of Kafka topics, excluding internal topics.
get_kafka_topics_list() {
  get_kafka_brokers
  so-kafka-cli kafka-topics.sh --bootstrap-server "$brokers" --command-config "$kafka_client_config" --exclude-internal --list | sort
}

operation=$1

case "${operation}" in
  "topic-partitions")
    if [[ $# -lt 3 ]]; then
      too_few_arguments
    fi
    topic=$2
    partition_count=$3
    # Reject anything that is not a positive integer before contacting Kafka.
    if [[ ! "$partition_count" =~ ^[1-9][0-9]*$ ]]; then
      echo -e "\nThe number of partitions must be a positive integer!\n"
      usage
    fi
    increase_topic_partitions
    ;;
  "list-topics")
    get_kafka_topics_list
    ;;
  *)
    usage
    ;;
esac