登 录
注 册
< 大 数 据
Flink
Hadoop
Spark
Hive
HBase
Kafka
其他框架
HBase读写流程
多租户环境
HBase HA
HBase预分区
RowKey设计
HBase优化
HBase数据删除与Split
HBase PythonAPI
HBase存储结构
HBased对接Hive
热门推荐>>>
中台架构
中台建设与架构
Hadoop
源码分析-NN启动(三)
Linux
Nginx高可用
Python
数据导出工具
Flink
3分钟搭建Flink SQL测试环境
Kafka
Kafka对接Flume
深度学习
卷积神经网络
数据结构与算法
选择合适的算法
MySQL
数据备份恢复
计算机系统
信号量同步线程
Hive
Hive调优参数大全
其他框架
Azkaban Flow1.0与2.0
ClickHouse
表引擎-其他类型
技术成长
最好的职业建议
精选书单
技术成长书单—机器学习
技术资讯
数据在线:计算将成为公共服务
开发工具
IntelliJ IDEA 20年发展回顾(二)
系统工具
Mac命令行工具
虚拟化
内存虚拟化概述
云原生
云原生构建现代化应用
云服务
一文搞懂公有云、私有云...
Java
Spring Boot依赖注入与Runners
Go
Go函数与方法
SQL
SQL模板
安全常识
一文读懂SSO
当前位置:
首页
>>
HBase
>>
HBase PythonAPI
HBase PythonAPI
2021-12-25 13:25:52 星期六 阅读:935
 Python操作HBase,目前比较好的第三方库为:happybase,支持基本的增删改等从操作。 `pip3 install happybase` #### HBase表数据探查 ```python # -*- coding: utf-8 -*- import happybase import time import threading import json HOST = "ip" PORT = 6004 SIZE = 11 TABLENAME = "namespace:tablename" class HBaseResearch(): def print_data_sample(self, tablename, limit): with self.pool.connection() as conn: table = conn.table(bytes(tablename, encoding="utf8")) scanner = table.scan(batch_size=1000, limit=limit) for rowkey, values in scanner: print(f"RK: {rowkey} VALUES: {values} ") print("-" * 80) ``` #### 获取某个表的region信息 ``` # 获取某个表的region信息 def get_regions_info(self, tablename): with self.pool.connection() as conn: table = conn.table(bytes(tablename, encoding="utf8")) regions = table.regions() servers = [] for region in regions: servers.append(region.get("server_name")) print(region) distinct_servers = set(servers) print("-" * 60) return f" Region Number:{len(regions)} Servers Number:{len(distinct_servers)} Server Detail:{distinct_servers}" ``` #### 根据RowKey范围获取数据 ``` def download_data_from_key(self, tablename, start_key, end_key, filepath): with self.pool.connection() as conn: start_time = time.time() counter = 0 table = conn.table(bytes(tablename, encoding="utf8")) print(f"线程:{threading.currentThread().getName()} start_key:{start_key}-{self.my_hex(start_key)} end_key:{end_key}-{self.my_hex(end_key)}") for num in range(start_key, end_key): # 获取当前prefix row_prefix = bytes(self.my_hex(num), encoding="utf8") prefix_start_time = time.time() scanner = table.scan(batch_size=5000, row_prefix=row_prefix) for row_key, values in scanner: lines = {} # 二进制数据的key 和 value转换为str类型 for field, value in values.items(): lines[str(field, encoding="utf-8")] = str(value, encoding="utf-8") # 字典转化为json并写入本地文件(注意一定要是append格式写入) with open(filepath, "a") as f: f.write(json.dumps(lines, ensure_ascii=False) + " ") counter += 1 prefix_end_time = time.time() print(f"线程{threading.currentThread().getName()}的row_prefix:{row_prefix}已处理完成,用时:{prefix_end_time - prefix_start_time}") end_time = time.time() print(f"线程{threading.currentThread().getName()}共写入{counter}条数据,用时:{end_time - start_time}") ```