Flink usage demos


1. Adding different data sources

package com.baidu.keyue.deepsight.memory.test;

import com.baidu.keyue.deepsight.memory.WordCount;
import com.baidu.keyue.deepsight.memory.WordCountData;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class EnvDemo {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        // choose streaming or batch execution mode
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);

        // read from one of several sources
//        DataStream<String> text = env.fromElements(WordCountData.WORDS);
//        DataStream<String> text = env.fromCollection(Arrays.asList("hello world", "hello flink"));
//        DataStream<String> text = env.fromSource(createFileSource(), WatermarkStrategy.noWatermarks(), "file source");
//        DataStream<String> text = env.fromSource(createKafkaSource(), WatermarkStrategy.noWatermarks(), "kafka source");
        DataStream<String> text = env.fromSource(createDataGeneratorSource(), WatermarkStrategy.noWatermarks(), "datagen source");

        // processing logic: word count
        DataStream<Tuple2<String, Integer>> counts =
                text.flatMap(new WordCount.Tokenizer())
                        .keyBy(0)
                        .sum(1);
        counts.print();
        env.execute("WordCount");
    }

    public static FileSource<String> createFileSource() {
        FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(),
                new Path("input/word.txt")).build();
        return fileSource;
    }

    public static KafkaSource<String> createKafkaSource() {
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("ip-port")
                .setTopics("topic")
                .setGroupId("groupId")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        return kafkaSource;
    }

    public static DataGeneratorSource<String> createDataGeneratorSource() {
        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                (GeneratorFunction<Long, String>) value -> "hello" + value,
                100, // emit 100 records in total
                RateLimiterStrategy.perSecond(5), // at most 5 records per second
                Types.STRING
        );
        return dataGeneratorSource;
    }
}
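The job above relies on WordCount.Tokenizer, which is not shown in this post. A minimal sketch of what such a tokenizer usually looks like (only the class name comes from the import; the body below is an assumption, following the standard Flink word-count pattern):

// Hypothetical Tokenizer: splits each line into lowercase words and emits
// (word, 1) pairs for the downstream keyBy/sum. Needs
// org.apache.flink.api.common.functions.FlatMapFunction,
// org.apache.flink.api.java.tuple.Tuple2 and org.apache.flink.util.Collector.
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        for (String word : value.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}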

2. Data processing operators

package com.baidu.keyue.deepsight.memory.test;

import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class DataProcessDemo {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        // choose streaming or batch execution mode
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);

        // create a bounded demo stream
        DataStream<WaterSensor> sensorDs = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3)
        );

        // map operator: transforms each element one-to-one (one record in, one record out)
        SingleOutputStreamOperator<String> map = sensorDs.map(new MapFunction<WaterSensor, String>() {
            @Override
            public String map(WaterSensor waterSensor) throws Exception {
                return waterSensor.getId() + " : " + waterSensor.getVc();
            }
        });
//        map.print();


        // filter operator: evaluates a predicate on each element and drops those that fail it
        SingleOutputStreamOperator<WaterSensor> filter = sensorDs.filter(new FilterFunction<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor waterSensor) throws Exception {
                return waterSensor.getVc() > 1; // keep only elements with vc > 1
            }
        });
//        filter.print();

        // flatMap operator: one input element can produce zero or more outputs, emitted through the collector
        SingleOutputStreamOperator<String> flatMap = sensorDs.flatMap(new FlatMapFunction<WaterSensor, String>() {
            @Override
            public void flatMap(WaterSensor waterSensor, Collector<String> collector) throws Exception {
                // everything passed to the collector is emitted downstream
                if ("s1".equals(waterSensor.getId())) {
                    collector.collect(waterSensor.getId());
                } else {
                    collector.collect(waterSensor.getId());
                    collector.collect(waterSensor.getVc().toString());
                }
            }
        });
//        flatMap.print();

        // keyBy: records with the same key go to the same partition, which makes large-scale aggregations efficient; it only repartitions the data, no transformation
        KeyedStream<WaterSensor, String> keyBy = sensorDs.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor waterSensor) throws Exception {
                return waterSensor.getId(); // group by sensor id
            }
        });
//        keyBy.print();
        // after keyBy you can apply aggregation operators such as sum, max, min
//        keyBy.sum("vc").print(); // pass the POJO field name
//        keyBy.maxBy("vc").print(); // pass the POJO field name


        // reduce operator: pairwise aggregation of records with the same key; only usable after keyBy
        keyBy.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                // value1 is the result aggregated so far, value2 is the newly arrived element
                System.out.println("value1=" + value1);
                System.out.println("value2=" + value2);
                return new WaterSensor(value1.getId(), value1.getTs() + value2.getTs(), value1.getVc() + value2.getVc());
            }
        }).print();
        env.execute("DataProcess");
    }
}
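Every demo in this post constructs WaterSensor(id, ts, vc) and calls getId()/getTs()/getVc(), but the bean itself is not shown. A minimal sketch, assuming a standard Flink POJO (field names inferred from the getters and from the "vc" field expressions used with sum/maxBy):

// Hypothetical WaterSensor POJO. A public no-arg constructor plus public
// getters/setters is what Flink's POJO serializer and field-expression
// aggregations such as sum("vc") require.
public class WaterSensor {
    private String id;   // sensor id
    private Long ts;     // event timestamp
    private Integer vc;  // water level value

    public WaterSensor() {}

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Long getTs() { return ts; }
    public void setTs(Long ts) { this.ts = ts; }
    public Integer getVc() { return vc; }
    public void setVc(Integer vc) { this.vc = vc; }

    @Override
    public String toString() {
        return "WaterSensor{id='" + id + "', ts=" + ts + ", vc=" + vc + "}";
    }
}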

3. Splitting and merging streams

package com.baidu.keyue.deepsight.memory.test;

import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class FenliuDemo {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        // choose streaming or batch execution mode
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);

        // create a bounded demo stream
        DataStream<WaterSensor> sensorDs = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3)
        );

        SingleOutputStreamOperator<WaterSensor> oushu = sensorDs.filter(waterSensor -> waterSensor.getVc() % 2 == 0); // even vc values
        SingleOutputStreamOperator<WaterSensor> jishu = sensorDs.filter(waterSensor -> waterSensor.getVc() % 2 == 1); // odd vc values
        oushu.print("even stream");
        jishu.print("odd stream");
        // merge the even and odd streams back into one
        oushu.union(jishu).print("merged stream");

        env.execute("SplitAndUnion");
    }
}
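Splitting with two filter calls evaluates every element twice. An alternative that splits in a single pass is Flink's side-output mechanism; a sketch under the same WaterSensor setup (the tag name "odd" is illustrative):

// Hypothetical single-pass split with a ProcessFunction and a side output.
// Needs org.apache.flink.streaming.api.functions.ProcessFunction,
// org.apache.flink.util.OutputTag and org.apache.flink.util.Collector.
OutputTag<WaterSensor> oddTag = new OutputTag<WaterSensor>("odd") {}; // anonymous subclass keeps type info

SingleOutputStreamOperator<WaterSensor> evenStream = sensorDs
        .process(new ProcessFunction<WaterSensor, WaterSensor>() {
            @Override
            public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) {
                if (value.getVc() % 2 == 0) {
                    out.collect(value);        // main output: even values
                } else {
                    ctx.output(oddTag, value); // side output: odd values
                }
            }
        });

DataStream<WaterSensor> oddStream = evenStream.getSideOutput(oddTag);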

4. Output streams: sinks

package com.baidu.keyue.deepsight.memory.test;

import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class SinkDemo {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        // choose streaming or batch execution mode
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);

        // create a bounded demo stream
        DataStream<WaterSensor> sensorDs = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3)
        );

        FileSink<WaterSensor> fileSink = FileSink.<WaterSensor>forRowFormat(new Path("/Users/chengyong03/Downloads/output/flink"),
                        new SimpleStringEncoder<>("UTF-8"))
                .build();
        sensorDs.sinkTo(fileSink);
        env.execute("FileSink");
    }
}
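One caveat with FileSink in STREAMING mode: part files are only finalized when a checkpoint completes, so without checkpointing the output stays in in-progress/pending files. A sketch of what is typically added to this job (the 5-second interval and rolling-policy values are arbitrary examples, and the builder signatures vary slightly across Flink versions):

// Hypothetical additions to SinkDemo: enable checkpointing so the sink can
// commit part files, and roll files by time/size. Needs java.time.Duration,
// org.apache.flink.configuration.MemorySize and DefaultRollingPolicy from
// org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.
env.enableCheckpointing(5000); // checkpoint every 5 seconds (example value)

FileSink<WaterSensor> fileSink = FileSink
        .<WaterSensor>forRowFormat(new Path("/Users/chengyong03/Downloads/output/flink"),
                new SimpleStringEncoder<>("UTF-8"))
        .withRollingPolicy(DefaultRollingPolicy.builder()
                .withRolloverInterval(Duration.ofMinutes(15)) // roll at least every 15 minutes
                .withMaxPartSize(MemorySize.ofMebiBytes(128)) // or when a part file reaches 128 MiB
                .build())
        .build();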

5. Converting between streams and tables, Flink SQL

package com.baidu.keyue.deepsight.memory.test;

import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1. create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStream<WaterSensor> sensorDs = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3)
        );
        sensorDs.print();
        // 2. convert the stream into a table
        Table sensorTable = tableEnv.fromDataStream(sensorDs);
        // 3. register the table as a temporary view
        tableEnv.createTemporaryView("sensorTable", sensorTable);

        Table resultTable = tableEnv.sqlQuery("select * from sensorTable where vc > 10");

        // 4. convert the result table back into a stream
        DataStream<WaterSensor> waterSensorDataStream = tableEnv.toDataStream(resultTable, WaterSensor.class);
        waterSensorDataStream.print();
        env.execute();
    }
}
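The same filter can also be expressed with the Table API instead of SQL; a minimal sketch of the equivalent query:

// Hypothetical Table API equivalent of "select * from sensorTable where vc > 10".
// Requires: import static org.apache.flink.table.api.Expressions.$;
Table resultTable = sensorTable
        .where($("vc").isGreater(10))
        .select($("id"), $("ts"), $("vc"));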

