温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何解析Kafka 1.0.0 多消费者示例

发布时间:2021-12-15 16:01:37 来源:亿速云 阅读:137 作者:柒染 栏目:大数据

如何解析Kafka 1.0.0 多消费者示例,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

package kafka.demo;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * 
 *  <p>Description: kafka 1.0.0</p> 
 * @author guangshihao
 * @date 2018年9月19日 
 *
 */
public class KafkaProduderDemo {
	public static void main(String[] args) {
		Map<String,Object> props = new HashMap<>();
		/*
         * acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1
		 * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。
		 * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。
		 * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。
		 * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。
		 * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。
		 * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。
		 * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失
		 */
		props.put("acks", "1");
		//配置默认的分区方式
		props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
		//配置topic的序列化类
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		//配置value的序列化类
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /*
		 * kafka broker对应的主机,格式为host1:port1,host2:port2
		 */
		props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
		//topic
		String topic = "test7";
		KafkaProducer< String, String> producer = new KafkaProducer< String, String>(props);
		for(int i = 1 ;i <= 100 ; i++) {
			String line = i+" this is a test ";
			ProducerRecord<String,String> record = new ProducerRecord<String,String>(topic,line );
			producer.send(record);
		}
		producer.close();
	}
}
package kafka.demo;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
public class MutilConsumerThread implements Runnable{
	private AtomicBoolean closed = new AtomicBoolean(false);
	KafkaConsumer<String, String> consumer = null;
	String topic = null;
	public MutilConsumerThread(KafkaConsumer<String, String> consumer,List<String> topic) {
		this.consumer=consumer;
		consumer.subscribe(topic);
	}
	public void run() {
		try{
			while(!closed.get()) {
				ConsumerRecords<String, String> records = consumer.poll(1000);
				for(ConsumerRecord<String, String> record: records) {
					//一组consumer的时候每个partition对应的线程是固定的
					System.out.println("Thread-Name:"+Thread.currentThread().getName()+"  "+"partition:"+record.partition()+"  "+record.value());
				}
			}
			
		}catch(WakeupException e ) {
			if(!closed.get()) throw e;
		}finally {
			consumer.close();
		}
	}
	
	public void shutdown() {
		closed.set(true);
		consumer.wakeup();
	}
}
package kafka.demo;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class MutiConsumerTest {
	public static void main(String[] args) throws InterruptedException {
		Properties props = new Properties();
		props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
		props.put("group.id", "group_test7");
		//配置topic的序列化类
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		//配置value的序列化类
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		//自动同步offset
        props.put("enable.auto.commit","true");
        //自动同步offset的时间间隔
        props.put("auto.commit.intervals.ms", "2000");
        //当在zookeeper中发现要消费的topic没有或者topic的offset不合法时自动设置为最小值,可以设的值为 latest, earliest, none,默认为largest
        props.put("auto.offset.reset", "earliest ");
        String topic = "test7";
        List<MutilConsumerThread> consumers = new ArrayList<>();
        ExecutorService es = Executors.newFixedThreadPool(3);
        for(int i = 0 ;i<=2;i++) {
        	KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        	MutilConsumerThread cThread = new MutilConsumerThread(consumer,Arrays.asList(topic));
        	consumers.add(cThread);
        	es.submit(cThread);
        }
        
        //Thread.sleep(1000L);
        /* 这个方法的意思就是在JVM中增加一个关闭的钩子,当JVM关闭的时候,
              会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,
          JVM才会关闭。所以这些钩子可以在JVM关闭的时候进行内存清理、对象销毁等操作。*/
        Runtime.getRuntime().addShutdownHook(new Thread() {
			@Override
			public void run() {
				for(MutilConsumerThread consumer :consumers ) {
					consumer.shutdown();
				}
			}
        	
        });
        
	}
}

看完上述内容,你们掌握如何解析Kafka 1.0.0 多消费者示例的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI