kafka版本:kafka_2.10-0.10.1.0
spring版本:spring4.3
Kafka和spring集成的支持类库,spring和kafka通信监听
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>
kafka发送消息以及接收消息使用的类库
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.0</version>
</dependency>
使用高版本是因为低版本无法支持kafka监听,spring和kafka集成不好
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webmvc</artifactId>
    <version>4.3.0.RELEASE</version>
</dependency>
kafka自带监听器,依赖于spring,所以需要和spring-integration-kafka结合使用
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.0.0.RC1</version>
</dependency>
1.如果你的topic没有设置名称,按照默认的topic的名字生成对应的数据文件夹。
2.producerListener用来判断kafka发送数据是否成功以及发送反馈信息。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- 定义producer的参数 -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="localhost:7000"/>
                <entry key="group.id" value="0"/>
                <entry key="retries" value="1"/>
                <entry key="batch.size" value="16384"/>
                <entry key="linger.ms" value="1"/>
                <entry key="buffer.memory" value="33554432"/>
                <entry key="key.serializer"
                       value="org.apache.kafka.common.serialization.StringSerializer"/>
                <entry key="value.serializer"
                       value="org.apache.kafka.common.serialization.StringSerializer"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- 创建kafkatemplate需要使用的producerfactory bean -->
    <bean id="producerFactory"
          class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties"/>
        </constructor-arg>
    </bean>

    <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory"/>
        <constructor-arg name="autoFlush" value="true"/>
        <property name="defaultTopic" value="defaultTopic"/>
        <property name="producerListener" ref="producerListener"/>
    </bean>

    <bean id="producerListener" class="com.git.kafka.producer.KafkaProducerListener"/>
</beans>
1.使用kafka的listener进行消息消费监听,如果有消费消息进入会自动调用onMessage方法进行消息消费以及后续业务处理。
2.如果要配置多个topic,需要创建新的消费者容器,然后统一指向listener的消息处理类,统一让这个类进行后续业务处理。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/tx
       http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
       http://www.springframework.org/schema/jee
       http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-3.0.xsd">


    <!-- 定义consumer的参数 -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="127.0.0.1:7000"/>
                <entry key="group.id" value="0"/>
                <entry key="enable.auto.commit" value="false"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="session.timeout.ms" value="15000"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- 创建consumerFactory bean -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>

    <!-- 实际执行消息消费的类 -->
    <bean id="messageListernerConsumerService" class="com.git.kafka.consumer.KafkaConsumerServer"/>

    <!-- 消费者容器配置信息 -->
    <bean id="containerProperties_trade" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="order_test_topic"/>
        <property name="messageListener" ref="messageListernerConsumerService"/>
    </bean>
    <bean id="containerProperties_other" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="other_test_topic"/>
        <property name="messageListener" ref="messageListernerConsumerService"/>
    </bean>

    <!-- 创建messageListenerContainer bean,使用的时候,只需要注入这个bean -->
    <bean id="messageListenerContainer_trade" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
          init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties_trade"/>
    </bean>

    <bean id="messageListenerContainer_other" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
          init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties_other"/>
    </bean>

