From 6217a7b9a94a972ecab3e76a436cadf76ce3f4ab Mon Sep 17 00:00:00 2001 From: m0duspwnens Date: Tue, 9 Apr 2024 09:27:21 -0400 Subject: [PATCH] add defaults and jinjafy kafka config --- salt/kafka/config.map.jinja | 8 ++ salt/kafka/config.sls | 12 +-- salt/kafka/defaults.yaml | 39 ++++++++ salt/kafka/enabled.sls | 17 ++-- salt/kafka/etc/client.properties.jinja | 2 + salt/kafka/etc/server.properties.jinja | 130 +------------------------ salt/kafka/map.jinja | 15 +++ 7 files changed, 81 insertions(+), 142 deletions(-) create mode 100644 salt/kafka/config.map.jinja create mode 100644 salt/kafka/defaults.yaml create mode 100644 salt/kafka/etc/client.properties.jinja create mode 100644 salt/kafka/map.jinja diff --git a/salt/kafka/config.map.jinja b/salt/kafka/config.map.jinja new file mode 100644 index 000000000..ab43d84a9 --- /dev/null +++ b/salt/kafka/config.map.jinja @@ -0,0 +1,8 @@ +{% from 'vars/globals.map.jinja' import GLOBALS %} +{% from 'kafka/map.jinja' import KAFKAMERGED %} + +{% set KAFKACONFIG = {} %} +{% for k, v in KAFKAMERGED.config.items() %} +{% do KAFKACONFIG.update({k | replace("_x_", "."): v}) %} +{% endfor %} + diff --git a/salt/kafka/config.sls b/salt/kafka/config.sls index dedc68fe8..523681ba0 100644 --- a/salt/kafka/config.sls +++ b/salt/kafka/config.sls @@ -94,19 +94,17 @@ kafka_keystore_perms: - user: 960 - group: 939 -kafka_kraft_server_properties: +{% for sc in ['server', 'client'] %} +kafka_kraft_{{sc}}_properties: file.managed: - - source: salt://kafka/etc/server.properties.jinja - - name: /opt/so/conf/kafka/server.properties + - source: salt://kafka/etc/{{sc}}.properties.jinja + - name: /opt/so/conf/kafka/{{sc}}.properties - template: jinja - - defaults: - kafka_nodeid: {{ kafka_nodeid }} - kraft_controller_quorum_voters: {{ kraft_controller_quorum_voters }} - kafka_ip: {{ kafka_ip }} - user: 960 - group: 960 - makedirs: True - show_changes: False +{% endfor %} {% else %} diff --git a/salt/kafka/defaults.yaml b/salt/kafka/defaults.yaml new
file mode 100644 index 000000000..7828f0536 --- /dev/null +++ b/salt/kafka/defaults.yaml @@ -0,0 +1,39 @@ +kafka: + enabled: False + config: + server: + advertised_x_listeners: BROKER://10.66.166.231:9092 + auto_x_create_x_topics_x_enable: true + controller_x_listener_x_names: CONTROLLER + controller_x_quorum_x_voters: + inter_x_broker_x_listener_x_name: BROKER + listeners: BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 + listener_x_security_x_protocol_x_map: CONTROLLER:SSL,BROKER:SSL + log_x_dirs: /nsm/kafka/data + log_x_retention_x_check_x_interval_x_ms: 300000 + log_x_retention_x_hours: 168 + log_x_segment_x_bytes: 1073741824 + node_x_id: + num_x_io_x_threads: 8 + num_x_network_x_threads: 3 + num_x_partitions: 1 + num_x_recovery_x_threads_x_per_x_data_x_dir: 1 + offsets_x_topic_x_replication_x_factor: 1 + process_x_roles: broker,controller + socket_x_receive_x_buffer_x_bytes: 102400 + socket_x_request_x_max_x_bytes: 104857600 + socket_x_send_x_buffer_x_bytes: 102400 + ssl_x_keystore_x_location: /etc/pki/kafka.jks + ssl_x_keystore_x_password: changeit + ssl_x_keystore_x_type: JKS + ssl_x_truststore_x_location: /etc/pki/java/sos/cacerts + ssl_x_truststore_x_password: changeit + transaction_x_state_x_log_x_min_x_isr: 1 + transaction_x_state_x_log_x_replication_x_factor: 1 + client: + security_x_protocol: SSL + ssl_x_truststore_x_location: /etc/pki/java/sos/cacerts + ssl_x_truststore_x_password: changeit + ssl_x_keystore_x_location: /etc/pki/kafka.jks + ssl_x_keystore_x_type: JKS + ssl_x_keystore_x_password: changeit diff --git a/salt/kafka/enabled.sls b/salt/kafka/enabled.sls index 49a0a9bbd..c2fca70db 100644 --- a/salt/kafka/enabled.sls +++ b/salt/kafka/enabled.sls @@ -26,14 +26,14 @@ so-kafka: - environment: - KAFKA_HEAP_OPTS=-Xmx2G -Xms1G - extra_hosts: - {% for node in KAFKANODES %} + {% for node in KAFKANODES %} - {{ node }}:{{ KAFKANODES[node].ip }} - {% endfor %} - {% if DOCKER.containers['so-kafka'].extra_hosts %} - {% for XTRAHOST in 
DOCKER.containers['so-kafka'].extra_hosts %} + {% endfor %} + {% if DOCKER.containers['so-kafka'].extra_hosts %} + {% for XTRAHOST in DOCKER.containers['so-kafka'].extra_hosts %} - {{ XTRAHOST }} - {% endfor %} - {% endif %} + {% endfor %} + {% endif %} - port_bindings: {% for BINDING in DOCKER.containers['so-kafka'].port_bindings %} - {{ BINDING }} @@ -43,8 +43,11 @@ so-kafka: - /opt/so/conf/ca/cacerts:/etc/pki/java/sos/cacerts - /nsm/kafka/data/:/nsm/kafka/data/:rw - /opt/so/conf/kafka/server.properties:/kafka/config/kraft/server.properties + - /opt/so/conf/kafka/client.properties:/kafka/config/kraft/client.properties - watch: - - file: kafka_kraft_server_properties + {% for sc in ['server', 'client'] %} + - file: kafka_kraft_{{sc}}_properties + {% endfor %} delete_so-kafka_so-status.disabled: file.uncomment: diff --git a/salt/kafka/etc/client.properties.jinja b/salt/kafka/etc/client.properties.jinja new file mode 100644 index 000000000..9f01904e4 --- /dev/null +++ b/salt/kafka/etc/client.properties.jinja @@ -0,0 +1,2 @@ +{%- from 'kafka/config.map.jinja' import KAFKACONFIG %} +{{ KAFKACONFIG.client }} diff --git a/salt/kafka/etc/server.properties.jinja b/salt/kafka/etc/server.properties.jinja index 486feb214..a18262ac2 100644 --- a/salt/kafka/etc/server.properties.jinja +++ b/salt/kafka/etc/server.properties.jinja @@ -1,128 +1,2 @@ -# This configuration file is intended for use in KRaft mode, where -# Apache ZooKeeper is not present. See config/kraft/README.md for details. -# - -############################# Server Basics ############################# - -# The role of this server. 
Setting this puts us in KRaft mode -process.roles=broker,controller - -# The node id associated with this instance's roles -node.id={{ kafka_nodeid }} - -# The connect string for the controller quorum -controller.quorum.voters={{ kraft_controller_quorum_voters }} - -############################# Socket Server Settings ############################# - -# The address the socket server listens on. -# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum. -# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(), -# with PLAINTEXT listener name, and port 9092. -# FORMAT: -# listeners = listener_name://host_name:port -# EXAMPLE: -# listeners = PLAINTEXT://your.host.name:9092 - -# using 0.0.0.0 eliminates issues with binding to 9092 and 9093 in initial testing -listeners=BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 -#listeners=BROKER://{{ kafka_ip }}:9092,CONTROLLER://{{ kafka_ip }}:9093 - -auto.create.topics.enable=true - -# Name of listener used for communication between brokers. -inter.broker.listener.name=BROKER - -# Listener name, hostname and port the broker will advertise to clients. -# If not set, it uses the value for "listeners". -advertised.listeners=BROKER://{{ kafka_ip }}:9092 - -# A comma-separated list of the names of the listeners used by the controller. -# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol -# This is required if running in KRaft mode. -controller.listener.names=CONTROLLER - -# Maps listener names to security protocols, the default is for them to be the same. 
See the config documentation for more details -listener.security.protocol.map=CONTROLLER:SSL,BROKER:SSL - -#SSL configuration -ssl.keystore.location=/etc/pki/kafka.jks -ssl.keystore.password=changeit -ssl.keystore.type=JKS -ssl.truststore.location=/etc/pki/java/sos/cacerts -ssl.truststore.password=changeit - -# The number of threads that the server uses for receiving requests from the network and sending responses to the network -num.network.threads=3 - -# The number of threads that the server uses for processing requests, which may include disk I/O -num.io.threads=8 - -# The send buffer (SO_SNDBUF) used by the socket server -socket.send.buffer.bytes=102400 - -# The receive buffer (SO_RCVBUF) used by the socket server -socket.receive.buffer.bytes=102400 - -# The maximum size of a request that the socket server will accept (protection against OOM) -socket.request.max.bytes=104857600 - - -############################# Log Basics ############################# - -# A comma separated list of directories under which to store log files -log.dirs=/nsm/kafka/data - -# The default number of log partitions per topic. More partitions allow greater -# parallelism for consumption, but this will also result in more files across -# the brokers. -num.partitions=1 - -# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. -# This value is recommended to be increased for installations with data dirs located in RAID array. -num.recovery.threads.per.data.dir=1 - -############################# Internal Topic Settings ############################# -# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" -# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. 
-offsets.topic.replication.factor=1 -transaction.state.log.replication.factor=1 -transaction.state.log.min.isr=1 - -############################# Log Flush Policy ############################# - -# Messages are immediately written to the filesystem but by default we only fsync() to sync -# the OS cache lazily. The following configurations control the flush of data to disk. -# There are a few important trade-offs here: -# 1. Durability: Unflushed data may be lost if you are not using replication. -# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. -# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. -# The settings below allow one to configure the flush policy to flush data after a period of time or -# every N messages (or both). This can be done globally and overridden on a per-topic basis. - -# The number of messages to accept before forcing a flush of data to disk -#log.flush.interval.messages=10000 - -# The maximum amount of time a message can sit in a log before we force a flush -#log.flush.interval.ms=1000 - -############################# Log Retention Policy ############################# - -# The following configurations control the disposal of log segments. The policy can -# be set to delete segments after a period of time, or after a given size has accumulated. -# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens -# from the end of the log. - -# The minimum age of a log file to be eligible for deletion due to age -log.retention.hours=168 - -# A size-based retention policy for logs. Segments are pruned from the log unless the remaining -# segments drop below log.retention.bytes. Functions independently of log.retention.hours. -#log.retention.bytes=1073741824 - -# The maximum size of a log segment file. 
When this size is reached a new log segment will be created. -log.segment.bytes=1073741824 - -# The interval at which log segments are checked to see if they can be deleted according -# to the retention policies -log.retention.check.interval.ms=300000 +{%- from 'kafka/config.map.jinja' import KAFKACONFIG %} +{{ KAFKACONFIG.server }} diff --git a/salt/kafka/map.jinja b/salt/kafka/map.jinja new file mode 100644 index 000000000..f0e389e4a --- /dev/null +++ b/salt/kafka/map.jinja @@ -0,0 +1,15 @@ +{% import_yaml 'kafka/defaults.yaml' as KAFKADEFAULTS %} +{% set KAFKAMERGED = salt['pillar.get']('kafka', KAFKADEFAULTS.kafka, merge=True) %} +{% from 'vars/globals.map.jinja' import GLOBALS %} + +{% do KAFKAMERGED.config.server.update({'node_x_id': salt['pillar.get']('kafka:nodes:' ~ GLOBALS.hostname ~ ':nodeid')}) %} +{% do KAFKAMERGED.config.server.update({'advertised_x_listeners': 'BROKER://' ~ GLOBALS.node_ip ~ ':9092'}) %} + +{% set nodes = salt['pillar.get']('kafka:nodes', {}) %} +{% set combined = [] %} +{% for hostname, data in nodes.items() %} + {% do combined.append(data.nodeid ~ "@" ~ hostname ~ ":9093") %} +{% endfor %} +{% set kraft_controller_quorum_voters = ','.join(combined) %} + +{% do KAFKAMERGED.config.server.update({'controller_x_quorum_x_voters': kraft_controller_quorum_voters}) %}