温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Flink开发怎样进行实时处理应用程序

发布时间:2021-10-20 16:29:21 来源:亿速云 阅读:135 作者:柒染 栏目:大数据

本篇文章为大家展示了Flink开发怎样进行实时处理应用程序,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

使用Flink + java实现需求

环境

JDK:1.8

Maven:3.6.1(最低Maven 3.0.4

使用上一节中的springboot-flink-train项目

开发步骤

第一步:创建流处理上下文环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

第二步:读取数据,使用socket流方式读取数据

DataStreamSource<String> text = env.socketTextStream("192.168.152.45", 9999);

第三步:transform

        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for(String token: tokens) {
                    if(token.length() > 0) {
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();

这里我们使用逗号分隔,然后跟批处理不同的是,这里使用keyBy(0),而不是groupBy(0)。timewindow表示每隔多久执行一次。

第四步:执行

env.execute("StreamingWCJavaApp");

整体代码如下:

/**
 * 使用Java API来开发Flink的实时处理应用程序
 * wc统计的数据源自socket
 */
public class StreamingWCJava02App {

    public static void main(String[] args) throws Exception {

        // 获取参数
        int port;
        try{
            ParameterTool tool = ParameterTool.fromArgs(args);
            port = tool.getInt("port");
        } catch (Exception e) {
            System.out.println("端口未设置, 使用默认端口9999");
            port = 9999;
        }


        // step1: 获取流处理上下文环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // step2: 读取数据
        DataStreamSource<String> text = env.socketTextStream("192.168.152.45", port);
        // step3: transform
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for(String token: tokens) {
                    if(token.length() > 0) {
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();
        env.execute("StreamingWCJavaApp");
    }

}

运行

首先在192.168.152.45上运行命令

nc -l 9999

然后在运行main方法。在192.168.152.45的nc上输入

abc,def,abc,ddd

在idea控制台输出如下:

4> (abc,2)
1> (def,1)
4> (ddd,1)

这个前面的"4>"表示并行度。我们可以设置setParallelism(1)来忽略这个问题。如下所示:

        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for(String token: tokens) {
                    if(token.length() > 0) {
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);

这样控制台的打印结果如下:

(abc,2)
(ddd,1)
(def,1)

这样一个简单的demo就成功了!

重构代码

上面的代码中localhost与port需要用参数传递进来。

代码如下:

        // 获取参数
        int port;
        try{
            ParameterTool tool = ParameterTool.fromArgs(args);
            port = tool.getInt("port");
        } catch (Exception e) {
            System.out.println("端口未设置, 使用默认端口9999");
            port = 9999;
        }

使用Flink提供的ParameterTool来接收参数。

我们在运行时就可以指定参数列表了,其中的key必须以“-”或者“--”开头。

在运行时,配置参数:

Flink开发怎样进行实时处理应用程序

这样运行就可以从外界传递参数了

使用Flink + Scala实现需求

接下来使用Scala方式实现,在项目springboot-flink-train-scala中新建StreamingWCScalaApp,内容如下:

/**
  * 使用Scala开发Flink的实时处理应用程序
  */
object StreamingWCScalaApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 引入隐式转换
    import org.apache.flink.api.scala._

    val text = env.socketTextStream("192.168.152.45", 9999)
    text.flatMap(_.split(","))
        .map((_,1))
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1)
        .print()
        .setParallelism(1)

    env.execute("StreamingWCScalaApp");
  }
}

这种方式比java实现更加简洁。

上述内容就是Flink开发怎样进行实时处理应用程序,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI