温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何使用kafka connect将数据批量写到hdfs

发布时间:2021-12-09 14:03:28 来源:亿速云 阅读:172 作者:小新 栏目:云计算

小编给大家分享一下如何使用kafka connect将数据批量写到hdfs,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

 

kafka-connect是以单节点模式运行,即standalone。

一. 首先,先对kafka和kafka connect做一个简单的介绍

  kafka:Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。比较直观的解释就是其有一个生产者(producer)和一个消费者(consumer)。可以将kafka想象成一个数据容器,生产者负责发送数据到这个容器中,而消费者从容器中取出数据,在将数据做处理,如存储到hdfs。

  kafka connect:Kafka Connect是一种用于在Kafka和其他系统之间可扩展的、可靠的流式传输数据的工具。它使得能够快速定义将大量数据集合移入和移出Kafka的连接器变得简单。即适合批量数据导入导出操作。

二. 下面将介绍如何用kafka connect将数据写入到hdfs中。包括在这个过程中可能碰到的一些问题说明。

首先启动kafka-connect:

bin/connect-standalone.sh config/connect-standalone.properties config/connector1.properties
这个命令后面两个参数,
  第一个是指定启动的模式,有分布式和单节点两种,这里是单节点。kafka自带,放于config目录下。
  第二个参数指向描述connector的属性的文件,可以有多个,这里只有一个connector用来写入到hdfs。需要自己创建。

接下来看看connector1.properties的内容,


name="test"    #该connector的名字
#将自己按connect接口规范编写的代码打包后放在kafka/libs目录下,再根据项目结构引用对应connector
connector.class=hdfs.HdfsSinkConnector
#Task是导入导出的具体实现,这里是指定多少个task来并行运行导入导出作业,由多线程实现。由于hdfs中一个文件每次只能又一个文件操作,所以这里只能是1
tasks.max=1 
#指定从哪个topic读取数据,这些其实是用来在connector或者task的代码中读取的。
topics=test
#指定key以那种方式转换,需和Producer发送方指定的序列化方式一致
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.json.JsonConverter     #同上
hdfs.url=hdfs://127.0.0.1:9000  #hdfs的url路径,在Connector中会被读取
hdfs.path=/test/file  #hdfs文件路径,同样Connector中被读取

key.converter.schemas.enable=true  #稍后介绍,可以true也可以false,影响传输格式
value.converter.schemas.enable=true  #稍后介绍,可以true也可以false

三. 接下来看代码,connect主要是导入导出两个概念,导入是source,导出时Sink。这里只使用Sink,不过Source和Sink的实现其实基本相同。


实现Sink其实不难,实现对应的接口,即SinkConnector和SinkTask两个接口,再打包放到kafka/libs目录下即可。其中SinkConnector只有一个,而Task可以有多
先是Connector
public class HdfsSinkConnector extends SinkConnector {
    //这两项为配置hdfs的urlh和路径的配置项,需要在connector1.properties中指定
    public static final String HDFS_URL = "hdfs.url";
    public static final String HDFS_PATH = "hdfs.path";
    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(HDFS_URL, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "hdfs url")
            .define(HDFS_PATH, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "hdfs path");
    private String hdfsUrl;
    private String hdfsPath;
    @Override
    public String version() {
        return AppInfoParser.getVersion();
    }
  //start方法会再初始的时候执行一次,这里主要用于配置    @Override    public void start(Map<String, String> props) {        hdfsUrl = props.get(HDFS_URL);        hdfsPath = props.get(HDFS_PATH);    }   //这里指定了Task的类    @Override    public Class<? extends Task> taskClass() {        return HdfsSinkTask.class;    }   //用于配置Task的config,这些都是会在Task中用到    @Override    public List<Map<String, String>> taskConfigs(int maxTasks) {        ArrayList<Map<String, String>> configs = new ArrayList<>();        for (int i = 0; i < maxTasks; i++) {            Map<String, String> config = new HashMap<>();            if (hdfsUrl != null)                config.put(HDFS_URL, hdfsUrl);            if (hdfsPath != null)                config.put(HDFS_PATH, hdfsPath);            configs.add(config);        }        return configs;    }   //关闭时的操作,一般是关闭资源。    @Override    public void stop() {        // Nothing to do since FileStreamSinkConnector has no background monitoring.    }    @Override    public ConfigDef config() {        return CONFIG_DEF;    } }

接下来是Task

public class HdfsSinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(HdfsSinkTask.class);

    private String filename;

    public static String hdfsUrl;
    public static String hdfsPath;
    private Configuration conf;
    private FSDataOutputStream os;
    private FileSystem hdfs;


    public HdfsSinkTask(){

    }

