Apache Kafka Component Guide

Kafka Broker Settings

The following subsections describe configuration settings that influence the performance of Kafka brokers.

Connection Settings

Review the following connection settings in the Advanced kafka-broker category, and modify as needed:

zookeeper.session.timeout.ms

Specifies ZooKeeper session timeout, in milliseconds. The default value is 30000 ms.

If the server fails to signal heartbeat to ZooKeeper within this period of time, the server is considered to be dead. If you set this value too low, the server might be falsely considered dead; if you set it too high it may take too long to recognize a truly dead server.

If you see frequent disconnection from the ZooKeeper server, review this setting. If long garbage collection pauses cause Kafka to lose its ZooKeeper session, you might need to configure longer timeout values.
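For example, in a kafka-broker configuration you might raise the timeout like this (60000 ms is an illustrative value, not a recommendation; tune it to your observed GC pause lengths):

```properties
# Raise the ZooKeeper session timeout if long GC pauses cause
# the broker to lose its ZooKeeper session (default is 30000 ms)
zookeeper.session.timeout.ms=60000
```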

advertised.listeners

If you have manually set listeners to advertised.listeners=PLAINTEXT://$HOSTNAME:$PORT, then after enabling Kerberos, change the listener configuration to advertised.listeners=SASL_PLAINTEXT://$HOSTNAME:$PORT.
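A before-and-after sketch of this change (the hostname and port are placeholders for your own values):

```properties
# Before enabling Kerberos
advertised.listeners=PLAINTEXT://kafka-broker-1.example.com:6667

# After enabling Kerberos
advertised.listeners=SASL_PLAINTEXT://kafka-broker-1.example.com:6667
```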

Important

Do not change the following connection settings:

zookeeper.connect

A comma-separated list of ZooKeeper hostname:port pairs. Ambari sets this value. Do not change this setting.

Topic Settings

For each topic, Kafka maintains a structured commit log with one or more partitions. These topic partitions form the basic unit of parallelism in Kafka. In general, the more partitions there are in a Kafka cluster, the more parallel consumers can be added, resulting in higher throughput.

You can calculate the number of partitions based on your throughput requirements. If throughput from a producer to a single partition is P and throughput from a single partition to a consumer is C, and if your target throughput is T, the minimum number of required partitions is

max(T/P, T/C).
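As a worked example with illustrative numbers (not measured values): suppose the target throughput T is 100 MB/s, a single producer can write P = 10 MB/s to one partition, and a single consumer can read C = 20 MB/s from one partition. Then:

```latex
\max\!\left(\frac{T}{P}, \frac{T}{C}\right)
  = \max\!\left(\frac{100}{10}, \frac{100}{20}\right)
  = \max(10, 5)
  = 10 \text{ partitions (minimum)}
```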

Note also that more partitions can increase latency:

  • End-to-end latency in Kafka is defined as the difference in time from when a message is published by the producer to when the message is read by the consumer.

  • Kafka exposes a message to a consumer only after it has been committed; that is, after the message has been replicated to all in-sync replicas.

  • Replicating one thousand partitions from one broker to another can take up to 20 ms. This is too long for some real-time applications.

  • In the new Kafka producer, messages are accumulated on the producer side: the producer buffers messages per partition. This approach allows users to set an upper bound on the amount of memory used for buffering incoming messages. After enough data has accumulated or enough time has passed, the buffered messages are removed and sent to the broker. Defining more partitions means that messages are buffered for more partitions on the producer side.

  • Similarly, the consumer fetches batches of messages per partition. Consumer memory requirements are proportional to the number of partitions that the consumer subscribes to.
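The producer-side buffering described above is controlled by settings such as the following (the values shown are the standard defaults for the new producer):

```properties
# Total memory available for buffering unsent records; this is the
# upper bound on producer-side buffering mentioned above (32 MB)
buffer.memory=33554432
# Per-partition batch size; more partitions means more batches in memory
batch.size=16384
# How long to wait for more records before sending a partial batch
linger.ms=0
```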

Important Topic Properties

Review the following settings in the Advanced kafka-broker category, and modify as needed:

auto.create.topics.enable

Enable automatic creation of topics on the server. If this property is set to true, then attempts to produce, consume, or fetch metadata for a nonexistent topic automatically create the topic with the default replication factor and number of partitions. The default is enabled.

default.replication.factor

Specifies default replication factors for automatically created topics. For high availability production systems, you should set this value to at least 3.

num.partitions

Specifies the default number of log partitions per topic, for automatically created topics. The default value is 1. Change this setting based on the requirements related to your topic and partition design.

delete.topic.enable

Allows users to delete a topic from Kafka using the admin tool, for Kafka versions 0.9 and later. Deleting a topic through the admin tool will have no effect if this setting is turned off.

By default this feature is turned off (set to false).
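Taken together, the topic properties above might look like this for a high-availability production cluster. This is a sketch: the num.partitions value is illustrative, and should come from the throughput calculation in the previous section.

```properties
# Auto-create topics on first produce/consume/metadata request
auto.create.topics.enable=true
# At least 3 replicas for high-availability production systems
default.replication.factor=3
# Illustrative value; derive from max(T/P, T/C) for your workload
num.partitions=8
# Allow topic deletion via the admin tool (off by default)
delete.topic.enable=true
```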

Log Settings

Review the following settings in the Kafka Broker category, and modify as needed:

log.roll.hours

The maximum time, in hours, before a new log segment is rolled out. The default value is 168 hours (seven days).

This setting controls the period of time after which Kafka will force the log to roll, even if the segment file is not full. This ensures that the retention process is able to delete or compact old data.

log.retention.hours

The number of hours to keep a log file before deleting it. The default value is 168 hours (seven days).

When setting this value, take into account your disk space and how long you want messages to be available. An active consumer can read messages quickly and deliver them to their destination.

The higher the retention setting, the longer the data is preserved. Because higher settings retain larger log files, increasing this setting consumes more of your overall storage capacity.

log.dirs

A comma-separated list of directories in which log data is kept. If you have multiple disks, list all directories under each disk.
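For example, on a broker with three data disks the setting might look like this (the mount points are placeholders for your own layout):

```properties
# One log directory per physical disk spreads I/O across disks
log.dirs=/disk1/kafka-logs,/disk2/kafka-logs,/disk3/kafka-logs
```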

Review the following setting in the Advanced kafka-broker category, and modify as needed:

log.retention.bytes

The amount of data to retain in the log for each topic partition. By default, log size is unlimited.

Note that this is the limit for each partition, so multiply this value by the number of partitions to calculate the total data retained for the topic.

If log.retention.hours and log.retention.bytes are both set, Kafka deletes a segment when either limit is exceeded.

log.segment.bytes

The log for a topic partition is stored as a directory of segment files. This setting controls the maximum size of a segment file before a new segment is rolled over in the log. The default is 1 GB.
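A sketch combining the retention and segment settings above. The 50 GB per-partition cap is an illustrative value; remember that retention operates on whole segments, so data is removed segment by segment once either limit is exceeded.

```properties
# Keep data for 7 days (the default)...
log.retention.hours=168
# ...or until a partition reaches ~50 GB, whichever limit is hit first
log.retention.bytes=53687091200
# Roll a new segment at 1 GB (the default)
log.segment.bytes=1073741824
```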

Log Flush Management

Kafka writes topic messages to a log file immediately upon receipt, but the data is initially buffered in page cache. A log flush forces Kafka to flush topic messages from page cache, writing the messages to disk.

We recommend using the default flush settings, which rely on background flushes done by Linux and Kafka. Default settings provide high throughput and low latency, and they guarantee recovery through the use of replication.

If you decide to specify your own flush settings, you can force a flush after a period of time, or after a specified number of messages, or both (whichever limit is reached first). You can set property values globally and override them on a per-topic basis.

There are several important considerations related to log file flushing:

  • Durability: unflushed data is at greater risk of loss in the event of a crash. A failed broker can recover topic partitions from its replicas, but if a follower does not issue a fetch request or consume up to the leader's log-end offset within the time specified by replica.lag.time.max.ms (10 seconds by default), the leader removes the follower from the in-sync replica set ("ISR"). Within that window, a follower that is not yet caught up still counts as in-sync; if the leader fails during that time, messages written during the leader transition can be lost unless you explicitly set log.flush.interval.messages.

  • Increased latency: data is not available to consumers until it is flushed (the fsync implementation in most Linux filesystems blocks writes to the file system).

  • Throughput: a flush operation is typically an expensive operation.

  • Disk usage patterns: explicitly forced flushes make disk usage patterns less efficient.

  • Page-level locking in background flushing is much more granular than the locking performed by an explicit flush.

log.flush.interval.messages specifies the number of messages to accumulate on a log partition before Kafka forces a flush of data to disk.

log.flush.scheduler.interval.ms specifies the amount of time (in milliseconds) after which Kafka checks to see if a log needs to be flushed to disk.

log.segment.bytes specifies the size of the log file. Kafka flushes the log file to disk whenever a log file reaches its maximum size.

log.roll.hours specifies the maximum length of time before a new log segment is rolled out (in hours); this value is secondary to log.roll.ms. Kafka flushes the log file to disk whenever a log file reaches this time limit.
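If you do override the default flush behavior, a configuration might look like the following. The values are illustrative, not recommendations; as noted above, the defaults are preferred for most deployments.

```properties
# Force a flush after 10,000 messages accumulate on a partition...
log.flush.interval.messages=10000
# ...and check every second whether any log needs to be flushed
log.flush.scheduler.interval.ms=1000
```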

Compaction Settings

Review the following settings in the Advanced kafka-broker category, and modify as needed:

log.cleaner.dedupe.buffer.size

Specifies total memory used for log deduplication across all cleaner threads.

By default, 128 MB of buffer is allocated. You may want to review this and other log.cleaner configuration values, and adjust settings based on your use of compacted topics (__consumer_offsets and other compacted topics).

log.cleaner.io.buffer.size

Specifies the total memory used for log cleaner I/O buffers across all cleaner threads. By default, 512 KB of buffer is allocated. You may want to review this and other log.cleaner configuration values, and adjust settings based on your usage of compacted topics (__consumer_offsets and other compacted topics).

General Broker Settings

Review the following settings in the Advanced kafka-broker category, and modify as needed:

auto.leader.rebalance.enable

Enables automatic leader balancing. A background thread checks and triggers leader balancing (if needed) at regular intervals. The default is enabled.

unclean.leader.election.enable

This property lets you choose between availability and durability. This is an important setting: if availability is more important than avoiding data loss, ensure that this property is set to true. If preventing data loss is more important than availability, set this property to false.

This setting operates as follows:

  • If unclean.leader.election.enable is set to true (enabled), an out-of-sync replica will be elected as leader when there is no live in-sync replica (ISR). This preserves the availability of the partition, but there is a chance of data loss.

  • If unclean.leader.election.enable is set to false and there are no live in-sync replicas, Kafka returns an error and the partition will be unavailable.

This property is set to true by default, which favors availability.

If durability is preferable to availability, set unclean.leader.election.enable to false.
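For a durability-first cluster, the setting looks like this:

```properties
# Favor durability: a partition with no live in-sync replica stays
# unavailable rather than electing an out-of-sync replica as leader
unclean.leader.election.enable=false
```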

controlled.shutdown.enable

Enables controlled shutdown of the server. The default is enabled.

min.insync.replicas

When a producer sets acks to "all", min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception.

When used together, min.insync.replicas and producer acks allow you to enforce stronger durability guarantees.

For a replication factor of 3, you should set min.insync.replicas to 2.
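For example, with a replication factor of 3, the broker-side and producer-side settings that together enforce the stronger durability guarantee are (note that acks is set in the producer's configuration, not the broker's):

```properties
# Broker (or per-topic): at least 2 in-sync replicas must
# acknowledge a write before it is considered successful
min.insync.replicas=2
```

```properties
# Producer configuration: wait for acknowledgment from all
# in-sync replicas before treating a send as successful
acks=all
```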

message.max.bytes

Specifies the maximum size of message that the server can receive. It is important that this property be set with consideration for the maximum fetch size used by your consumers, or a producer could publish messages too large for consumers to consume.

Note that there are currently two versions of consumer and producer APIs. The value of message.max.bytes must be smaller than the max.partition.fetch.bytes setting in the new consumer, or smaller than the fetch.message.max.bytes setting in the old consumer. In addition, the value must be smaller than replica.fetch.max.bytes.

replica.fetch.max.bytes

Specifies the number of bytes of messages the broker attempts to fetch for each partition when replicating. This value must be larger than message.max.bytes.
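A sketch of a consistent sizing across broker and consumer settings, using the standard default values; the key constraint is that message.max.bytes is the smallest of the three:

```properties
# Broker: largest message the broker will accept
message.max.bytes=1000000
# Broker: replica fetch size must exceed message.max.bytes
replica.fetch.max.bytes=1048576
```

```properties
# New consumer: per-partition fetch size must also
# be large enough to hold the largest message
max.partition.fetch.bytes=1048576
```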

broker.rack

The rack awareness feature distributes replicas of a partition across different racks. You can specify that a broker belongs to a particular rack through the "Custom kafka-broker" menu option. For more information about the rack awareness feature, see http://kafka.apache.org/documentation.html#basic_ops_racks.