</beans>
constant.java//常量类
1packagecom.git.kafka.constant; 2 3/** 4*kafkaMessageConstant 5*@authorwangb 6* 7*/ 8publicclassKafkaMesConstant{ 9 10publicstaticfinalStringSUCCESS_CODE="00000"; 11publicstaticfinalStringSUCCESS_MES="成功"; 12 13/*kakfa-code*/ 14publicstaticfinalStringKAFKA_SEND_ERROR_CODE="30001"; 15publicstaticfinalStringKAFKA_NO_RESULT_CODE="30002"; 16publicstaticfinalStringKAFKA_NO_OFFSET_CODE="30003"; 17 18/*kakfa-mes*/ 19publicstaticfinalStringKAFKA_SEND_ERROR_MES="发送消息超时,联系相关技术人员"; 20publicstaticfinalStringKAFKA_NO_RESULT_MES="未查询到返回结果,联系相关技术人员"; 21publicstaticfinalStringKAFKA_NO_OFFSET_MES="未查到返回数据的offset,联系相关技术人员"; 22 23 24}KafkaConsumerServer.java//消费者监听
1packagecom.git.kafka.consumer; 2 3importorg.apache.kafka.clients.consumer.ConsumerRecord; 4importorg.slf4j.Logger; 5importorg.slf4j.LoggerFactory; 6importorg.springframework.kafka.listener.MessageListener; 7 8/** 9*kafka监听器启动 10*自动监听是否有消息需要消费 11*@authorwangb 12* 13*/ 14publicclassKafkaConsumerServerimplementsMessageListener<String,String>{ 15protectedfinalLoggerLOG=LoggerFactory.getLogger("kafkaConsumer"); 16/** 17*监听器自动执行该方法 18*消费消息 19*自动提交offset 20*执行业务代码 21*(highlevelapi不提供offset管理,不能指定offset进行消费) 22*/ 23@Override 24publicvoidonMessage(ConsumerRecord<String,String>record){ 25LOG.info("=============kafkaConsumer开始消费============="); 26Stringtopic=record.topic(); 27Stringkey=record.key(); 28Stringvalue=record.value(); 29longoffset=record.offset(); 30intpartition=record.partition(); 31LOG.info("-------------topic:"+topic); 32LOG.info("-------------value:"+value); 33LOG.info("-------------key:"+key); 34LOG.info("-------------offset:"+offset); 35LOG.info("-------------partition:"+partition); 36LOG.info("~~~~~~~~~~~~~kafkaConsumer消费结束~~~~~~~~~~~~~"); 37} 38 39}kafkaProducerListener.java//生产者监听-打印日志
packagecom.git.kafka.producer; importorg.apache.kafka.clients.producer.RecordMetadata; importorg.slf4j.Logger; importorg.slf4j.LoggerFactory; importorg.springframework.kafka.support.ProducerListener; /** *kafkaProducer监听器,在producer配置文件中开启 *@authorwangb * */ @SuppressWarnings("rawtypes") publicclassKafkaProducerListenerimplementsProducerListener{ protectedfinalLoggerLOG=LoggerFactory.getLogger("kafkaProducer"); /** *发送消息成功后调用 */ @Override publicvoidonSuccess(Stringtopic,Integerpartition,Objectkey, Objectvalue,RecordMetadatarecordMetadata){ LOG.info("==========kafka发送数据成功(日志开始)=========="); LOG.info("----------topic:"+topic); LOG.info("----------partition:"+partition); LOG.info("----------key:"+key); LOG.info("----------value:"+value); LOG.info("----------RecordMetadata:"+recordMetadata); LOG.info("~~~~~~~~~~kafka发送数据成功(日志结束)~~~~~~~~~~"); } /** *发送消息错误后调用 */ @Override publicvoidonError(Stringtopic,Integerpartition,Objectkey, Objectvalue,Exceptionexception){ LOG.info("==========kafka发送数据错误(日志开始)=========="); LOG.info("----------topic:"+topic); LOG.info("----------partition:"+partition); LOG.info("----------key:"+key); LOG.info("----------value:"+value); LOG.info("----------Exception:"+exception); LOG.info("~~~~~~~~~~kafka发送数据错误(日志结束)~~~~~~~~~~"); exception.printStackTrace(); } /** *方法返回值代表是否启动kafkaProducer监听器 */ @Override publicbooleanisInterestedInSuccess(){ LOG.info("///kafkaProducer监听器启动///"); returntrue; } }KafkaProducerServer.java//生产者
package com.git.kafka.producer;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import com.alibaba.fastjson.JSON;
import com.git.kafka.constant.KafkaMesConstant;

/**
 * Sends messages through the injected {@link KafkaTemplate} and reports the
 * outcome as a code/message map.
 *
 * @author wangb
 */
