kafka拦截器实现队列插队效果
前言突然出现一个任务需要对kafka处理的数据进行插队操作内心小崩溃。。。研究了一下还是可以使用拦截器进行实现这样的效果的。拦截器Interceptor是早在Kafka 0.10.0.0中就已经引入的一个功能Kafka一共有两种拦截器生产者拦截器和消费者拦截器。ProducerInterceptor先看代码代码语言javascriptAI代码解释Slf4j Service public class MyProducerInterceptor implements ProducerInterceptor { //在发送broke之前的一个操作可以对数据进行加工处理或者进行topic pationtion重新指向 Override public ProducerRecord onSend(ProducerRecord producerRecord) { return null; } /** * KafkaProducer 会在消息被应答Acknowledgement之前或消息发送失败时调用生产者拦截器的 onAcknowledgement方法 * * **/ Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { } //close方法主要用于在关闭拦截器时执行一些资源的清理工作。 Override public void close() { } Override public void configure(MapString, ? map) { } }onSend就是在发送之前对数据的处理onAcknowledgement接收kafka服务端接受到消息响应的处理代码语言javascriptAI代码解释服务端应答的模式可以通过acks进行设置: 1 0 -1 1 : 在生产者发送消息之后从节点保存完数据就会进行响应如果消息无法写入leader副本比如在leader 副本崩溃、重新选举新的 leader 副本的过程中那么生产者就会收到一个错误的响应为了避免消息丢失生产者可以选择重发消息。如果消息写入leader副本并返回成功响应给生产者且在被其他follower副本拉取之前leader副本崩溃那么此时消息还是会丢失因为新选举的leader副本中并没有这条对应的消息。acks设置为1是消息可靠性和吞吐量之间的折中方案。 0 : acks0。生产者发送消息之后不需要等待任何服务端的响应。如果在消息从发送到写入Kafka的过程中出现某些异常导致Kafka并没有收到这条消息那么生产者也无从得知消息也就丢失了。在其他配置环境相同的情况下acks 设置为 0 可以达到最大的吞吐量。 -1: acks-1或acksall。生产者在消息发送之后需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下acks 设置为-1all可以达到最强的可靠性。但这并不意味着消息就一定可靠因为ISR中可能只有leader副本这样就退化成了acks1的情况close方法主要用于在关闭拦截器时执行一些资源的清理工作。configure可以获取到生产者的配置ConsumerInterceptor代码语言javascriptAI代码解释AtomicInteger atomicIntegernew AtomicInteger(0); Override public ConsumerRecordsString, String onConsume(ConsumerRecordsString, String consumerRecords) { return new ConsumerRecords( ); } Override public void onCommit(MapTopicPartition, OffsetAndMetadata offsets) { offsets.forEach((k,v)-{ System.out.println(k---v); }); } private String getJsonTrackingMessage(ConsumerRecordString, String record) { return record.value(); } //拦截器关闭做一些操作 Override public void close() { } Override public void configure(MapString, ? configs) { }onConsume在consumer poll 返回之前的一个操作一般我们如果没有设置MAX_POLL_RECORDS_CONFIG这个配置在0.9之后才有用 那么默认poll返回的数据是500条注意这个500也许来自多个分区因为一个消费者可能被分配到多个分区。我们可能对poll到的数据进行重新修改或者过滤然后返回一个新的ConsumerRecords注意poll之后的commit是对服务端提交消费数据的偏移量如果修改或者新增数据需要注意如果修改了offset数据是否会造成重复消费的问题onCommit在commit成功之后这个方法我们可以获取到消费数据的具体偏移量onClose和之前的生产者一样作用实现方案实现一个消费者拦截器并且重新构造返回的消费数据如果是新加入的消费数据就不进行消费如下面onConsume测试代码代码语言javascriptAI代码解释int mark atomicInteger.incrementAndGet(); MapTopicPartition, ListConsumerRecordString, String newRecords new HashMap(); ConsumerRecord consumerRecord new ConsumerRecord(xpp_test,0,0,null,1000); for (TopicPartition tp : records.partitions()) { ListConsumerRecordString, String tpRecords records.records(tp); ListConsumerRecordString, String newTpRecords new ArrayList(); newTpRecords.add(consumerRecord); newTpRecords.addAll(tpRecords); if (!newTpRecords.isEmpty()) { newRecords.put(tp, newTpRecords); } } return new ConsumerRecords(newRecords);