登 录
注 册
< 大 数 据
Flink
Hadoop
Spark
Hive
HBase
Kafka
其他框架
Flink本地程序开发
Flink实战-数据准备
Flink实战-自定义Sink
Flink实战-实时计算
Flink实战-部署上线
文件映射为Table
FlinkTable自定义函数
Flink API JOIN总结
Flink SQL JOIN总结
3分钟搭建Flink SQL测试环境
热门推荐>>>
中台架构
中台建设与架构
Hadoop
源码分析-NN启动(三)
HBase
HBased对接Hive
Linux
Nginx高可用
Python
数据导出工具
Kafka
Kafka对接Flume
深度学习
卷积神经网络
数据结构与算法
选择合适的算法
MySQL
数据备份恢复
计算机系统
信号量同步线程
Hive
Hive调优参数大全
其他框架
Azkaban Flow1.0与2.0
ClickHouse
表引擎-其他类型
技术成长
最好的职业建议
精选书单
技术成长书单—机器学习
技术资讯
数据在线:计算将成为公共服务
开发工具
IntelliJ IDEA 20年发展回顾(二)
系统工具
Mac命令行工具
虚拟化
内存虚拟化概述
云原生
云原生构建现代化应用
云服务
一文搞懂公有云、私有云...
Java
Spring Boot依赖注入与Runners
Go
Go函数与方法
SQL
SQL模板
安全常识
一文读懂SSO
当前位置:
首页
>>
Flink
>>
Flink实战-实时计算
Flink实战-实时计算
2020-10-08 16:59:36 星期四 阅读:3726
完成前面两步:数据准备和自定义Sink后,就可以实时消费Kafka数据进行计算。并将结果插入MySQL。 以下是实时计算的Flink代码(Java) ```java package cn.libin; import com.alibaba.fastjson.JSON; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.Collector; import org.apache.kafka.clients.consumer.ConsumerConfig; import java.text.SimpleDateFormat; import java.util.Map; import java.util.Properties; import java.util.Date; import com.aaron.MysqlSink; public class MyStreamWordCount { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String kafkaServer = "192.168.1.101:9092,192.168.1.102:9092,192.168.1.104:9092"; String groupId = "test-consumer"; String topic = "word_count_topics"; String mysqlUserName = "root"; String mysqlPassword = "xxx"; String DBurl = "jdbc:mysql://192.168.1.102:3306/flink-project"; Properties prop = new MyStreamWordCount().getConsumerProp(groupId, kafkaServer); FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), prop); consumer.setStartFromEarliest(); // 从最早开始消费 // 根据Kafka消费者构建Flink source DataStream<String> stream = env.addSource(consumer); DataStream<Tuple3<String, Integer, String>> windowCounts = stream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception { Map maps = (Map) JSON.parse(value); // 解析字符串为json String word = (String) maps.get("word"); // 获取word字段的单词 collector.collect(Tuple2.of(word, 1)); // 将单词进行归一操作 } }).keyBy(0) .timeWindow(Time.seconds(10)) // 10秒统计一次 .sum(1) // 增加当前时间字段 .map(new MapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>>() { @Override public Tuple3<String, Integer, String> map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { String times = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); // 返回统计好的词频和当前时间(总共3个字段) return Tuple3.of(stringIntegerTuple2.f0, stringIntegerTuple2.f1, times); } }); windowCounts.print().setParallelism(1); MysqlSink mysqlSink = new MysqlSink(mysqlUserName, mysqlPassword, DBurl); // 将本窗口的数据插入自定义的MySQL Sink windowCounts.addSink(mysqlSink); env.execute("flink test program"); } //传入消费者群组,kafka集群的地址,返回消费者属性 public Properties getConsumerProp(String groupId, String kafkaServer) { Properties properties; properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); //设置 offset自动提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); //key、value反序列化类 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); return properties; } } ``` ##### 下一步 [部署上线](https://libins.cn/detail/moduleName=flink&articleId=beb089606625752e057c44cc0b364055 "部署上线")