@Component
public class KafkaProducerServer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Serializes {@code value} to JSON, sends it to {@code topic} and waits for
     * the send result.
     *
     * <p>The record key is {@code role + "-" + value.hashCode()}.
     *
     * @param topic        target topic
     * @param value        message payload; serialized with fastjson
     * @param ifPartition  {@code "0"} to send to an explicit partition derived
     *                     from the key; any other value (including
     *                     {@code null}) sends without naming a partition
     * @param partitionNum number of partitions; must be greater than 0 when
     *                     {@code ifPartition} is {@code "0"}
     * @param role         sender role used as the key prefix (e.g. bbc, app, erp)
     * @return a map with the keys {@code "code"} and {@code "message"} taken
     *         from {@link KafkaMesConstant}
     */
    public Map<String, Object> sndMesForTemplate(String topic, Object value, String ifPartition,
            Integer partitionNum, String role) {
        String key = role + "-" + value.hashCode();
        String valueString = JSON.toJSONString(value);

        ListenableFuture<SendResult<String, String>> result;
        // Constant-first equals: a null flag means "no explicit partition" instead of an NPE.
        if ("0".equals(ifPartition)) {
            int partitionIndex = getPartitionIndex(key, partitionNum);
            result = kafkaTemplate.send(topic, partitionIndex, key, valueString);
        } else {
            result = kafkaTemplate.send(topic, key, valueString);
        }
        return checkProRecord(result);
    }

    /**
     * Maps a key to a partition index in {@code [0, partitionNum)}.
     *
     * @param key          record key; a random partition is picked when null
     * @param partitionNum number of partitions
     * @return the partition index
     */
    private int getPartitionIndex(String key, int partitionNum) {
        if (key == null) {
            return new Random().nextInt(partitionNum);
        }
        int hash = key.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, which would produce a
        // negative partition index. Map that single value to 0; every other
        // hash keeps the partition it was assigned before.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % partitionNum;
    }

    /**
     * Waits for the send result and turns it into a code/message map.
     *
     * <p>Only the offset in the record metadata is checked; the producer
     * record itself is not inspected.
     *
     * @param res future returned by the template, may be null
     * @return success when the offset is non-negative, otherwise the matching
     *         error code and message
     */
    private Map<String, Object> checkProRecord(ListenableFuture<SendResult<String, String>> res) {
        if (res == null) {
            return result(KafkaMesConstant.KAFKA_NO_RESULT_CODE, KafkaMesConstant.KAFKA_NO_RESULT_MES);
        }
        try {
            SendResult<String, String> r = res.get();
            long offsetIndex = r.getRecordMetadata().offset();
            if (offsetIndex >= 0) {
                return result(KafkaMesConstant.SUCCESS_CODE, KafkaMesConstant.SUCCESS_MES);
            }
            return result(KafkaMesConstant.KAFKA_NO_OFFSET_CODE, KafkaMesConstant.KAFKA_NO_OFFSET_MES);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return result(KafkaMesConstant.KAFKA_SEND_ERROR_CODE, KafkaMesConstant.KAFKA_SEND_ERROR_MES);
        } catch (ExecutionException e) {
            e.printStackTrace();
            return result(KafkaMesConstant.KAFKA_SEND_ERROR_CODE, KafkaMesConstant.KAFKA_SEND_ERROR_MES);
        }
    }

    /** Builds the two-entry result map returned to callers. */
    private Map<String, Object> result(String code, String message) {
        Map<String, Object> m = new HashMap<String, Object>();
        m.put("code", code);
        m.put("message", message);
        return m;
    }

}

// KafkaProducerTest.java -- Kafka producer test (the consumer listener is started by Spring and onMessage runs automatically)
package com.git.test;

import java.util.Map;

import com.git.kafka.producer.KafkaProducerServer;

/**
 * Manual smoke test: sends one message through {@link KafkaProducerServer}
 * and prints the returned code and message.
 */
public class KafkaProducerTest {

    /**
     * Sends the value {@code "test"} to topic {@code "orderTopic"} using an
     * explicit partition (flag {@code "0"}, 3 partitions) and prints the result.
     *
     * <p>NOTE(review): the producer is created with {@code new} rather than
     * obtained from a Spring context, so its {@code @Autowired} KafkaTemplate
     * is presumably not injected here -- confirm how this is meant to run.
     * The topic also differs from the consumer configuration's
     * {@code order_test_topic} / {@code other_test_topic}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        KafkaProducerServer producer = new KafkaProducerServer();
        String role = "test"; // used to build the record key

        Map<String, Object> outcome =
                producer.sndMesForTemplate("orderTopic", "test", "0", 3, role);

        System.out.println("测试结果如下:===============");
        String message = (String) outcome.get("message");
        String code = (String) outcome.get("code");
        System.out.println("code:" + code);
        System.out.println("message:" + message);
    }

}

// Project repository: https://git.oschina.net/wsmd/kafka-0.10-demo
本文内容总结:准备工作,配置文件,pom文件配置(也可以直接下载jar包),producer配置,consumer配置,applicationContext配置,具体实现,具体项目代码。
原文链接:https://www.cnblogs.com/wangb0402/p/6187796.html