Kafka 生产者 KafkaProducer 是 Kafka 与开发者交互的媒介之一,肩负接收用户自定义消息(这里的消息指代往 Kafka 发送的各类数据),并投递给目标 topic 分区的职责。在设计上为了提升消息吞吐量,考量降低与服务端交互的压力等,每次发送消息的请求并非是直接与 Kafka 集群进行交互,而是一个异步的过程。
当调用 KafkaProducer#send
方法发送消息时,实际上只是将消息缓存到了本地的消息收集器中,Kafka 定义了一个 RecordAccumulator 收集器用于收集用户提交的消息数据,同时又在后台维护了一个 Sender 线程,以异步的方式不断将收集器中缓存的消息定期定量地投递给 Kafka 集群。
在本篇文章中,我们首先回忆一下 KafkaProducer 的使用方式,然后重点分析消息的收集、缓存、投递,以及响应的过程。
KafkaProducer 使用示例
KafkaProducer 往 Kafka 发送消息需要依赖于客户端 SDK,Kafka 提供了 多种语言的客户端 供开发者选择,这里我们以 Kafka 内置的 java 客户端为例,介绍如何向 Kafka 集群发送消息。示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-demo" ); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<Integer, String> producer = new KafkaProducer<>(properties); boolean isAsync = true ;for (int i = 0 ; i < 10 ; i++) { if (isAsync) { producer.send(new ProducerRecord<>(DEFAULT_TOPIC, i, "zhenchao" ), (metadata, e) -> { if (null != e) { return ; } printResult(metadata); }); } else { Future<RecordMetadata> future = producer.send(new ProducerRecord<>(DEFAULT_TOPIC, i, "zhenchao" )); RecordMetadata metadata = future.get(10 , TimeUnit.SECONDS); this .printResult(metadata); } }
示例中发送消息依赖于 KafkaProducer 对象,KafkaProducer 类也是我们分析生产者运行机制的入口。创建该对象时我们需要指定 Kafka 集群地址,以及消息 key 和 value 的序列化器,但是客户端 ID 不是必须指定的,后面在分析源码时会看到如果未明确指定客户端 ID,Kafka 会自动为当前客户端创建一个。
接着我们可以调用 KafkaProducer#send
方法向 Kafka 集群指定的 topic 投递消息。消息在被投递之前需要封装成 ProducerRecord 对象,该对象封装了当前消息的目标 topic、目标分区,key、value,以及时间戳等信息。ProducerRecord 的字段定义如下,其中 ProducerRecord#headers
字段在 0.11 版本引入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class ProducerRecord <K , V > { private final String topic; private final Integer partition; private final Headers headers; private final K key; private final V value; private final Long timestamp; }
示例中我们定义了 isAsync 参数,需要说明的一点是,isAsync 参数虽然表面意思是指以异步的方式发送消息,但是本质上不管该参数如何设置,Kafka 当下的版本都只有一种消息发送的方式,即异步发送。参数 isAsync 设置为 true 或者 false 的意义在于指定如何获取消息发送的响应结果,区别在于:
isAsync=false
:以异步方式发送消息,但是通过 Future 模式阻塞等待消息的发送的响应结果。
isAsync=true
:以异步方式发送消息,但是通过 Callback 模式异步获取消息发送的响应结果,即不管消息发送成功还是失败,都会以回调的方式通知客户端,客户端期间不需要阻塞等待。
消息收集与发送过程分析
在具体开始分析消息的发送过程之前,我们需要明确 消息发送是一个异步的过程 ,该过程涉及到 2 个线程的协同工作,其中 1 个线程将待发送的消息写入缓冲区(即收集待发送消息),另外 1 个线程(Sender 线程)负责定期定量将缓冲区中的数据投递给 Kafka 集群,并反馈投递结果。
如上图描绘了生产者运行的基本结构。我们将业务线程称为主线程,业务在调用 KafkaProducer#send
方法向 Kafka 投递消息时,对应的消息会经过拦截器、序列化器,以及分区器等一系列处理,最终被缓存到消息收集器 RecordAccumulator 中。消息收集器为每个 topic 分区设置了一个双端队列用于记录待发往目标 topic 分区的消息集合。Sender 线程异步循环轮询消费消息收集器中的各个队列,将消息按照目标 broker 节点(即分区 Leader 副本所在的 broker 节点)进行分组,并封装成 RPC 请求发往目标节点。
这里我们只是简单概括了 KafkaProducer 的基本运行机制,下面将对各个环节展开进行深入分析。
收集待发送的消息
KafkaProducer 的字段定义与构造方法
首先来看一下 KafkaProducer 类的字段定义,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class KafkaProducer <K , V > implements Producer <K , V > { private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1 ); private String clientId; private final Partitioner partitioner; private final int maxRequestSize; private final long totalMemorySize; private final Metadata metadata; private final RecordAccumulator accumulator; private final Sender sender; private final Thread ioThread; private final CompressionType compressionType; private final Time time; private final Serializer<K> keySerializer; private final Serializer<V> valueSerializer; private final ProducerConfig producerConfig; private final long maxBlockTimeMs; private final int requestTimeoutMs; private final ProducerInterceptors<K, V> interceptors; }
接下来继续看一下 KafkaProducer 类对象的构造过程,KafkaProducer 提供了多个重载版本的构造方法实现,其中最底层的构造方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 private KafkaProducer (ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) { try { log.trace("Starting the Kafka producer" ); Map<String, Object> userProvidedConfigs = config.originals(); this .producerConfig = config; this .time = Time.SYSTEM; this .clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); if (clientId.length() <= 0 ) { clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); } this .partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); if (keySerializer == null ) { this .keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class); this .keySerializer.configure(config.originals(), true ); } else { config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); this .keySerializer = keySerializer; } if (valueSerializer == null ) { this .valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class); this .valueSerializer.configure(config.originals(), false ); } else { config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); this .valueSerializer = valueSerializer; } userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false )) .getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class); this .interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList); ClusterResourceListeners clusterResourceListeners = this .configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters); this .metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true , clusterResourceListeners); this .maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); this .totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); this .compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); this .accumulator = new RecordAccumulator( config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), this .totalMemorySize, this .compressionType, config.getLong(ProducerConfig.LINGER_MS_CONFIG), retryBackoffMs, metrics, time); List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); this .metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds()); ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); NetworkClient client = new NetworkClient( new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, "producer" , channelBuilder), metadata, clientId, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), requestTimeoutMs, time, true ); this .sender = new Sender( client, metadata, accumulator, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1 , config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), (short ) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), config.getInt(ProducerConfig.RETRIES_CONFIG), metrics, Time.SYSTEM, requestTimeoutMs); String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "" ); this .ioThread = new KafkaThread(ioThreadName, this .sender, true ); this .ioThread.start(); config.logUnused(); AppInfoParser.registerAppInfo(JMX_PREFIX, clientId); log.debug("Kafka producer started" ); } catch (Throwable t) { } }
具体实现如上述代码注释,一条消息的发送需要经过拦截器、序列化器、分区器,最后缓存到消息收集器中,并由 Sender 线程在后台异步往 Kafka 集群投递消息,所以在构造 KafkaProducer 对象时主要就是初始化这些组件。
消息收集的过程
了解了 KafkaProducer 的字段定义和对象的构造过程之后,下面正式开始对消息收集的过程进行分析,相关实现位于 KafkaProducer#send
方法中:
1 2 3 4 5 6 public Future<RecordMetadata> send (ProducerRecord<K, V> record, Callback callback) { ProducerRecord<K, V> interceptedRecord = this .interceptors == null ? record : this .interceptors.onSend(record); return this .doSend(interceptedRecord, callback); }
该方法只是简单应用了注册的 Producer 拦截器对发送的消息进行拦截修改,而具体消息收集的过程则封装在 KafkaProducer#doSend
方法中。先来看一下 Producer 拦截器,我们可以基于该拦截器机制实现对消息的剔除、修改,以及在响应回调之前增加一些定制化的需求等。拦截器 ProducerInterceptor 接口的定义如下:
1 2 3 4 5 public interface ProducerInterceptor <K , V > extends Configurable { ProducerRecord<K, V> onSend (ProducerRecord<K, V> record) ; void onAcknowledgement (RecordMetadata metadata, Exception exception) ; void close () ; }
其中,方法 ProducerInterceptor#onSend
用于对待发送的消息进行前置拦截,具体的拦截时机是在消息被序列化和分配分区(如果未手动指定分区)之前,如上述 KafkaProducer#send
方法所示。方法 ProducerInterceptor#onAcknowledgement
用于对已发送到 Kafka 集群并得到确认的消息,以及发送失败的消息进行后置拦截,具体的拦截时机是在回调用户自定义的 Callback 逻辑之前。需要注意的一点是,方法 ProducerInterceptor#onAcknowledgement
在 Producer 的 I/O 线程中被调用,所以不建议在其中实现一些比较耗时的逻辑,以便影响整体消息发送的性能。
下面继续来看一下收集消息的过程,实现位于 KafkaProducer#doSend
方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 private Future<RecordMetadata> doSend (ProducerRecord<K, V> record, Callback callback) { TopicPartition tp = null ; try { ClusterAndWaitTime clusterAndWaitTime = this .waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); long remainingWaitMs = Math.max(0 , maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs); Cluster cluster = clusterAndWaitTime.cluster; byte [] serializedKey; try { serializedKey = keySerializer.serialize(record.topic(), record.key()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer" ); } byte [] serializedValue; try { serializedValue = valueSerializer.serialize(record.topic(), record.value()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer" ); } int partition = this .partition(record, serializedKey, serializedValue, cluster); int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue); this .ensureValidRecordSize(serializedSize); tp = new TopicPartition(record.topic(), partition); long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp(); log.trace("Sending record {} with callback {} to topic {} partition {}" , record, callback, record.topic(), partition); Callback interceptCallback = this .interceptors == null ? callback : new InterceptorCallback<>(callback, this .interceptors, tp); RecordAccumulator.RecordAppendResult result = accumulator.append( tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs); if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch" , record.topic(), partition); this .sender.wakeup(); } return result.future; } }
我们可以将消息的发送过程概括为以下 6 个步骤:
获取集群的元数据(Metadata)信息,如果请求的是新 topic,或者指定的分区 ID 超过了已知的合法区间,则触发更新本地缓存的集群元数据信息;
基于注册的 key 序列化器对消息的 key 执行序列化;
基于注册的 value 序列化器对消息的 value 执行序列化;
如果未指定目标 topic 分区,则基于注册的分区器为当前消息计算目标分区;
缓存消息到消息收集器 RecordAccumulator 中;
条件性唤醒消息发送 Sender 线程。
下面逐一对上述过程中的 6 个步骤展开分析。首先来看一下获取集群元数据信息的过程( 步骤 1 ),KafkaProducer 本地会缓存集群的元数据信息,包括集群的 topic 列表、每个 topic 的分区列表、分区 Leader 和 Follower 副本所在节点、分区 AR 和 ISR 集合,以及集群节点信息等,详细信息参考下面的 Metadata 类定义。
当客户端向集群投递消息时实际上是投递到了目标 topic 指定分区的 Leader 副本上。因为集群状态是动态变化的,Leader 副本所在的网络位置也会发生迁移,所以客户端在投递消息之前,需要确保本地所缓存的集群信息是最新的,否则需要标记当前集群信息需要更新,具体的更新操作由 Sender 线程完成。
KafkaProducer 在发送消息之前会先调用 KafkaProducer#waitOnMetadata
方法获取集群元数据信息,如果感知到本地缓存的集群元数据信息已经过期,则会通知 Sender 线程进行更新。首先来看一下保存集群元数据信息的 Metadata 类的字段定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public final class Metadata { private final long refreshBackoffMs; private final long metadataExpireMs; private int version; private long lastRefreshMs; private long lastSuccessfulRefreshMs; private Cluster cluster; private boolean needUpdate; private final Map<String, Long> topics; private final List<Listener> listeners; private boolean needMetadataForAllTopics; private final boolean topicExpiryEnabled; }
下面继续来看一下 KafkaProducer#waitOnMetadata
方法的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 private ClusterAndWaitTime waitOnMetadata (String topic, Integer partition, long maxWaitMs) throws InterruptedException { metadata.add(topic); Cluster cluster = metadata.fetch(); Integer partitionsCount = cluster.partitionCountForTopic(topic); if (partitionsCount != null && (partition == null || partition < partitionsCount)) { return new ClusterAndWaitTime(cluster, 0 ); } long begin = time.milliseconds(); long remainingWaitMs = maxWaitMs; long elapsed; do { log.trace("Requesting metadata update for topic {}." , topic); int version = metadata.requestUpdate(); sender.wakeup(); try { metadata.awaitUpdate(version, remainingWaitMs); } catch (TimeoutException ex) { throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms." ); } cluster = metadata.fetch(); elapsed = time.milliseconds() - begin; if (elapsed >= maxWaitMs) { throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms." ); } if (cluster.unauthorizedTopics().contains(topic)) { throw new TopicAuthorizationException(topic); } remainingWaitMs = maxWaitMs - elapsed; partitionsCount = cluster.partitionCountForTopic(topic); } while (partitionsCount == null ); if (partition != null && partition >= partitionsCount) { throw new KafkaException(String.format("Invalid partition given with record: %d is not in the range [0...%d)." , partition, partitionsCount)); } return new ClusterAndWaitTime(cluster, elapsed); }
上述方法首先会尝试将当前 topic 加入到本地缓存的 topic 集合中,因为客户端对于 topic 会有一个过期机制,对于长时间未使用的 topic 会从本地缓存中移除。这里一开始调用 Metadata#add
方法除了标记当前 topic 是活跃的之外,另外一个目的在于判断本地是否有该 topic 的缓存信息,如果没有则需要通知 Sender 线程更新集群元数据信息。通知的过程实际上只是简单将 Metadata#needUpdate
字段设置为 true,Sender 线程会检查该字段以更新集群元数据信息。
接下来会调用 Metadata#fetch
方法获取集群信息 Cluster 对象,Cluster 类是对集群节点、topic、分区等信息的一个封装,其字段定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public final class Cluster { private final List<Node> nodes; private final Set<String> unauthorizedTopics; private final Set<String> internalTopics; private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition; private final Map<String, List<PartitionInfo>> partitionsByTopic; private final Map<String, List<PartitionInfo>> availablePartitionsByTopic; private final Map<Integer, List<PartitionInfo>> partitionsByNode; private final Map<Integer, Node> nodesById; }
其中 Node、TopicPartition 和 PartitionInfo 类定义比较简单,其作用分别为:
Node :封装 Kafka 节点信息,包括 ID、主机名,以及端口号等信息。
TopicPartition :封装分区摘要信息,包含分区所属 topic 和分区编号。
PartitionInfo :封装分区详细信息,包括分区所属 topic、分区编号、Leader 副本所在节点、全部副本所在节点列表,以及 ISR 副本所在节点列表。
继续回到 KafkaProducer#waitOnMetadata
方法。接下来方法会判断是否需要更新集群元数据信息,判断的依据是当前本地缓存的目标 topic 的分区数目不为空,同时如果发送消息时明确指定了分区编号,则此编号必须在本地认为合法的分区编号区间范围内。如果能够满足这些条件,则认为本地缓存的集群信息是合法的,可以直接拿来使用,否则就会触发更新集群元数据的逻辑。如果需要更新集群元数据,则会调用 Metadata#requestUpdate
方法设置标记位,同时唤醒 Sender 线程进行处理,并等待集群元数据更新完成。判定更新完成的策略就是判定本地缓存的集群元数据的版本号(Metadata#version
字段)是否被更新,因为集群元数据每更新成功一次,版本号会加 1。如果等待过程超时则会抛出 TimeoutException 异常。
此外,客户端也会定期触发元数据更新操作,默认元数据有效时间为 5 分钟,可以通过 metadata.max.age.ms
参数进行设定。
回到 KafkaProducer#doSend
方法,在拿到集群信息之后,方法会基于配置的 key 和 value 序列化器分别对消息 ID 和消息内容进行序列化( 步骤 2 和 步骤 3 ),这一过程比较简单。 步骤 4 会为当前消息选择合适的分区,相关实现位于 KafkaProducer#partition
方法中:
1 2 3 4 5 6 7 8 private int partition (ProducerRecord<K, V> record, byte [] serializedKey, byte [] serializedValue, Cluster cluster) { Integer partition = record.partition(); return partition != null ? partition : partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }
如果我们在发送消息时明确指定了分区编号,那么这里只是简单的返回该编号,否则就需要基于注册的分区器计算当前消息对应的分区编号。Partitioner 接口是分区器的抽象,我们可以实现该接口自定义分区器,Kafka 也提供了默认的分区器实现 DefaultPartitioner,分区算法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public int partition (String topic, Object key, byte [] keyBytes, Object value, byte [] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null ) { int nextValue = this .nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0 ) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { return Utils.toPositive(nextValue) % numPartitions; } } else { return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
默认分区器 DefaultPartitioner 依据消息的 key 计算分区,如果在发送消息时未指定 key,则默认分区器会基于 Round-Robin 算法计算分区编号,以保证目标 topic 分区的负载均衡。否则会基于 32 位的 murmur2 哈希算法计算 key 的哈希值,并与分区数取模得到最后的分区编号。
步骤 5 会计算并校验当前消息的大小,同时为消息附加时间戳,并最终调用 RecordAccumulator#append
方法将消息缓存到收集器 RecordAccumulator 中,等待 Sender 线程投递给 Kafka 集群。RecordAccumulator 是生产者 SDK 中非常重要的一个类,可以将其看做是一个本地缓存消息的队列,消息收集线程将消息最终记录到收集器中,而 Sender 线程会定期定量从收集器中取出缓存的消息,并投递给 Kafka 集群。RecordAccumulator 类字段定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public final class RecordAccumulator { private volatile boolean closed; private final AtomicInteger flushesInProgress; private final AtomicInteger appendsInProgress; private final int batchSize; private final CompressionType compression; private final long lingerMs; private final long retryBackoffMs; private final BufferPool free; private final Time time; private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches; private final IncompleteRecordBatches incomplete; private final Set<TopicPartition> muted; private int drainIndex; }
既然 RecordAccumulator 可以看做是一个消息缓存队列,那么这里先了解一下其消息存储的模式。这其中涉及到 RecordAccumulator、RecordBatch、MemoryRecords 和 MemoryRecordsBuilder 4 个类。从上面 RecordAccumulator 类的字段列表中我们看到有一个 ConcurrentMap<TopicPartition, Deque<RecordBatch>>
类型的 batches 字段,这里的 key 对应 topic 的某个分区,而 value 是一个 Deque 类型,其中封装了一批 RecordBatch 对象,这些对象中记录了待发送的消息集合,而这些消息的一个共同点就是都是发往相同的 topic 分区。RecordBatch 类字段定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public final class RecordBatch { final long createdMs; final TopicPartition topicPartition; final ProduceRequestResult produceFuture; private final List<Thunk> thunks = new ArrayList<>(); private final MemoryRecordsBuilder recordsBuilder; volatile int attempts; long lastAttemptMs; int recordCount; int maxRecordSize; long drainedMs; long lastAppendTime; private boolean retry; }
我们可以从字段定义中看到 RecordBatch 持有一个 MemoryRecordsBuilder 类型的字段,MemoryRecordsBuilder 是 MemoryRecords 的构造和管理器,也就是说 RecordBatch 本质上是以 MemoryRecords 作为存储介质。
了解了 RecordAccumulator 类在存储模式上的设计之后,我们接下来分析 RecordAccumulator#append
方法的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 public RecordAppendResult append (TopicPartition tp, long timestamp, byte [] key, byte [] value, Callback callback, long maxTimeToBlock) throws InterruptedException { appendsInProgress.incrementAndGet(); try { Deque<RecordBatch> dq = this .getOrCreateDeque(tp); synchronized (dq) { if (closed) { throw new IllegalStateException("Cannot send after the producer is closed." ); } RecordAppendResult appendResult = this .tryAppend(timestamp, key, value, callback, dq); if (appendResult != null ) { return appendResult; } } int size = Math.max(this .batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); log.trace("Allocating a new {} byte message buffer for topic {} partition {}" , size, tp.topic(), tp.partition()); ByteBuffer buffer = free.allocate(size, maxTimeToBlock); synchronized (dq) { if (closed) { throw new IllegalStateException("Cannot send after the producer is closed." ); } RecordAppendResult appendResult = this .tryAppend(timestamp, key, value, callback, dq); if (appendResult != null ) { free.deallocate(buffer); return appendResult; } MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, compression, TimestampType.CREATE_TIME, this .batchSize); RecordBatch batch = new RecordBatch(tp, recordsBuilder, time.milliseconds()); FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds())); dq.addLast(batch); incomplete.add(batch); return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true ); } } finally { appendsInProgress.decrementAndGet(); } }
追加消息到收集器的过程首先会获取指定 topic 分区对应的发送队列,如果不存在则会创建一个。然后同步往该队列的最后一个 RecordBatch 对象中追加数据,追加的过程位于 RecordAccumulator#tryAppend
方法中。如果追加失败,一般都是因为该 RecordBatch 没有足够的空间足以容纳,则方法会尝试申请新的空间,然后继续尝试追加。如果还是失败,则方法会创建一个新的 RecordBatch 对象进行追加。
Kafka 定义了 BufferPool 类以实现对 ByteBuffer 的复用,避免频繁创建和释放所带来的性能开销。不过需要注意的一点是,并不是所有的 ByteBuffer 对象都会被复用,BufferPool 对所管理的 ByteBuffer 对象的大小是有限制的(默认大小为 16KB,可以依据具体的应用场景适当调整 batch.size
配置进行修改),只有大小等于该值的 ByteBuffer 对象才会被 BufferPool 管理。
上述过程多次调用到 RecordAccumulator#tryAppend
方法,下面来看一下该方法的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 private RecordAppendResult tryAppend (long timestamp, byte [] key, byte [] value, Callback callback, Deque<RecordBatch> deque) { RecordBatch last = deque.peekLast(); if (last != null ) { FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds()); if (future == null ) { last.close(); } else { return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false ); } } return null ; } public FutureRecordMetadata tryAppend (long timestamp, byte [] key, byte [] value, Callback callback, long now) { if (!recordsBuilder.hasRoomFor(key, value)) { return null ; } long checksum = this .recordsBuilder.append(timestamp, key, value); this .maxRecordSize = Math.max(this .maxRecordSize, Record.recordSize(key, value)); this .lastAppendTime = now; FutureRecordMetadata future = new FutureRecordMetadata( produceFuture, recordCount, timestamp, checksum, key == null ? -1 : key.length, value == null ? -1 : value.length); if (callback != null ) { thunks.add(new Thunk(callback, future)); } this .recordCount++; return future; }
上面过程最终调用 MemoryRecordsBuilder#append
方法将消息追加到 MemoryRecords 相应的位置进行存储,并返回消息的 CRC32 校验码,至于 MemoryRecords 存储消息的细节这里不再继续深入。消息追加成功之后,如果在发送消息时指定了 Callback 函数,那么这里会将其封装成 Thunk 类对象,至于其作用这里先不展开分析,等到后面分析 Sender 线程的执行过程时再一探究竟,这里初步猜测 Sender 线程在向集群投递完消息并收到来自集群的响应时会循环遍历 thunks 集合,并应用 Callback 对应的回调方法。
回到 KafkaProducer#doSend
方法,来看最后一步( 步骤 6 )。上面追加的过程会返回一个 RecordAppendResult 对象,该对象通过 RecordAppendResult#batchIsFull
和 RecordAppendResult#newBatchCreated
两个字段分别标记了追加过程中末端的 RecordBatch 是否已满,以及追加过程中是否有创建新的 RecordBatch 对象,如果这两个条件满足其中之一,则会唤醒 Sender 线程尝试向集群投递收集的消息数据。
最后提一点,RecordAccumulator 作为消息的收集器,其内存容量是有上限的,默认为 32MB(可以通过 buffer.memory
参数配置),当容量已满时调用 KafkaProducer#send
方法发送消息会被阻塞,当阻塞超过一定时间(默认为 60 秒,可以通过 max.block.ms
参数配置)则抛出异常。
投递待发送的消息
前面曾提出一个概念,即客户端发送消息的过程实际上是一个异步的过程,由 2 个线程协同执行,其中 1 个线程将待发送的消息写入缓冲区,另外 1 个线程(Sender 线程)负责定期定量将缓冲区中的数据投递给远端 Kafka 集群,并反馈投递结果。上面我们分析了过程 1,下面我们继续分析过程 2,即将缓存的消息发送给 Kafka 集群。
这一过程由 Sender 线程负责执行,前面的分析中曾多次唤醒过该线程,下面来看一下其实现,位于 Sender 类中,该类实现了 java.lang.Runnable
接口,其 Sender#run
方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public void run () { while (running) { try { this .run(time.milliseconds()); } catch (Exception e) { log.error("Uncaught error in kafka producer I/O thread: " , e); } } while (!forceClose && (this .accumulator.hasUnsent() || this .client.inFlightRequestCount() > 0 )) { try { this .run(time.milliseconds()); } catch (Exception e) { log.error("Uncaught error in kafka producer I/O thread: " , e); } } if (forceClose) { this .accumulator.abortIncompleteBatches(); } try { this .client.close(); } catch (Exception e) { log.error("Failed to close network client" , e); } }
由上述方法实现可知,Sender 线程在启动后会一直循环执行另外一个重载版本的 Sender#run
方法,其中包含了 Sender 线程的主要逻辑。如果客户端被关闭(一般都是调用 KafkaProducer#close
方法),在不是强制关闭的前提下,Sender 线程会继续处理本地未发送和已发送但未收到服务端确认的消息,如果是强制关闭(在调用 KafkaProducer#close
方法时允许指定超时等待时间,如果在既定时间内客户端仍未完成对缓存消息的处理,则会触发强制关闭机制),则会丢弃本地缓存的所有未发送的消息,最后关闭到 Kafka 集群的网络连接。
下面来看一下 Sender 线程的核心实现,即重载版本的 Sender#run
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 void run (long now) { Cluster cluster = metadata.fetch(); RecordAccumulator.ReadyCheckResult result = this .accumulator.ready(cluster, now); if (!result.unknownLeaderTopics.isEmpty()) { for (String topic : result.unknownLeaderTopics) this .metadata.add(topic); this .metadata.requestUpdate(); } Iterator<Node> iter = result.readyNodes.iterator(); long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); if (!this .client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this .client.connectionDelay(node, now)); } } Map<Integer, List<RecordBatch>> batches = this .accumulator.drain(cluster, result.readyNodes, this .maxRequestSize, now); if (guaranteeMessageOrder) { for (List<RecordBatch> batchList : batches.values()) { for (RecordBatch batch : batchList) this .accumulator.mutePartition(batch.topicPartition); } } List<RecordBatch> expiredBatches = this .accumulator.abortExpiredBatches(this .requestTimeout, now); long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); if (!result.readyNodes.isEmpty()) { log.trace("Nodes with data ready to send: {}" , result.readyNodes); pollTimeout = 0 ; } this .sendProduceRequests(batches, now); this .client.poll(pollTimeout, now); }
发送收集器 RecordAccumulator 中缓存的消息到 Kafka 集群的整体执行流程可以概括为如下 7 个步骤:
计算需要向哪些 broker 节点投递消息;
如果步骤 1 中发现一些 topic 分区的 Leader 副本所在 broker 节点失效,则需要标记更新本地缓存的集群元数据信息;
遍历处理步骤 1 中获取到的 broker 节点集合,基于 I/O 检测对应节点是否可用,如果不可用则剔除;
以 broker 节点 ID 为键,获取发往目标节点的消息集合;
如果需要对消息顺序进行强一致性保证,则需要缓存当前目标 topic 分区对象,防止同一时间往同一个 topic 分区发送多条处于未完成状态的消息;
处理本地已过期的消息,返回超时异常,并释放占据的空间;
发送消息到服务端,并处理服务端的响应。
下面就各个步骤展开说明,首先来看 步骤 1 ,该步骤用于计算需要向哪些节点投递消息,实现位于 RecordAccumulator#ready
方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public ReadyCheckResult ready (Cluster cluster, long nowMs) { Set<Node> readyNodes = new HashSet<>(); long nextReadyCheckDelayMs = Long.MAX_VALUE; Set<String> unknownLeaderTopics = new HashSet<>(); boolean exhausted = this .free.queued() > 0 ; for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this .batches.entrySet()) { TopicPartition part = entry.getKey(); Deque<RecordBatch> deque = entry.getValue(); Node leader = cluster.leaderFor(part); synchronized (deque) { if (leader == null && !deque.isEmpty()) { unknownLeaderTopics.add(part.topic()); } else if (!readyNodes.contains(leader) && !muted.contains(part)) { RecordBatch batch = deque.peekFirst(); if (batch != null ) { boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; long waitedTimeMs = nowMs - batch.lastAttemptMs; long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0 ); boolean full = deque.size() > 1 || batch.isFull(); boolean expired = waitedTimeMs >= timeToWaitMs; boolean sendable = full || expired || exhausted || closed || flushInProgress(); if (sendable && !backingOff) { readyNodes.add(leader); } else { nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); } } } } } return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics); }
整个计算的逻辑就是遍历我们之前缓存到收集器 RecordAccumulator 中的消息集合,并按照下面 5 个条件进行判定,如果满足其中一个则认为需要往目标节点投递消息:
当前 topic 名下的消息队列持有多个 RecordBatch,或者第 1 个 RecordBatch 已满。
当前 topic 分区等待重试的时间过长,如果是首次发送则无需校验重试等待时间。
当前 topic 分区下有其他线程在等待 BufferPool 分配空间,即本地缓存已满。
Producer 被关闭,需要立即投递剩余未完成的消息。
有线程正在等待 flush 操作完成,则需要立即投递消息,避免线程等待时间过长。
如果遍历过程中发现某个 topic 分区对应的 Leader 副本所在节点失效(对应的 topic 分区正在执行 Leader 副本选举,或者对应的 topic 已经失效),但是本地又缓存了发往该分区的消息,则需要标记当前本地缓存的集群元数据需要更新( 步骤 2 )。上面获取目标 broker 节点的过程是站在收集器 RecordAccumulator 的角度看的,对于一个节点是否可用,还需要从网络 I/O 的角度检查其连通性,这也是 步骤 3 所要做的工作,这一步基于 KafkaClient#ready
方法检查目标节点的是否连通,如果目标节点并未准备好接收请求,则需要从待请求节点集合中剔除。
知道了需要向哪些节点投递消息,接下来自然而然就需要获取发往每个节点的数据, 步骤 4 的实现位于 RecordAccumulator#drain
方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) { if (nodes.isEmpty()) { return Collections.emptyMap(); } Map<Integer, List<RecordBatch>> batches = new HashMap<>(); for (Node node : nodes) { int size = 0 ; List<PartitionInfo> parts = cluster.partitionsForNode(node.id()); List<RecordBatch> ready = new ArrayList<>(); int start = drainIndex = drainIndex % parts.size(); do { PartitionInfo part = parts.get(drainIndex); TopicPartition tp = new TopicPartition(part.topic(), part.partition()); if (!muted.contains(tp)) { Deque<RecordBatch> deque = this .getDeque(new TopicPartition(part.topic(), part.partition())); if (deque != null ) { synchronized (deque) { RecordBatch first = deque.peekFirst(); if (first != null ) { boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now; if (!backoff) { if (size + first.sizeInBytes() > maxSize && !ready.isEmpty()) { break ; } else { RecordBatch batch = deque.pollFirst(); batch.close(); size += batch.sizeInBytes(); ready.add(batch); batch.drainedMs = now; } } } } } } this .drainIndex = (this .drainIndex + 1 ) % parts.size(); } while (start != drainIndex); batches.put(node.id(), ready); } return batches; }
上述方法的返回类型是 Map<Integer, List<RecordBatch>>
,其中 key 是目标节点的 ID,value 是本次待发往该节点的消息集合。为了防止饥饿,方法会轮询从当前 topic 的每个分区队列对头取数据,并记录每次轮询的偏移量,下次轮询即从该偏移量位置开始,以保证尽量的公平。
下面来看一下 步骤 5 ,这是客户端保证消息绝对有序的逻辑。在具体分析之前,我们先来看一个导致消息顺序错乱的场景。假设生产者发送了 2 条指向同一个目标 topic 分区的消息 A 和 B,但是 A 发送失败,B 却成功了,此时生产者会重发消息 A,结果就变成了 B 消息排在了 A 消息的前面。解决该问题的方法就是将参数 max.in.flight.requests.per.connection
参数设置为 1,以禁止生产者往同一个分区一次发送多条消息,不过这样会严重降低系统吞吐量,只有在对消息顺序有严格要求时才推荐这样做。步骤 5 的参数 guaranteeMessageOrder=true
对应着 max.in.flight.requests.per.connection=1
,客户端解决上述问题的实现方式也很简单,就是在本地缓存有处于发送中消息对应的目标 topic 分区对象,保证该分区上的消息在被正确响应之前不会再投递第 2 条消息。
下面继续来看 步骤 6 ,这一步会遍历收集器 RecordAccumulator 中缓存的 RecordBatch,并调用 RecordBatch#maybeExpire
方法检测当前 RecordBatch 是否过期,对于已经过期的 RecordBatch 会执行相应的 RecordBatch#done
方法(下一步中会对该方法展开说明),并释放占用的内存空间。
最后我们来看一下消息发送的过程( 步骤 7 ),位于 Sender#sendProduceRequests
方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private void sendProduceRequests (Map<Integer, List<RecordBatch>> collated, long now) { for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet()) this .sendProduceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()); } private void sendProduceRequest (long now, int destination, short acks, int timeout, List<RecordBatch> batches) { Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size()); final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<>(batches.size()); for (RecordBatch batch : batches) { TopicPartition tp = batch.topicPartition; produceRecordsByPartition.put(tp, batch.records()); recordsByPartition.put(tp, batch); } ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(acks, timeout, produceRecordsByPartition); RequestCompletionHandler callback = new RequestCompletionHandler() { @Override public void onComplete (ClientResponse response) { handleProduceResponse(response, recordsByPartition, time.milliseconds()); } }; String nodeId = Integer.toString(destination); ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0 , callback); client.send(clientRequest, now); log.trace("Sent produce request to {}: {}" , nodeId, requestBuilder); }
这一步主要逻辑就是创建客户端请求 ClientRequest 对象,并通过 NetworkClient#send
方法将请求加入到网络 I/O 通道(KafkaChannel)中。同时将该对象缓存到 InFlightRequests 中,等接收到服务端响应时会通过缓存的 ClientRequest 对象调用对应的 callback 方法。最后调用 NetworkClient#poll
方法执行具体的网络请求和响应。
InFlightRequests 类的主要作用是缓存那些已经发送出去但是还未收到响应的请求,并支持控制对单个节点的最大未完成请求数(默认值为 5,可以通过 max.in.flight.requests.per.connection
参数进行配置,但是上限不允许超过 5 个)。
下面来看一下 NetworkClient#poll
方法的具体实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public List<ClientResponse> poll (long timeout, long now) { long metadataTimeout = metadataUpdater.maybeUpdate(now); try { this .selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)); } catch (IOException e) { log.error("Unexpected error during I/O" , e); } long updatedNow = this .time.milliseconds(); List<ClientResponse> responses = new ArrayList<>(); this .handleAbortedSends(responses); this .handleCompletedSends(responses, updatedNow); this .handleCompletedReceives(responses, updatedNow); this .handleDisconnections(responses, updatedNow); this .handleConnections(); this .handleInitiateApiVersionRequests(updatedNow); this .handleTimedOutRequests(responses, updatedNow); for (ClientResponse response : responses) { try { response.onComplete(); } catch (Exception e) { log.error("Uncaught error in request completion:" , e); } } return responses; }
整个方法的执行流程可以概括为 4 个步骤:
检测是否需要更新本地缓存的集群元数据信息,如果需要则创建对应的 MetadataRequest 请求,并在下次 Selector#poll
操作时一并送出;
执行 Selector#poll
操作,向服务端发送网络请求;
处理服务端响应;
遍历应用注册的 RequestCompletionHandler#onComplete
方法。
首先来看更新本地缓存的集群元数据信息的过程( 步骤 1 ),前面曾多次提及到更新集群元数据的场景,而这些更新操作实际上都是标记集群元数据需要更新,真正执行更新的操作则发生在这里。实现位于 DefaultMetadataUpdater#maybeUpdate
方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 public long maybeUpdate (long now) { long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now); long waitForMetadataFetch = this .metadataFetchInProgress ? requestTimeoutMs : 0 ; long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch); if (metadataTimeout > 0 ) { return metadataTimeout; } Node node = leastLoadedNode(now); if (node == null ) { log.debug("Give up sending metadata request since no node is available" ); return reconnectBackoffMs; } return this .maybeUpdate(now, node); } private long maybeUpdate (long now, Node node) { String nodeConnectionId = node.idString(); if (canSendRequest(nodeConnectionId)) { this .metadataFetchInProgress = true ; MetadataRequest.Builder metadataRequest; if (metadata.needMetadataForAllTopics()) { metadataRequest = MetadataRequest.Builder.allTopics(); } else { metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics())); } log.debug("Sending metadata request {} to node {}" , metadataRequest, node.id()); sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now); return requestTimeoutMs; } if (isAnyNodeConnecting()) { return reconnectBackoffMs; } if (connectionStates.canConnect(nodeConnectionId, now)) { log.debug("Initialize connection to node {} for sending metadata request" , node.id()); initiateConnect(node, now); return reconnectBackoffMs; } return Long.MAX_VALUE; }
方法首先会依据之前设置的标记,以及上次的更新时间决定是否需要更新集群元数据信息,如果需要则依据本地记录的已发往服务端的请求数目寻找集群中负载最小且可用的节点,并创建对应的 MetadataRequest 请求,但是这里的请求不是立即发出的,而是将请求包装成 ClientRequest 对象,并在下次 Selector#poll
操作时一并送出,也就是接下去即将执行的步骤 2。
步骤 2 是真正发送网络请求的地方,这里的请求是异步的,客户端在发出请求之后继续执行步骤 3。 步骤 3 的逻辑主要是为每一个 ClientRequest 请求构造对应的 ClientResponse 响应对象,这些响应对象有的是依据服务端的响应进行构造,有的则是在本地伪造,因为不是所有的请求都需要等待服务端的响应,也不是所有的请求都能得到服务端的响应。这一步的实现对应了一系列的 handle*
方法:
handleAbortedSends
handleCompletedSends
handleCompletedReceives
handleDisconnections
handleConnections
handleInitiateApiVersionRequests
handleTimedOutRequests
下面逐一来看一下相应方法的实现。
该方法的实现就是简单的将 NetworkClient#abortedSends
字段中记录的 ClientResponse 响应对象添加到结果集合中,并清空该字段。这些 ClientResponse 对象是在 NetworkClient#doSend
时添加的,添加的原因是本地请求与目标节点所支持的 API 版本不匹配。
该方法会遍历客户端已经发送成功的请求,对于那些不期望服务端响应的请求可以直接创建对应的 ClientResponse 响应对象,并添加到结果集合中。实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 private void handleCompletedSends (List<ClientResponse> responses, long now) { for (Send send : this .selector.completedSends()) { InFlightRequest request = this .inFlightRequests.lastSent(send.destination()); if (!request.expectResponse) { this .inFlightRequests.completeLastSent(send.destination()); responses.add(request.completed(null , now)); } } }
该方法会获取并解析服务端的响应结果,并依据响应类型分别处理。实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private void handleCompletedReceives (List<ClientResponse> responses, long now) { for (NetworkReceive receive : this .selector.completedReceives()) { String source = receive.source(); InFlightRequest req = inFlightRequests.completeNext(source); AbstractResponse body = parseResponse(receive.payload(), req.header); log.trace("Completed receive from node {}, for key {}, received {}" , req.destination, req.header.apiKey(), body); if (req.isInternalRequest && body instanceof MetadataResponse) { metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body); } else if (req.isInternalRequest && body instanceof ApiVersionsResponse) { this .handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body); } else { responses.add(req.completed(body, now)); } } }
如果当前是针对之前请求更新集群元数据信息的响应,则会调用 DefaultMetadataUpdater#handleCompletedMetadataResponse
方法解析响应内容,如果响应正常则会调用 Metadata#update
方法更新本地缓存的集群元数据信息。如果当前是针对请求更新本地 API 版本信息的响应,则会调用 NetworkClient#handleApiVersionsResponse
方法更新本地缓存的目标节点支持的 API 版本信息。对于其它类型的响应,则直接封装成 ClientResponse 对象添加到结果集合中。
该方法会调用 Selector#disconnected
方法获取断开连接的节点 ID 集合,并更新相应节点的连接状态为 DISCONNECTED
,同时会清空本地缓存的与该节点相关的数据,最终创建一个 disconnected 类型的 ClientResponse 对象添加到结果集合中。如果这一步确实发现了已断开的连接,则标记需要更新本地缓存的节点元数据信息。
该方法会调用 Selector#connected
方法获取连接正常的节点 ID 集合,如果当前节点是第一次建立连接,则需要获取节点支持的 API 版本信息,方法会将当前节点的连接状态设置为 CHECKING_API_VERSIONS
,并将节点 ID 添加到 NetworkClient#nodesNeedingApiVersionsFetch
集合中,对于其它节点,则更新相应连接状态为 READY
。
handleInitiateApiVersionRequests
该方法用于处理 NetworkClient#handleConnections
方法中标记的需要获取支持的 API 版本信息的节点,即记录到 NetworkClient#nodesNeedingApiVersionsFetch
集合中的节点。方法会遍历处理集合中的节点,并在判断目标节点允许接收请求的情况下,构建 ApiVersionsRequest 请求以获取目标节点支持的 API 版本信息,该请求会被包装成 ClientRequest 对象,并在下次 Selector#poll
操作时一并送出。
该方法会遍历缓存在 inFlightRequests 中已经超时的相关请求对应的节点集合,针对此类节点将其视作断开连接进行处理。方法会创建一个 disconnected 类型的 ClientResponse 对象添加到结果集合中,并标记需要更新本地缓存的集群元数据信息。
在完成了将各种类型请求对应的响应对象 ClientResponse 添加到结果集合中之后,会继续遍历该集合并应用 ClientResponse#onComplete
方法,该方法最终调用的是我们注册的 RequestCompletionHandler 对应的 RequestCompletionHandler#onComplete
方法。我们在分析 Sender#sendProduceRequest
方法时曾遇到过下面这一段代码:
1 2 3 4 5 6 RequestCompletionHandler callback = new RequestCompletionHandler() { @Override public void onComplete (ClientResponse response) { handleProduceResponse(response, recordsByPartition, time.milliseconds()); } };
实际上在调用 ClientResponse#onComplete
方法时本质上也就是在调用 Sender#handleProduceResponse
方法,该方法所做的工作就是区分当前的响应类型,并针对每一种响应类型设置对应的参数并回调 Sender#completeBatch
方法,区别仅在于方法的 response 参数设置:
如果是 disconnected 类型的响应,则设置 response=new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION)
。
如果是 API 版本不匹配的响应,则设置 response=new ProduceResponse.PartitionResponse(Errors.INVALID_REQUEST)
。
对于其它响应类型,如果存在响应体则以响应体作为 response 参数;如果不存在响应体则设置 response=new ProduceResponse.PartitionResponse(Errors.NONE)
。
下面来看一下 Sender#completeBatch
方法的具体实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private void completeBatch (RecordBatch batch, ProduceResponse.PartitionResponse response, long correlationId, long now) { Errors error = response.error; if (error != Errors.NONE && this .canRetry(batch, error)) { log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}" , correlationId, batch.topicPartition, retries - batch.attempts - 1 , error); this .accumulator.reenqueue(batch, now); } else { RuntimeException exception; if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { exception = new TopicAuthorizationException(batch.topicPartition.topic()); } else { exception = error.exception(); } batch.done(response.baseOffset, response.logAppendTime, exception); this .accumulator.deallocate(batch); } if (error.exception() instanceof InvalidMetadataException) { metadata.requestUpdate(); } if (guaranteeMessageOrder) { this .accumulator.unmutePartition(batch.topicPartition); } }
上述方法会判断当前响应是否异常且可以需要重试,如果是则将 RecordBatch 重新添加到收集器 RecordAccumulator 中,等待再次发送。如果是正常响应或不允许重试,则调用 RecordBatch#done
方法结束本次发送消息的过程,并将响应结果传递给用户,同时释放 RecordBatch 占用的空间。下面来看一下方法 RecordBatch#done
的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public void done (long baseOffset, long logAppendTime, RuntimeException exception) { log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}." , topicPartition, baseOffset, exception); if (completed.getAndSet(true )) { throw new IllegalStateException("Batch has already been completed" ); } produceFuture.set(baseOffset, logAppendTime, exception); for (Thunk thunk : thunks) { try { if (exception == null ) { RecordMetadata metadata = thunk.future.value(); thunk.callback.onCompletion(metadata, null ); } else { thunk.callback.onCompletion(null , exception); } } catch (Exception e) { log.error("Error executing user-provided callback on message for topic-partition '{}'" , topicPartition, e); } } produceFuture.done(); }
前面我们曾分析过在消息追加成功之后,如果在发送消息时指定了 Callback 回调函数,会将其封装成 Thunk 类对象,当时我们猜测 Sender 线程在向集群投递完消息并收到来自集群的响应时会循环遍历 thunks 集合,并应用 Callback 相应的回调方法,而上述方法的实现证实了我们的猜想。
方法中的变量 produceFuture 是一个 ProduceRequestResult 类型的对象,用于表示一次消息生产过程是否完成,该类基于 CountDownLatch 实现了类似 Future 的功能,在构造 ProduceRequestResult 对象时会创建一个大小为 1 的 CountDownLatch 对象,并在调用 ProduceRequestResult#done
方法时执行 CountDownLatch#countDown
操作,而 ProduceRequestResult#completed
方法判定消息发送是否完成的依据就是判定 CountDownLatch 对象值是否等于 0。
总结
本文我们介绍了 java 版本的 KafkaProducer 的使用,并深入分析了相关设计和实现。从执行流程上来说,Kafka 生产者运行机制在整体设计上还是比较简单和直观的,但不可否认在实现上也有很多需要注意的细节。Kafka 在老版本的 SDK 中默认使用同步的方式往服务端投递消息,因为采用异步的方式存在消息丢失的问题,直到 0.8.2.0 版本以后才修复了这一问题,并将异步提交设置为默认方式。
了解 KafkaProducer 的设计和实现能够帮助我们在实际开发中更好的使用 Kafka 生产者客户端,知晓如何能够保证消息的强顺序性,以及如何保证消息不丢失,甚至是利用其它编程语言自定义 SDK。下一篇,我们将继续分析消费者的运行机制。