    @Override
    public String version() {
        return new HdfsSinkConnector().version();
    }
  //Task开始会执行的代码,可能有多个Task,所以每个Task都会执行一次
    @Override
    public void start(Map<String, String> props) {
        hdfsUrl = props.get(HdfsSinkConnector.HDFS_URL);
        hdfsPath = props.get(HdfsSinkConnector.HDFS_PATH);
        System.out.println("----------------------------------- start--------------------------------");

        conf = new Configuration();
        conf.set("fs.defaultFS", hdfsUrl);
        //这两个是与hdfs append相关的设置
        conf.setBoolean("dfs.support.append", true);
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        try{
            hdfs = FileSystem.get(conf);
//            connector.hdfs = new Path(HDFSPATH).getFileSystem(conf);
            os = hdfs.append(new Path(hdfsPath));
        }catch (IOException e){
            System.out.println(e.toString());
        }

    }
  //核心操作,put就是将数据从kafka中取出,存放到其他地方去
    @Override
    public void put(Collection<SinkRecord> sinkRecords) {
        for (SinkRecord record : sinkRecords) {
            log.trace("Writing line to {}: {}", logFilename(), record.value());
            try{
                System.out.println("write info------------------------" + record.value().toString() + "-----------------");
                os.write((record.value().toString()).getBytes("UTF-8"));
                os.hsync();
            }catch(Exception e){
                System.out.print(e.toString());
            }
        }
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        try{
            os.hsync();
        }catch (Exception e){
            System.out.print(e.toString());
        }

    }
  //同样是结束时候所执行的代码,这里用于关闭hdfs资源    @Override    public void stop() {        try {            os.close();        }catch(IOException e){            System.out.println(e.toString());        }    }    private String logFilename() {        return filename == null ? "stdout" : filename;    } }

这里重点提一下,因为在connector1.propertise中设置了key.converter=org.apache.kafka.connect.converters.ByteArrayConverter,所以不能用命令行形式的
producer发送数据,而是要用程序的方式,并且在producer总也要设置key的序列化形式为org.apache.kafka.common.serialization.ByteArraySerializer。
编码完成,先用idea以开发程序与依赖包分离的形式打包成jar包,然后将程序对应的jar包(一般就是“项目名.jar”)放到kafka/libs目录下面,这样就能被找到。
四. 接下来对这个过程的问题做一个汇总。
1.在connector1.properties中的key.converter.schemas.enable=false和value.converter.schemas.enable=false的问题。
这个选项默认在connect-standalone.properties中是true的,这个时候发送给topic的Json格式是需要使用avro格式,具体情况可以百度,这里给出一个样例。
{
    "schema": {
        "type": "struct",
        "fields": [{
            "type": "int32",
            "optional": true,
            "field": "c1"
        }, {
            "type": "string",
            "optional": true,
            "field": "c2"
        }, {
            "type": "int64",
            "optional": false,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "create_ts"
        }, {
            "type": "int64",
            "optional": false,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "update_ts"
        }],
        "optional": false,
        "name": "foobar"
    },
    "payload": {
        "c1": 10000,
        "c2": "bar",
        "create_ts": 1501834166000,
        "update_ts": 1501834166000
    }
}

主要就是schema和payload这两个,不按照这个格式会报错如下

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

   at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)

如果想发送普通的json格式而不是avro格式的话,很简单key.converter.schemas.enable和value.converter.schemas.enable设置为false就行。这样就能发送普通的json格式数据。

2.在启动的过程中出现各种各样的java.lang.ClassNotFoundException。

在启动connector的时候,一开始总是会报各个各样的ClassNotFoundException,不是这个包就是那个包,查找问题一直说要么缺少包要么是包冲突。这个是什么原因呢?

其实归根结底还是依赖冲突的问题,因为kafka程序自定义的类加载器加载类的目录是在kafka/libs中,而写到hdfs需要hadoop的包。

我一开始的做法是将hadoop下的包路径添加到CLASSPATH中,这样子问题就来了,因为kafka和hadoop的依赖包是有冲突的,比如hadoop是guava-11.0.2.jar,而kafka是guava-20.0.jar,两个jar包版本不同,而我们是在kafka程序中调用hdfs,所以当jar包冲突时应该优先调用kafka的。但是注意kafka用的是程序自定义的类加载器,其优先级是低于CLASSPATH路径下的类的,就是说加载类时会优先加载CLASSPATH下的类。这样子就有问题了。

我的解决方案时将kafka和hadoop加载的jar包路径都添加到CLASSPATH中,并且kafka的路径写在hadoop前面,这样就可以启动connector成功。


以上是“如何使用kafka connect将数据批量写到hdfs”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI