登 录
注 册
< 大 数 据
Flink
Hadoop
Spark
Hive
HBase
Kafka
其他框架
Flink本地程序开发
Flink实战-数据准备
Flink实战-自定义Sink
Flink实战-实时计算
Flink实战-部署上线
文件映射为Table
FlinkTable自定义函数
Flink API JOIN总结
Flink SQL JOIN总结
3分钟搭建Flink SQL测试环境
热门推荐>>>
中台架构
中台建设与架构
Hadoop
源码分析-NN启动(三)
HBase
HBased对接Hive
Linux
Nginx高可用
Python
数据导出工具
Kafka
Kafka对接Flume
深度学习
卷积神经网络
数据结构与算法
选择合适的算法
MySQL
数据备份恢复
计算机系统
信号量同步线程
Hive
Hive调优参数大全
其他框架
Azkaban Flow1.0与2.0
ClickHouse
表引擎-其他类型
技术成长
最好的职业建议
精选书单
技术成长书单—机器学习
技术资讯
数据在线:计算将成为公共服务
开发工具
IntelliJ IDEA 20年发展回顾(二)
系统工具
Mac命令行工具
虚拟化
内存虚拟化概述
云原生
云原生构建现代化应用
云服务
一文搞懂公有云、私有云...
Java
Spring Boot依赖注入与Runners
Go
Go函数与方法
SQL
SQL模板
安全常识
一文读懂SSO
当前位置:
首页
>>
Flink
>>
Flink实战-数据准备
Flink实战-数据准备
2020-10-08 16:46:27 星期四 阅读:2190
####实战项目功能 >1.Python程序模拟实时产生数据并往Kafka发送; 2.Flink程序构建Kafka消费者实时消费数据并统计最近10秒钟的词频; 3.Flink将统计结果实时写入MySQL; 4.Hue通过查询MySQL验证计算是否正确 #####环境准备 开启Zookeeper集群 开启Kafka集群 开启YARN集群(本次演示使用Flink ON Yarn) 有可用的MySQL服务 ####Python实现Kafka生产者 安装SDK ``` pip3 install kafka-python ``` 实时往kafka集群发送消息 ```python import datetime import json import time import uuid import random from kafka import KafkaProducer from kafka.errors import KafkaError # broker的IP和端口 bootstrap_servers = ["192.168.1.101:9092", "192.168.1.102:9092", "192.168.1.104:9092"] producer = KafkaProducer(bootstrap_servers=bootstrap_servers) topic = "word_count_topics" # 单词库 words = ["hello", "flink", "my", "name", "is", "aaron", "the", "city", "is", "beijing"] n = 0 while True: info = {} info["id"] = n info["uuid"] = uuid.uuid4().hex info["time"] = time.time() info["word"] = random.sample(words, 1)[0] # 随机选择一个单词进行发送 info["msg"] = "this is test info of python-kafka" producer.send(topic, json.dumps(info).encode()) print("第{}条消息发送成功,发送的消息为:{}".format(n, info)) time.sleep(random.random()) # 随机停顿一段时间 n += 1 ``` #### 创建MySQL结果表 ```mysql CREATE TABLE word_count_realtime ( id int PRIMARY KEY AUTO_INCREMENT, word varchar(200) COMMENT "单词", count int COMMENT "recent count", update_time datetime ) ``` ##### 下一步 [自定义Sink](https://libins.cn/detail/moduleName=flink&articleId=cca78f569c404e6f823f77dc0f2c9355 "自定义Sink")