2.3. 1.hadoop主要组件及作用是什么?
使用hdfs进行数据的存储
使用mapreduce进行数据的计算 、(spark、tez)
使用yarn进行资源的调度
使用hive编写sql语句
使用mysql存储表格的元数据结构
Sqoop hive专用的etl工具 不能表到表
Hdfs下面有三个部分组成,
HDFS副本数是通过配置文件hdfs-site.xml中的参数dfs.replication来设置的。默认情况下,HDFS副本数为3
hdfs、yarn、MR三者的关系
首先接口client向yarn的rm提交申请一个mr的task的id
然后rm将id,路径给到接口,接口得到id和路径后,
将自己的jar包、切片信息和配置文件提交到路径。
提交完资源后向rm申请运行MRappmaster
rm得到申请后将job分配给某个空闲的NM(nodemanager)
该NM创建Container,并产生MRAppmaster
再将路径里接口提交的资源下载到本地后开始分配任务
MrAppMaster向RM申请运行多个MapTask任务资源(多个切片(128mb)对应的多个map task,每个task基本上都会分配到不同的节点上,所以每个task的老大就是节点上的nodemanager,mr会将多个map task分配给多个NM,每个 NM再产生多个container容器,容器里封装的是appmaster所带的任务)
后面再到mapreduce内部执行过程,map任务对数据分配k,v键值对,shuffer缓冲,分区排序,之后再是继续向rm申请运行reduce task的资源,然后rm再将task分给空闲的NM
,最后程序运行完毕后,MR会向RM申请注销自己
Hive表内容
创建表:
CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name data_type [COMMENT col_comment], …)]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], …)]
[CLUSTERED BY (col_name, col_name, …)
[SORTED BY (col_name [ASC|DESC], …)] INTO num_buckets BUCKETS]
[ROW FORMAT row_format]
[STORED AS file_format]
[LOCATION hdfs_path]
[TBLPROPERTIES (property_name=property_value, …)]
复制表结构:create table tablename like tablename1
根据表1建表2:table tablename2 select * from biao1
根据表1,插入表2、表3:
from u1
insert into u2 select *
insert into u3 select id,name;
修改列名、修改列的位置:ALTER TABLE table_name CHANGE column_name new_column_name column_type;
增加列 ALTER TABLE table_name ADD COLUMNS (column_name data_type [COMMENT column_comment], …);
移动列 ALTER TABLE table_name CHANGE COLUMN column_name column_name data_type [COMMENT column_comment] AFTER new_position;
删除列:ALTER TABLE table_name replace COLUMNS(lie1、lie2。。。)相当于保留其他列来删除想要删除的列
Namenode 9870 服务器端口9820
Yarn 8088 8032
历史服务器 jobhistory 19888 10020
STORED AS(重点)
表的存储格式
指定文件格式,常用的文件格式有,行textfile(默认值),sequence file,列orc file、parquet file等等。
大表和大表关联是考虑分区、分桶实现mapredus和cpu内核的关系,一对一成正比
2.4. hive支持多种压缩方式
不压缩:不进行任何压缩,直接存储数据
Gzip压缩:使用Gzip算法进行压缩,可以显著减少数据存储空间。
Snappy压缩:使用Snappy算法进行压缩,速度比Gzip快,但压缩比较低。
压缩:使用LZO算法进行压缩,速度很快,但压缩比较低。
Bzip2压缩:使用Bzip2算法进行压缩,压缩比较高,但速度比较慢。
在创建表时,可以通过设置表属性来指定压缩方式,例如:
CREATE TABLE mytable ( col1 INT, col2 STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘,’ STORED AS TEXTFILE TBLPROPERTIES (“compression.type”=”gzip”);
此外,可以通过设置MapReduce作业的压缩方式来对Hive表进行压缩,例如:
SET mapreduce.map.output.compress=true;
SETmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress=true;
SETmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
以上设置将对Hive表进行Gzip压缩。
传统数据库和分布式的数据库的区别
数据量级别,响应时间,安全性 可扩展性、计算能力,索引,适用环境
Hive和Oracle都是用于数据存储和处理的工具,但它们在很多方面有所不同。
类型:Hive是一个数据仓库工具,用于在Hadoop平台上进行大规模数据处理,通常用于分析和查询大型数据集。Oracle是一个关系型数据库管理系统,用于OLTP(联机事务处理)和OLAP(联机分析处理)任务。
架构:Hive基于Hadoop平台,使用HiveQL(类似于SQL)进行查询,而Oracle使用SQL进行数据查询和管理。
处理能力:Hive适合处理大规模的数据集,可以在一个分布式环境中进行并行处理。Oracle也可以处理大规模数据,但通常更适用于小型至中等规模的数据集。
存储方式:Hive使用Hadoop分布式文件系统(HDFS)来存储数据,而Oracle使用传统的关系型数据库管理系统来存储数据。
成本:Hive是开源软件,免费使用,但需要一定的Hadoop基础设施来支持。Oracle是商业数据库管理系统,需要购买许可证才能使用。
Oracle和 hive得字段类型区别
Number -》 decimal
Varchar2 -》 string
Date -》 string ,timestamp
2.5. hive 和oracle 语法、函数的区别
同一种关联在oracle中有两种写法,而在hive中只有一种,hive不能用where进行关联
Oracle: select * from a,b where a.c = b.d;或select * from a join b on a.c = b.d;
Hive: select * from a join b on a.c = b.d;
左连接Oracle: select * from a,b where a.c = b.d(+);Hive: select * from a left join b on a.c = b.d;
笛卡尔积 Oracle: select * from a,b; Hive: select * from a join b;
select中含有子查询
例如select a.id, (select b.id from b where b.name=a.id) from a
hive 是不支持select 里面子查询的。 修改如下
select a.id ,b.id from a left join b on a.id=b.name
临时表名
oracle 这样是可以的SELECT * FROM (SELECT 1,2 FROM dual )
hive注意必须临时表名 select * from (select 1, 2 )t –正确
Instr()oracle有四个参数,hive两个参数没有oracle的强大
instr(‘被查找的字符串’,’我们需要查找的字符’,从第几位开始 首位是0,查找第几个出现的)
SELECT instr(‘1234567890123456789’,’3’) – 3 没问题 但是这个功能比较简单,{hive的}
hive-locate函数
select locate(‘3’,’12345123’,4) –8
– 这个locate函数也是找到字符串的下标 locate(‘要找的字符’,’被找的字符串’,’ 从下标多少开始找’)。
2.6. Hive实现不等值连接
select a.app_name,
b.app_name
from a
left outer join b
where instr(b.app_name,a.app_name) > 0
LOCATE / LIKE
2.7. Oracle中的for循环:
DECLARE
i NUMBER;
BEGIN
FOR i IN 1..10 LOOP
DBMS_OUTPUT.PUT_LINE(‘i = ‘ || i);
END LOOP;
END;
对应的Hive循环:
SET max=10;
SET i=1;
WHILE ${hiveconf:max} >= ${hiveconf:i} DO
SELECT CONCAT(‘i = ‘, ${hiveconf:i});
SET i=${hiveconf:i}+1;
END WHILE;
2.8. Hive里面循环:
Hive:
For I in 1 2 3 while I > 10 do
Do …….
…… end while
Done
Oracle : for while 都是跟loop声明开始 end loop 声明结束
Mysql:while do end while , loop end loop , repeat end repeat;
2.9. Hive排序
1、全局排序(order by)
Order by:全局排序,只有一个reducer
ASC(ascend):升序(默认)
DESC(descend):降序
2、每个MR内部排序(sort by)
sort By:对于大规模的数据集order by的效率非常低。在很多情况下,并不需要全局排序,此时可以使用sort by
Sort By为每个Reducer产生一个排序文件。每个Reducer内部进行排序,对全局结果集来说不是排序。
(1)设置reduce个数
hive (default)> set mapreduce.job.reduces=3;
(2)根据部分编号降序查看员工信息
hive (default)> select * from emp sort by empno desc;
3、分区排序(Distribute By)
Distribute By:在某些情况下,我们需要控制某个特定行应该到哪个Reducer,通常是为了后续的聚集操作。
Distribute by类似MR中partition(自定义分区),进行分区,结合sort by使用
测试时要分配多个reduce进行处理,否则无法看到Distribute by的效果
set mapreduce.job.reduces=3;
注意:
Distribute by的分区规则是根据分区字段的hashcode与reduce的个数进行取模后,余数相同的分到一个区
hive要求Distribute by语句要写在sort by语句之前
4、cluster by
当Distribute by 和sort by 字段相同时,可以使用cluster by 方式
cluster by除了具有distribute by的功能外还兼具sort by的功能。但是排序只能是升序排序,不能指定排序规则为ASC或者DESC。
1)以下两种写法等价
hive (default)> select * from emp cluster by deptno;
hive (default)> select * from emp distribute by deptno sort by deptno;
Distribute by和sort by的使用场景
1.Map输出的文件大小不均。
2.Reduce输出文件大小不均。
3.小文件过多。
4.文件超大。
2.10. 2.LINUX文件导入到hive的方法?
sqoop import
–hive-import
–connect jdbc:…
–username 用户名
–password 密码
–table 表名
–fields-terminated-by ‘,’;
Load data local inpath ‘文件路径’ into table 表名;
Hadoop dfs -put 文件路径 –导到HDFS
2.11. 3.如何删除(清空)分区?
alter table drop partition (分区字段=分区值);
alter table truncate partition (分区字段=分区值);
2.12. 拉链表存储数据量过大的问题
可以根据冷数据热数据的思路,将近三年的放进一张表存的热数据,三年前的放在另一张表存储冷数据,
还可以根据结束时间进行分区
外部表和內部表
外部表建立的时候要用EXTERNAL声明 、外加声明LOCATION 申明文件所再得文件夹,分隔符,换行符、删除时候外部表删除meta store里面的namenode存储的元数据信息,不会删除原数据文件,内部表会删除原文件和元数据文件
分别是什么文件格式,有什么区别,哪个占用的空间更大,外部表需要注意什么?
外部表 TEXTFILE
内部表 ORCFILE
区别:TEXTFILE 行存储 ORC 列存储,TEXTFILE 只能 LOAD DATA,ORCFILE 只能insert into
TEXTFILE 更占空间
外表需要注意 LOCATION 申明文件所再得文件夹,分隔符,换行符
外表删除时HDFS上得文件并不会被删除
2.13. 5.TEXTFILE得表能直接转成ORCFILE格式得表吗?
不能,只能建新表插入数据.
区别:TEXTFILE 行存储 ORC 列存储,TEXTFILE 只能 LOAD DATA,ORCFILE 只能insert into
TEXTFILE 更占空间
2.14. 6.update,delete,循环用hive怎么实现?
update:建临时表,把不需要修改得数据筛选出去union上需要更改得数据
delete: 建临时表,把要不删除得筛选出去
循环: 用shell封装hsql,运用shell脚本得循环来实现
2.15. 8.mapreduce执行流程?
1.input:先获取在计算过程中所需要的数据
2.split:对大的数据进行切割的操作,将数据分成一块块的
3.map:数据的映射。将分配好的数据,给到不同的进程去运行
4.shuffle:每个进程单独的对自己拿到的数据进行计算
5.reduce:将单独的数据进行总体汇总的计算过程
6.finalize:将计算结果输出
9.分区和分桶得区别?
分区:partition by 分桶:clustered by
分区原理上是分文件夹,分桶是分数据文件
分区是通过伪列来分(指定新的字段),分桶是按照表里现有得列来分得
分区主要提升查询某个分区数据得查询速度,分桶主要提高分桶字段得关联速度
分桶和分区两者不干扰,可以把分区表进一步分桶;
分桶是字段值得hash值按照分桶数来取模分配
设置分桶数:set map.reduce.tasks = 4; //设置分桶数
hive 计算桶列的hash值再除以桶的个数取余,得到某条记录到底属于哪个桶
2.16. 分桶
创建语句
create table bucket_user (
id int
)
clustered by (id) into 4 buckets;
优点:Hive分桶可以提高查询性能,增强并行处理能力,提高数据压缩比率,方便数据过滤和聚合,并支持数据分布式处理。这些优点使得Hive分桶成为大数据处理场景下的重要技术手段。
分桶抽样:
select * from bucket_user tablesample(bucket x out of y on 字段)
x 表示从第几个分桶进行抽样,y每隔几个分桶取一个分桶,y必须是table总 bucket数的倍数或者因子。
静态分区和动态分区介绍
静态分区与动态分区的主要区别在于静态分区是手动指定,而动态分区是通过数据来进行判断。
Hive分区是在创建表的时候用Partitioned by 关键字定义的,但要注意,Partitioned by子句中定义的列是表中正式的列,但是Hive下的数据文件中并不包含这些列,因为它们是目录名
动态分区不允许主分区采用动态列而副分区采用静态列,这样将导致所有的主分区都要创建副分区静态列所定义的分区:
静态分区:分区是指定的一个固定值;
静态分区的表不能被加到动态分区里,可以增加新的分区但是只对后来加入的新数据有意义,已经写入数仓的文件不能被重新分区了,只能重写!
动态分区:分区根据读入的值动态写(可理解为变量形式)
这是因为Hive默认配置不启用动态分区,需要使用前开启配置。开启的方式有两种:
在hive服务配置文件中全局配置;
每次交互时候进行配置(只影响本次交互);
通常我们生产环境使用第二种。
hive.exec.dynamic.partition=true,是否支持动态分区操作
hive.exec.dynamic.partition.mode=strict/nonstrict: 严格模式/非严格模式
hive.exec.max.dynamic.partitions=1000: 总共允许创建的动态分区的最大数量
其中参数hive.exec.dynamic.partition.mode表示动态分区的模式。默认是strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区
另外动态分区的值是MapReduce任务在reduce运行阶段确定的,也就是所有的记录都会distribute by,相同字段(分区字段)的map输出会发到同一个reduce节点去处理,如果数据量大,这是一个很弱的运行性能。而静态分区在编译阶段就确定了,不需要reduce任务处理。所以如果实际业务场景静态分区能解决的,尽量使用静态分区即可。
动态分区和静态分区创建的语法都一样,加载的时候不一样,静态是加载的时候给定分区名 加载用load data比较多
动态是加载的时候不用给定分区名,加载用insert比较多
Insert overwrite table表 partition(分区字段=值)select ,,,from 表:这个是静态
Insert overwrite table表 partition(分区字段)select ,,,from 表:这个是动态
注意:在hive中,分区字段名是不区分大小写的,不过字段值是区分大小写的
动态分区会使用mapreduce来进行查询数据,分区多会影响namenode 和recoremanager资源
分区删除后:修复分区就是重新同步hdfs上的分区信息。
msck repair table table_name;
hive怎么查看每个分区分到哪个文件夹去了?
在 Hive 中,要查看每个分区对应的数据存储在哪个文件夹,您可以通过以下方式:
首先,使用 DESCRIBE EXTENDED <表名> 命令查看表的详细信息,其中包括分区的信息。
然后,在输出的结果中,您可以找到关于分区的描述,其中会显示每个分区的存储路径。
或者,您也可以直接在 Hive 所使用的文件系统(如 HDFS)中,根据表的默认存储路径和分区的字段值来查找对应的文件夹。
数据倾斜怎么发现?可能形成得原因?怎么解决?
数据倾斜其实就是系统再分配reduce task时候,每个task里面的数据量的分配不均匀,导致有的task早早的跑完,有的却还在跑的原因
发生数据倾斜一般体现为运行一个任务的时候,所有的map task都完成了,
并且99%的reduce task也完成,只剩下一个或者少数几个reduce task一直在执行,无论等多久都不会停止.
原因:
1.key值分布不均匀.
2.大表join小表时:其中小表的key集中,造成大量一对多,分发到某一个或者几个reduce上的数据远远高于平均值.
3.大表join大表时:空值过多,在做join时这些空值就会非常集中,拖累进度.
4.group by:某个分组值的数量过多,处理某值的reduce非常耗时间.
5.Count distinct:某字段值过多,处理此特殊值的reduce耗时.
解决:
参数调节:hive.map.aggr = true(原理是将一个任务分成两部分,第一个MR会将group by 得key值随机分配给reduce做部分聚合操作,这样相同得key值也可能分配到不同得reduce,再利用第二个MRjob将第一个任务预处理得数据中相同的key值再整合到一个reduce当中完成最终得聚合操作)
sql调节:
1.关联:选用key分布最均匀的表作为驱动表.
2.大表join小表的时候:使用 /*+ mapjoin(小表的名字) */让小表先进内存,在map端完成reduce.
3.大表join大表的时候:把空值变成一个关联不上得字符串加上随机数,把倾斜的数据分到不同的reduce上.
4.count distinct时可以先group by再count.(原理是一般count distinct只会分配给一个reduce task,所以将SELECT count(distinct 列) FROM 表 转化为 SELECT count(列) FROM (select 列 FROM 表 GROUP BY 列);利用group by开启多个reduce task进行去重后再count)
5.group by:一般用参数调节处理.(set hive.map.aggr = true)
11.hive不支持不等值连接怎么处理不等值连接.
inner join:直接将非等值连接条件写进where中.
left join:不能将非等值条件直接写进where里会过滤主表数据,所以要先inner再left例如:
select t1.id,t1.bal,t2.bal
from table1 t1
left join table2 t2
on t1.id = t2.id
where t1.bal<t2.bal;(这样会过滤主表数据)
改为
select *
from table1 t
left join(
select t1.id,t2.bal
from table1 t1
inner join table2 t2
on t1.id=t2.id
where t1.bal<t2.bal) t3
on t.id = t3.id;
2.17. 12、hdfs命令
启动命令:/hadoop/sbin/start-dfs.sh
停止命令:/hadoop/sbin/stop-dfs.sh
hadoop fs -put 本地路径 HDFS路径
从本地文件系统中拷贝文件到 HDFS
hadoop fs -get HDFS路径 本地路径
从 HDFS 拷贝到本地文件系统
hadoop fs -ls HDFS路径
显示HDFS目录信息
hadoop fs -cat HDFS路径
显示HDFS文件内容
hadoop fs -mkdir HDFS创建路径
创建HDFS路径( -P 递归创建)
hadoop fs -cp HDFS路径1 HDFS路径2
从 HDFS 的一个路径拷贝到 HDFS 的另一个路径
hadoop fs -tail HDFS文件
显示一个HDFS文件的末尾 1kb 的数据
hadoop fs -rm -r HDFS路径
递归删除HDFS目录及目录里面内容
Hadoop fs -du HDFS文件夹
统计HDFS文件夹的大小信息
2.18. Hive锁表
hive存在两种锁,共享锁Shared (S)和互斥锁Exclusive (X)
其中只触发s锁的操作可以并发的执行,只要有一个操作对表或者分区出发了x锁,则该表或者分区不能并发的执行作业,
表锁和分区锁是两个不同的锁,对表解锁,对分区是无效的,分区需要单独解锁。
8、hive底层与数据库交互原理
Hive的查询功能是由hdfs + mapreduce结合起来实现的
Hive与mysql的关系:只是借用mysql来存储hive中的表的元数据信息,称为metastore
Sort By:每个Reducer内部进行排序,对全局结果集来说不是排序。丛林
1.设置reduce个数
set mapreduce.job.reduces=3;
1
2.查看设置reduce个数
set mapreduce.job.reduces;
开发中,经常会遇到Hive分区表需要加字段的问题。在我们使用常规手段alter table 加上字段后,重新导入当天的数据,会发现新加字段的值全为空。
答; 一、ALTER TABLE test.partition_test ADD columns(id string)CASCADE;
第二步:ALTER TABLE test.partition_test DROP partition(dt = ‘2022-09-04’);删除分区
ALTER TABLE test.partition_test ADD partition(dt = ‘2022-09-04’);添加分区
导入数据INSERT overwrite TABLE test.partition_test PARTITION(dt=’2022-09-04’)VALUES (。。。。);
2.1 存储格式
存储格式用于建表的时候指定将表中的数据按照什么样子的存储方式,风险数据集市统一使用parquet格式存储,并使用Gzip压缩。
2.2 字符集
在建表时可能涉及到中文乱码问题,所有导入/导出文件的字符编码统一为utf-8格式。
hive里面没有索引,加速搜索要用solr组件 或者ES组件
Hive怎么查看执行计划?
EXPLAIN [EXTENDED|DEPENDENCY|REWRITE|LOGICAL|AUTHORIZATION] select_statement
EXTENDED参数可以显示更详细的执行计划信息,包括每个任务的详细信息、每个阶段的运行时间和输入/输出记录数等
DEPENDENCY参数可以显示查询语句中所有依赖的表和分区信息
REWRITE参数可以显示查询语句的重写规则
LOGICAL参数可以显示查询语句的逻辑执行计划
AUTHORIZATION参数可以显示查询语句的权限信息
2.19. Hive优化
一、SQL本身的优化
1、只select需要的列,避免select *
2、where条件写在子查询中,先过滤再关联
3、关联条件写在on中,而不是where中
4、数据量大时,用group by代替count distinct
5、数据量小时,用in代替join
6、避免笛卡尔积
7、把重复关联键少的表放在join前面做关联可以提高join的效率。”写在关联左侧的表每有1条重复的关联键时底层就会多1次运算处理
7、严格格式
Hive.mapred.mode,分 nonstrict,strict,默认是nonstrict,
如果设置为strict,对三种情况限制:
(1)分区表必须加分区。
(2)order by 必须使用limit
(3)存在笛卡尔积
数据倾斜
数据倾斜的现象:
1、任务进度长时间维持在99%(或100%);
2、查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。
3、本地读写数据量很大。
导致数据倾斜的原因:
1、空值问题
2、数据类型不一致
3、业务数据本身的问题
解决:
1、小表关联大表,开启mapjoin
(1)设置参数
set hive.auto.convert.join=true;
hive.mapjoin.smalltable.filesize=25000000 即25M
(2)手动指定
select /+ mapjoin(A)/ f.a,f.b from A t join B f on ( f.a=t.a and f.ftime=20110802)
2、加盐打散
(1)空值0值 或 关联不上的,用随机数
from a join b
on if(a.key=’’, rand()-1, a.key)=b.key
–rand() 0-1之间的小数
(2)都是有用的key,则加随机数后缀
group by concat(key, cast(round(rand()*10) as int))
缺点是分成10份是提前写好的,数据变更大时,还是会跑得慢
3、开启combiner,即map端聚合
set hive.map.aggr=true;
4、开启负载均衡,会生成两个MRJob
set hive.groupby.skewindata=true;
第一个 MR Job 中,Map 的输出结果集合会随机分布到Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group ByKey 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce中),最后完成最终的聚合操作。
5、group by 代替count distinct
6、大key单独处理,再union回去
7、增大并行度
set mapred.job.queue.name=pms; //设置队列
set hive.exec.reducers.max=8; //设置最大的reducers
set mapred.reduce.tasks=8; //设置最大的redue tasks
set hive.exec.parallel=true; //开启任务并行执行
set hive.exec.parallel.thread.number=8; // 同一个sql允许并行任务的最大线程数
小文件的处理
https://blog.csdn.net/xianyu624/article/details/130284200?ops_request_misc=&request_id=&biz_id=102&utm_term=hadoop%E6%80%8E%E4%B9%88%E8%A7%A3%E5%86%B3%E5%B0%8F%E6%96%87%E4%BB%B6%E9%97%AE%E9%A2%98&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduweb~default-0-130284200.142^v96^pc_search_result_base6&spm=1018.2226.3001.4187
小文件是如何产生的?
动态分区插入数据,产生大量的小文件,从而导致 map 数量剧增;
reduce 数量越多,小文件也越多,reduce 的个数和输出文件个数一致;
数据源本身就是大量的小文件
【影响】
1、小文件会开很多map,导致执行task时间还没启动task时间快
2、在HDFS中,每个小文件对象约占150字节,如果小文件过多会占用大量namenode内存。
【解决方法】
1)数据采集时,进行文件的合并
使用 Hadoop Archive(能高效将小文件放入HDFS的文件存档工具) 将多个小文件打包为一个HAR文件,减少NameNode的内存使用。
2、减少reduce个数 set mapreduce.job.reduces=n个;
3、参数调节
(1)设置map输入合并小文件
(2)设置map和reduce输出合并小文件
4、少用动态分区,使用distribute by分区
数据压缩和存储格式
【数据的压缩与存储格式】
1、压缩 gzip bzip2 snappy
2、存储格式
【行式存储和列式存储】
TextFile:行式存储,Gzip压缩后不支持split
RCFile:数据按行分块,每块按列存储。头信息:行组记录数、每列字节数、
ORC:数据按行分块,每块按列存储,是rcfile的改良版本。头信息:每一列最大小值、该行的偏移量和长度
Parquet:列式存储(压缩比高)。头信息:数据量、偏移量。
hive参数的调整
1、多个job无依赖(如union all),可设置并行执行
//开启任务并行执行
set hive.exec.parallel=true;
//同一个sql允许并行任务的最大线程数
set hive.exec.parallel.thread.number=8;
2、设置map和reduce个数
set mapred.max.split.size=100000000; 每个map的最大输入大小
set mapred.min.split.size.per.node=100000000;一个节点上split的至少的大小,决定了多个data node上的文件是否需要合并
set mapred.min.split.size.per.rack=100000000;一个交换机下split的至少的大小,决定了多个交换机上的文件是否需要合并
3– 开启动态分区(默认开启的话可以不执行此命令)
set hive.exec.dynamic.partition=true;
–使用非严格模式
set hive.exec.dynamic.partition.mode=nostrick;
hive.exec.max.dynamic.partitions=1000: 总共允许创建的动态分区的最大数量
hive.exec.max.dynamic.partitions.pernode=100:in each mapper/reducer node
行转成数组列:
concat_ws(‘,’,COLLECT_LIST(column)) 与concat_ws(‘,’,COLLECT_SET(column)) 都可实现。
区别:collect_list 不去重,collect_set 去重。
group_concat()函数
功能:将group by产生的同一个分组中的值连接起来 返回一个字符串结果
语法 :group_concat([distinct]要连接的字段 [order by 排序字段 asc/desc][separator’分隔符’])
说明:通过使用distinct可以排除重复值 如果希望对结果中的值进行排序 可以使用order by子句 separator是一个字符串值
注意: column的数据类型要求是string数组列转行
数组里面的列转行:explode()、split()和LATERAL VIEW
select name,cj from arr2 lateral view explode(scores) mytable as cj;
查询每个人的最后一科的成绩
select name,scores[size(scores)-1] from 表
Hive中Join的 MR 底层原理
Hive中的Join可分为Common Join(Reduce阶段完成join)和Map Join(Map阶段完成join)。
一、 Hive Common Join
如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join.
整个过程包含Map、Shuffle、Reduce阶段
Map阶段<key,value>
读取源表的数据,Map输出时候以 Join on 条件中的列为key,如果Join有多个关联键,则以这些关联键的组合作为key;
Map输出的 value 为 join 之后所关心的(select或者where中需要用到的)列;同时在value中还会包含表的 Tag 信息,用于标明此value对应哪个表;
按照key进行排序
Shuffle阶段
根据key的值进行hash,并将key/value按照hash值推送至不同的reduce中,这样确保两个表中相同的key位于同一个reduce中
Reduce阶段
根据key的值完成join操作,期间通过Tag来识别不同表中的数据。
select u.name, o.orderid from order o join user u on o.uid = u.uid;
Hive Map Join :map join 省略了reduce阶段,直接map rtask输出
MapJoin通常用于一个很小的表和一个大表进行 Join 的场景,具体小表有多小,由参数hive.mapjoin.smalltable.filesize来决定,该参数表示小表的总大小,默认值为25000000字节,即25M。
0.7版本之后,默认自动会转换Map Join,由参数 hive.auto.convert.join 来控制,默认为true.
仍然以前面的HQL来说吧,假设a表为一张大表,b为小表,并且hive.auto.convert.join=true,那么Hive在执行时候会自动转化为MapJoin。
如图中的流程,首先是Task A,它是一个Local Task(在客户端本地执行的Task),负责扫描小表b的数据,将其转换成一个HashTable的数据结构,并写入本地的文件中,之后将该文件加载到DistributeCache中,该HashTable的数据结构可以抽象为
key value
1 26
2 34
接下来是Task B,该任务是一个没有Reduce的MR,启动MapTasks扫描大表a,在Map阶段,根据 a 的每一条记录去和DistributeCache中 b 表对应的 HashTable 关联,并直接输出结果。
由于 MapJoin 没有Reduce,所以由 Map直接输出结果文件,有多少个Map Task,就有多少个结果文件。
GROUP BY
任务转化为MR任务的流程如下:
Map:生成键值对,以GROUP BY条件中的列作为Key,以聚集函数的结果作为Value
Shuffle:根据Key的值进行 Hash,按照Hash值将键值对发送至不同的Reducer中
Reduce:根据SELECT子句的列以及聚集函数进行Reduce
编写SQL如下:
SELECT pageid, COUNT(1) as num
FROM page_view
GROUP BY pageid;
则转化后的MR任务流程如下图所示:
Distinct 和group by 的效率
GroupBy从 MapReduce 的角度看
map 阶段,将 group by 后的字段组合作为 key,如果 group by 单字段那么 key 就一个。
将 group by 之后要进行的聚合操作字段作为值,如要进行 count,则 value 是 1;如要 sum 另一个字段,则 value 就是该字段。
shuffle 阶段,按照 key 的不同分发到不同的 reducer。注意此时可能因为 key 分布不均匀而出现数据倾斜的问题。
reduce 阶段,将相同 key 的值累加或作其他需要的聚合操作,得到结果。
GROUP BY 和 DISTINCT 去重的区别
GROUP BY 是按照 key 进行 hash 分散到多个 Reduce 中处理,速度快;
DISTINCT 是把要处理字段全部放到一个 Reduce中 处理,速度慢;
结论:尽量用 GROUP BY 替代 DISTINCT。不过如果 key 类别太多会造成分桶过多。例如 GROUP BY user_id 和 DISTINCT user_id,user_id 可能很多,分桶太多。
GROUP BY 的特性
使用了 reduce 操作,受限于 reduce 数量,通过参数 mapred.reduce.tasks 设置 reduce 个数。
输出文件个数与 reduce 数量相同,文件大小与 reduce 处理的数量有关。
GROUP BY 出现数据倾斜的调优方法
set hive.map.aggr=true,即开启 map 端的 combiner,减少传到 reducer 的数据量,同时需设置参数 hive.groupby.mapaggr.checkinterval 规定在 map 端进行聚合操作的条目数目。
设置 mapred.reduce.tasks 为较大数量,降低每个 reducer 处理的数据量。
set hive.groupby.skewindata=true,该参数可自动进行负载均衡。生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
Map join原理
小表关联大表开启map join比较快的原因有以下几点:
减少了数据的传输量:开启map join后,小表的数据将会广播到每个map任务所在的节点上,大大减少了数据的传输量。这样可以减少网络IO的开销,加快查询的速度。
提高了并行度:开启map join后,小表的数据可以按照join条件进行哈希分桶,并且每个map任务只需要处理一个桶的数据。这样可以提高并行度,使查询能够更快地完成。
减少了磁盘IO:开启map join后,小表的数据被缓存在内存中,不需要进行磁盘IO操作。而传统的join操作需要将小表的每一行数据与大表进行比较,需要频繁地进行磁盘IO,效率较低。
可以减少shuffle操作:在开启map join的情况下,不需要进行shuffle操作,因为小表的数据已经被复制到了每个map任务所在节点上。而传统的join操作需要将所有的数据进行shuffle操作,非常耗时。
综上所述,开启map join可以减少数据传输量、提高并行度、减少磁盘IO和shuffle操作,从而加快查询的速度。
Hive v0.7之后的版本已经不需要给出MapJoin的指示就进行优化。它是通过如下配置参数来控制的:
hive> set hive.auto.convert.join=true;
Hive 还提供另外一个参数,就是:表文件的大小作为开启和关闭MapJoin的阈值。
hive> set hive.mapjoin.smalltable.filesize=25000000 即25M
Hive的导入导出数据
2.20. 数据导入
1、Sqoop
先从最简单的任务开始
sqoop import
–connect jdbc:mysql://10.66.38.125:3306/user_db
–username cloudera
–password secretkey
–table department
–target-dir /sqoopdata/departments \ # HDFS 的目标存储位置
–where “department_id = 1000” \ # 指定条件,只有达成这个条件的才会被 import 进来
– m 1
就这个语句就可以将我们关系型数据库中的某个表 import 进 HDFS 的某个位置。
同样我们可以 import 某些字段进来生成文件
sqoop import
–connect jdbc:mysql://localhost:3306/retry_db
–username cloudera
–password secret
–table departments
–columns “dept_id, name” \ # 指定需要的字段
–as-avrodatafile # 指定存成 avro 数据文件
如果我们要 import 一个库里面的所有表可以使用
sqoop import-all-tables
–connect jdbc:mysql://localhost:3306/retry_db
–username cloudera
–password secret
–warehouse-dir /mydata # HDFS parent for table 这个会将所有这些表都放到 HDFS 这个文件夹下面
2、Sqoop的常见问题
由于Sqoop在使用时不需要编程,主要依赖环境配置以及运行参数,所以也导致很多问题出现的时候只能通过报错信息来进行排查
2.Caused by: java.lang.RuntimeException: Can’t parse input data: ‘800 1 620025 塑料油箱 ‘;java.io.IOException: Can’t export data, please check failed map task logs
原因:此问题出现在hive导出到关系数据库,原因为hive表的存储格式问题。
sqoop无法导出parquet文件或者ORC格式到关系型数据库,否则会报错,修改方案:
方式1:原表改为普通的textfile存储类型的表。
:方式2:把要导出的数据存到一个临时表,然后再把数据导出。
3.sqoop从mysql导入到hive的时候,mysql中的null值,导入到hive之后变为null字符串
sqoop命令上加入
–null-non-string ‘\N’
–null-string ‘\N’
一定要是\N,双反斜杠这种会把mysql中的null值转换成hive中的NULL值,大写的NULL在hive中就对应空值
4.sqoop 导入过程中报错
ERROR sqoop.Sqoop: Got exception running Sqoop: java.lang.RuntimeException: java.lang.RuntimeException: java.sql.SQLException: The connection property ‘zeroDateTimeBehavior’ acceptable values are: ‘CONVERT_TO_NULL’, ‘EXCEPTION’ or ‘ROUND’. The value ‘convertToNull’ is not acceptable.
原因:
上面的报错信息可以得出zeroDateTimeBehavior可以接受的值是CONVERT_TO_NULL’, ‘EXCEPTION’ or ‘ROUND’
解决办法:在connect参数中指定一下zeroDateTimeBehavior值类型
–connect jdbc:mysql://xxxxx:3306/ry_bi?zeroDateTimeBehavior=CONVERT_TO_NULL
5.sqoop从mysql导入hive数据,时间数据全都多了13个小时
问题原因:mysql时区问题
排查:mysql>show variables like ‘%time_zone%’;
用于查看mysql中的时区 可以看到time_zone | SYSTEM,
mysql中的时区是随系统时区,这时如果你的系统时区显示CST时区,就有可能会出现这种情况,因为CST时区可以同时代表四个时区
两种解决方法:
a、mysql>set global time_zone=’+8:00’;
直接设置mysql全局的时区为东八区,这种方法是要对全局影响的
b、jdbc:mysql://{0}:3306/{1}?zeroDateTimeBehavior=CONVERT_TO_NULL&serverTimezone=’Asia/Shanghai’
在sqoop通过jdbc连接数据库的时候加上时区限制,这种方法只对当前会话生效
6.ERROR sqoop.Sqoop: Got exception running Sqoop: org.kitesdk.data.DatasetNotFoundException: Descriptor location does not exist: hdfs://XXX/.metadata
原因:此报错多为从hive导出到关系型数据库导致。原因为hive表为parquet格式。需要
添加–hcatalog参数
–hcatalog-database hive库名
–hcatalog-table hive表名 \
hive同时对一张表进行操作会发生什么情况
当多个用户同时对一张表进行操作时,可能会发生以下情况:
冲突:如果多个用户同时尝试对同一行数据进行修改或删除操作,可能会发生冲突。为了避免冲突,Hive通常会使用锁机制来控制并发访问。
锁等待:如果一个用户在对表进行操作时已经获取了锁,其他用户需要等待锁释放才能进行操作。这可能导致其他用户的操作被阻塞,直到锁被释放。
数据不一致:如果多个用户同时向表中插入数据,可能会导致数据不一致的问题。这是因为每个用户插入的数据可能与其他用户插入的数据冲突。
为了避免以上问题,可以采取以下措施:
使用事务:Hive支持ACID事务,可以使用事务来确保多个用户对表的操作是原子的,并且保持数据的一致性。
分区操作:如果不同用户对不同的分区进行操作,可以减少冲突和锁等待的发生。
锁机制:Hive提供了锁机制来控制并发访问,可以避免同时对同一行数据进行操作的冲突,并确保数据的一致性。
3、load date
hive> load data [local] inpath ‘数据的 path’ [overwrite] into table student [partition (partcol1=val1,…)];
load data:表示加载数据
local:表示从本地加载数据到 hive 表;否则从 HDFS 加载数据到 hive 表
inpath:表示加载数据的路径
overwrite:表示覆盖表中已有数据,否则表示追加
into table:表示加载到哪张表
student:表示具体的表
partition:表示上传到指定分区
hive数据库里创建一张表
hive (default)> create table student(id string, name string) row format delimited fields terminated by ‘\t’;
加载本地文件到 hive
hive (default)> load data local inpath ‘/opt/module/hive/datas/student.txt’ into table default.student;
加载 HDFS 文件到 hive 中
上传文件到 HDFS
hive (default)> dfs -put /opt/module/hive/data/student.txt /user/atguigu/hive;
加载 HDFS 上数据
hive (default)> load data inpath ‘/user/atguigu/hive/student.txt’ into table default.student;
加载数据覆盖表中已有的数据
上传文件到 HDFS
hive (default)> dfs -put /opt/module/data/student.txt /user/atguigu/hive;
加载数据覆盖表中已有的数据
hive (default)> load data inpath ‘/user/atguigu/hive/student.txt’ overwrite into table default.student;
hive多表插入数据
from test1
insert overwrite table test2
partition (age)
select name,address,school,age
insert overwrite table test3
select name,address
2.21. hive的执行顺序
第一步: FROM
第二步: ON
第三步:
第四步: WHERE
第五步: GROUP BY
第六步: HAVING
第七步: SELECT
第八步: DISTINCT
第九步: ORDER BY
第十步: LIMIT
标准sql语句的一些规则:
-1. 列别名的使用,必须完全符合执行顺序,不能提前使用。(mysql除外)
-2. 在分组查询时,select子句中只能含有分组字段和聚合函数,不能有其他普通字段。(mysql除外)
hive几个大表关联优化
使用分桶表:将表按照相同的分桶列进行分桶,并且确保要关联的字段是分桶列的子集。这样可以实现更有效的关联,减少数据的移动和网络延迟。
使用分区表:将表按照某个或多个字段进行分区,然后进行关联操作。这样可以减少扫描的数据量,提高查询性能。
使用Join优化:Hive提供了多种关联操作的方式,如Map Join、Sort Merge Join等。根据实际情况选择最适合的关联方式进行优化。
增加缓存:如果部分表的数据量较小,可以将其缓存在内存中,以减少磁盘I/O操作。
优化表结构:对表结构进行调整,例如使用压缩格式、选择合适的数据类型、合理设置字段的宽度等,以减少存储空间和提高查询性能。
调整Join顺序:如果多表关联操作中需要进行多次关联,可以通过调整Join的顺序,优先关联小数据量的表,减少关联操作的数据量。
综上所述,可以通过以上方法对Hive中多个大表的关联进行优化,提高查询性能和效率。但是具体的优化策略需要根据具体的数据量、数据分布、查询需求等因素来综合考虑和调整。
Hive数据库导出表前十万条数据到本地路径
INSERT OVERWRITE LOCAL DIRECTORY ‘/path/to/output_folder’
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ‘,’
SELECT * FROM table_name LIMIT 100000;
将/path/to/output_folder替换为要导出数据的本地文件夹路径。
2.22. Hive数组:
ARRAY 有序的的同类型的集合 array(1,2)
MAP key-value,key必须为原始类型,value可以任意类型 map(‘a’,1,’b’,2)
STRUCT 字段集合,类型可以不同 struct(‘1’,1,1.0), named_stract(‘col1’,’1’,’col2’,1,’clo3’,1.0)
create table tableName(
……
colName array<基本类型>
……)
说明:下标从0开始,越界不报错,以null代替
zhangsan 78,89,92,96
lisi 67,75,83,94
王五 23,12
查询:select name,scores[1] from arr2 where size(scores) > 3;
–统计arr2中的每个人的总成绩
select scores[0]+scores[1]+nvl(scores[2],0)+nvl(scores[3],0) from arr2;
map
create table tableName(
…….
colName map<T,T>
……)
数据:
zhangsan chinese:90,math:87,english:63,nature:76
lisi chinese:60,math:30,english:78,nature:0
wangwu chinese:89,math:25
create table if not exists map1(
name string,
score map<string,int>
)
row format delimited
fields terminated by ‘\t’
collection items terminated by ‘,’
map keys terminated by ‘:’
#查询数学大于35分的学生的英语和自然成绩:
select
m.name,
m.score[‘english’] ,
m.score[‘nature’]
from map1 m
where m.score[‘math’] > 35
size(Map)函数:可得map的长度。返回值类型:int
map_keys(Map)函数:可得map中所有的key;返回值类型: array
map_values(Map)函数:可得map中所有的value;返回值类型: array
判断map中是否包含某个key值:
select array_contains(map_keys(t.params),’k0’);
在k-v对中,若value有多个值的情况,如 {**‘k1’:‘01,02,03’} ,如果要用 ‘k1’ 中 ‘02’作为过滤条件,则语句如下:
(这里用到split来处理)
select * from t where split(t.params[‘k1’],’,’)[1]
将字符串转成map 使用函数str_to_map(text, delimiter1, delimiter2)
text:是字符串
delimiter1:多个键值对之间的分隔符
delimiter2:key和value之间的分隔符
hive优化
一、从建表设计层面
hive里面的表一般内部表和外部表,外部表的插入数据时是不会移动数据文件的位置的,内部表会
hive的表还可以分为分区表和分桶表,分区是分目录用的伪劣,分桶是分文件用的是本身含有的列
如果一张表的查询经常是某个字段的话,这种就适合建立分区表,这样在查询的时候,使用分区字段来过滤,就可以避免全表扫描。只需要扫描这张表的一个分区的数据即可
分桶表的话适合在数据抽样采集的时候使用,它会按照分桶列进行分桶,我们只要看某个桶的数据就可以抽样了
表设计层面也会考虑到数据的存储和压缩,存储上hive表支持TextFile、SequenceFile、ORC、 Parquet等格式,TextFile是最简单的存储格式,它是纯文本记录,也是Hive的默认格式。虽然它的磁盘开销比较大,查询 效率也低,但它更多地是作为跳板来使用。 RCFile、ORC、 Parquet等格式的表都不能由文件直接导入数 据,必须由TextFile来做中转。
创建表时,特别是宽表,尽量使用 ORC、ParquetFile这些列式存储格式,因为列式存储的表,每一列的数据在物理上是存储在一起的,Hive查询时会只遍历需要列数据,大大减少处理的数据量。
压缩有:Gzip、Snappy、LZO、Bzip2,一般压缩我们采用的是gzip压缩,
gzip的压缩率极高,速度比较快
Job 输出文件按照 Block 以 GZip 的方式进行压缩:
Map 输出结果也以 Gzip 进行压缩
对 Hive 输出结果和中间都进行压缩
二、语法和参数方面调优
首先HQL执行的时候会转化成mr作业,我们可以查看其执行计划来得到系统预定的执行顺序
。。。。。。。。。
1.避免使用select * ,查询时具体到具体的列,因为hive一般用列存储,所以在查询具体的列可以减少扫描
2.我们在逻辑复杂的关联时,可以将在where条件中对副表的过滤提前做临时表过滤,这样可以减少关联时候副表的数据量
3.尽量对经常出现在where中的列做分区,这样查询时可以减少扫描的数据量
4.合并小文件:
Map 输入合并
在执行 MapReduce 程序的时候,一般情况是一个文件的一个数据分块需要一个 mapTask 来处理。但是如果数据源是大量的小文件,这样就会启动大量的 mapTask 任务,这样会浪费大量资源。可以将输入的小文件进行合并,从而减少 mapTask 任务数量 。
Map端输入、合并文件之后按照block的大小分割(默认)
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
Map端输入,不合并
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
Map/Reduce输出合并
大量的小文件会给 HDFS 带来压力,影响处理效率。可以通过合并 Map 和 Reduce 的结果文件来消除影响 。
是否合并Map输出文件, 默认值为true
set hive.merge.mapfiles=true;
是否合并Reduce端输出文件,默认值为false
set hive.merge.mapredfiles=true;
合并文件的大小,默认值为256000000
set hive.merge.size.per.task=256000000;
每个Map 最大分割大小
set mapred.max.split.size=256000000;
一个节点上split的最少值
set mapred.min.split.size.per.node=1; // 服务器节点
一个机架上split的最少值
set mapred.min.split.size.per.rack=1; // 服务器机架
默认情况先把这个节点上的所有数据进行合并,如果合并的那个文件的大小超过了256M就开启另外一个文件继续合并 2、如果当前这个节点上的数据不足256M,那么就都合并成一个逻辑切片
5、关联上的优化
1)能尽量在关联前过滤掉数据就尽量过滤掉,(可以用表分区,子查询where过滤再关联)
2)小表join大表的时候一定要小表在前大表在后,因为join操作再reduce阶段左边的表会写进内存,如果表太大会内存溢出,多表关联的时候最好是从左到右依次变大
大表关联小表的时候可以开启map join,MapJoin 是将 join 双方比较小的表直接分发到各个 map 进程的内存中,在 map 进程中进行 join 操作,这样就不用进行 reduce 步骤,从而提高了速度。只有 join 操作才能启用 MapJoin 。
是否根据输入小表的大小,自动将reduce端的common join 转化为map join,将小表刷入内存中。
对应逻辑优化器是MapJoinProcessor
set hive.auto.convert.join = true;
刷入内存表的大小(字节)
set hive.mapjoin.smalltable.filesize = 25000000;
hive会基于表的size自动的将普通join转换成mapjoin
set hive.auto.convert.join.noconditionaltask=true;
多大的表可以自动触发放到内层 LocalTask 中,默认大小10M
set hive.auto.convert.join.noconditionaltask.size=10000000;
Hive 在解析带 join 的 SQL 语句时,会默认将最后一个表作为 probe table,将前面的表作为 build table 并试图将它们读进内存,这种 Join 方式在 map 端直接完成 join 过程,消灭了 reduce,效率很高。而且 MapJoin 还支持非等值连接 。Hive 将JOIN 语句中的最后一个表用于流式传输,因此我们需要确保这个流表在两者之间是最大的
手工开启mapjoin
SELECT /*+ MAPJOIN(smallTable) */ 小表.列, 大表.列 FROM小表 JOIN 大表 ON smallTable.key = bigTable.key;
3)多张表Join的时候,如果关联条件是相同字段,MRjob会将他们放在一个job上运行,可以节省时间
4)大表join大表的时候考虑关联列上会不会有大量的空值或者重复值导致相同key集中到一个reduce上而拖累进度,从而发生数据倾斜(大量重复一般是数据异常、null可以赋值随机数而分发数据)
6、合理设置MapTask并行度
第一:MapReduce中的MapTask的并行度机制
Map数过大:当输入文件特别大,MapTask 特别多,每个计算节点分配执行的 MapTask 都很多,这时候可以考虑减少 MapTask 的数量 ,增大每个 MapTask 处理的数据量。如果 MapTask 过多,最终生成的结果文件数会太多 。
原因:
1、Map阶段输出文件太小,产生大量小文件
2、初始化和创建Map的开销很大
Map数太小:当输入文件都很大,任务逻辑复杂,MapTask 执行非常慢的时候,可以考虑增加 MapTask 数,来使得每个 MapTask 处理的数据量减少,从而提高任务的执行效率 。
原因:
1、文件处理或查询并发度小,Job执行时间过长
2、大量作业时,容易堵塞集群
在 MapReduce 的编程案例中,我们得知,一个 MapReduce Job 的 MapTask 数量是由输入分片 InputSplit 决定的。而输入分片是由 FileInputFormat.getSplit() 决定的。一个输入分片对应一个 MapTask,而输入分片是由三个参数决定的
输入分片大小的计算是这么计算出来的:
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))
默认情况下,输入分片大小和 HDFS 集群默认数据块大小一致,也就是默认一个数据块,启用一个 MapTask 进行处理,这样做的好处是避免了服务器节点之间的数据传输,提高 job 处理效率 。
两种经典的控制 MapTask 的个数方案:减少 MapTask 数 或者 增加 MapTask 数:
1、减少 MapTask 数是通过合并小文件来实现,这一点主要是针对数据源
2、增加 MapTask 数可以通过控制上一个 job 的 reduceTask 个数 重点注意:不推荐把这个值进行随意设置!推荐的方式:使用默认的切块大小即可。如果非要调整,最好是切块的N倍数
第二:合理控制 MapTask 数量
减少 MapTask 数可以通过合并小文件来实现
增加 MapTask 数可以通过控制上一个 ReduceTask 默认的 MapTask 个数
计算方式
输入文件总大小:total_size HDFS 设置的数据块大小:dfs_block_size default_mapper_num = total_size / dfs_block_size
MapReduce 中提供了如下参数来控制 map 任务个数,从字面上看,貌似是可以直接设置 MapTask 个数的样子,但是很遗憾不行,这个参数设置只有在大于 default_mapper_num 的时候,才会生效 。
参数来控制 map 任务个数
默认值是2
set mapred.map.tasks=10;
那如果我们需要减少 MapTask 数量,但是文件大小是固定的,那该怎么办呢?
可以通过 mapred.min.split.size 设置每个任务处理的文件的大小,这个大小只有在大于 dfs_block_size 的时候才会生效
split_size = max(mapred.min.split.size, dfs_block_size)
split_num = total_size / split_size
compute_map_num = Math.min(split_num, Math.max(default_mapper_num, mapred.map.tasks))
这样就可以减少 MapTask 数量了 。
总结一下控制 mapper 个数的方法:
1、如果想增加 MapTask 个数,可以设置 mapred.map.tasks 为一个较大的值
2、如果想减少 MapTask 个数,可以设置 maperd.min.split.size 为一个较大的值
3、如果输入是大量小文件,想减少 mapper 个数,可以通过设置 hive.input.format 合并小文件
如果想要调整 mapper 个数,在调整之前,需要确定处理的文件大概大小以及文件的存在形式(是大量小文件,还是单个大文件),然后再设置合适的参数。不能盲目进行暴力设置,不然适得其反。
MapTask 数量与输入文件的 split 数息息相关,在 Hadoop 源码org.apache.hadoop.mapreduce.lib.input.FileInputFormat 类中可以看到 split 划分的具体逻辑。可以直接通过参数 mapred.map.tasks (默认值2)来设定 MapTask 数的期望值,但它不一定会生效 。
7、合理设置ReduceTask并行度
如果 ReduceTask 数量过多,一个 ReduceTask 会产生一个结果文件,这样就会生成很多小文件,那么如果这些结果文件会作为下一个 Job 的输入,则会出现小文件需要进行合并的问题,而且启动和初始化ReduceTask 需要耗费资源 。
如果 ReduceTask 数量过少,这样一个 ReduceTask 就需要处理大量的数据,容易拖慢运行时间或者造成 OOM(内存不足导致报错),可能会出现数据倾斜的问题,使得整个查询耗时长。默认情况下,Hive 分配的 reducer 个数由下列参数决定:
参数1:hive.exec.reducers.bytes.per.reducer (默认256M)
参数2:hive.exec.reducers.max (默认为1009)
参数3:mapreduce.job.reduces (默认值为-1,表示没有设置,那么就按照以上两个参数 进行设置)
ReduceTask 的计算公式为:N = Math.min(参数2,总输入数据大小 / 参数1)
可以通过改变上述两个参数的值来控制 ReduceTask 的数量。也可以通过
set mapred.map.tasks=10;
set mapreduce.job.reduces=10;
通常情况下,有必要手动指定 ReduceTask 个数。考虑到 Mapper 阶段的输出数据量通常会比输入有大幅减少,因此即使不设定 ReduceTask 个数,重设 参数2 还是必要的 。
依据经验,可以将 参数2 设定为 M * (0.95 * N) (N为集群中 NodeManager 个数)。一般来说,NodeManage 和 DataNode 的个数是一样的。
8、Count Distinct优化
当要统计某一列去重数时,如果数据量很大,count(distinct) 就会非常慢,原因与 order by 类似,count(distinct) 逻辑只会有很少的 reducer 来处理。这时可以用 group by 来改写:
– 先 group by 再 count
SELECT
COUNT(1)
FROM
(
SELECT
age
FROM
student
WHERE
department >= “MA”
GROUP BY
age
) t;
9、Order By优化
order by 只能是在一个 reduce 进程中进行,所以如果对一个大数据集进行 order by ,会导致一个 reduce 进程中处理的数据相当大,造成查询执行缓慢 。
1、在最终结果上进行order by,不要在中间的大数据集上进行排序。如果最终结果较少,可以在一个 reduce上进行排序时,那么就在最后的结果集上进行order by。
2、如果是取排序后的前N条数据,可以使用distribute by和sort by在各个reduce上进行排序后前N 条,然后再对各个reduce的结果集合合并后在一个reduce中全局排序,再取前N条,因为参与全局排序的 order by的数据量最多是reduce个数 * N,所以执行效率会有很大提升。
9、启动中间结果压缩
map 输出压缩
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
中间数据压缩
中间数据压缩就是对 hive 查询的多个 Job 之间的数据进行压缩。最好是选择一个节省CPU耗时的压缩方式。可以采用 snappy 压缩算法,该算法的压缩和解压效率都非常高。
set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;
结果数据压缩
最终的结果数据(Reducer输出数据)也是可以进行压缩的,可以选择一个压缩效果比较好的,可以减少数据的大小和数据的磁盘读写时间 。
需要注意:常用的 gzip,snappy 压缩算法是不支持并行处理的,如果数据源是 gzip/snappy压缩文件大文件,这样只会有有个 mapper 来处理这个文件,会严重影响查询效率。所以如果结果数据需要作为其他查询任务的数据源,可以选择支持 splitable 的 LZO 算法,这样既能对结果文件进行压缩,还可以并行的处理,这样就可以大大的提高 job 执行的速度了。
set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.G zipCodec;
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
Hadoop集群支持的压缩算法:
org.apache.hadoop.io.compress.DefaultCodec org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.BZip2Codec org.apache.hadoop.io.compress.DeflateCodec
org.apache.hadoop.io.compress.SnappyCodec org.apache.hadoop.io.compress.Lz4Codec
com.hadoop.compression.lzo.LzoCodec com.hadoop.compression.lzo.LzopCodec
10、开启本地模式
Hive在集群上查询时,默认是在集群上多台机器上运行,需要多个机器进行协调运行,这种方式很好地解决了大数据量的查询问题。但是在Hive查询处理的瓣量比较小的时候,其实没有必要启动分布 式模式去执行,因为以分布式方式执行设计到跨网络传输、多节点协调等,并且消耗资源。对于小数据 集,可以通过本地模式,在单台机器上处理所有任务,执行时间明显被缩短 。
启动本地模式涉及到三个参数:
##打开hive自动判断是否启动本地模式的开关
set hive.exec.mode.local.auto=true;
map任务晝專大值,*启用本地模式的task最大皋数
set hive.exec.mode.1ocal.auto.input.files.max=4;
map输入文件最大大小,不启动本地模式的最大输入文件大小
set hive.exec.mode.1ocal.auto.inputbytes.max=134217728;
hive子查询
Hive仅在FROM子句中支持子查询(从Hive 0.12版本开始)。必须为子查询指定名称,因为FROM子句中的每个表都必须具有名称。子查询 SELECT 列表中的列必须具有独一无二的名称
Hadoop集群的一些重要参数
1、Core-site.xml
1)指定 HDFS(Hadoop 分布式文件系统)的默认文件系统:通过fs.defaultFS属性指定 HDFS 中 NameNode 的地址和端口。
2)指定 Hadoop 运行时产生文件的存储目录:使用hadoop.tmp.dir属性指定一个临时目录,用于存储 Hadoop 运行时产生的文件。
3)其他相关配置:根据具体需求,还可以在 core-site.xml 文件中添加其他与 Hadoop 核心功能相关的配置项。比如说可以配置hdfs网页登陆的静态用户hadoop.http.staticuser.user
2、hdfs-site.xml
dfs.replication;指定了HDFS数据块的副本数量,默认值为3
dfs.name.dir:指定NameNode元数据的存放位置。
dfs.block.size:定义新文件的块大小,单位为字节。默认值为64MB,建议值为128MB。
dfs.data.dir:指定DataNode在本地磁盘存储数据块的位置,可以是逗号分隔的目录列表。
dfs.namenode.handler.count:设置NameNode处理来自DataNode的RPC请求的线程数量。建议设置为DataNode数量的10%,一般在10-200个之间。
** dfs.namenode.http-address ** nn web 端访问地址
** dfs.namenode.secondary.http-address 2 nn web 端访问地址
<<在hadoop的配置文件中与HDFS(hadoop分布式文件系统)相关的是hdfs-core.xml文件。在伪分布集群中只有一个节点,因此此节点即要有NameNode功能也要有DataNode功能。在工作环境中这两个是不会在一个节点上的,在我们的多节点分布式集群中master只运行NameNode因此需在hdfs-site.xml文件中删除DataNode相关配置。>>
3、yarn-site.xml
*yarn.resourcemanager.hostname:指定ResourceManager的主机名。
yarn.nodemanager.aux-services:指定NodeManager要启动的辅助服务。
yarn.scheduler.minimum-allocation-mb:单个容器可申请的最小内存,默认值为1024MB。
yarn.scheduler.maximum-allocation-mb:单个容器可申请的最大内存,默认值为8192MB。
yarn.scheduler.minimum-allocation-vcores:单个容器可申请的最小虚拟CPU个数,默认值为1。
yarn.scheduler.maximum-allocation-vcores:单个容器可申请的最大虚拟CPU个数,默认值为32。
yarn.nodemanager.resource.memory-mb:每个NodeManager可分配的内存总量,默认值为8192MB。
yarn.nodemanager.resource.cpu-vcores:每个NodeManager可分配的CPU虚拟核数总量,默认值为
** yarn.nodemanager.env-whitelist **环境变量的继承
4、mapred-site.xml
mapreduce.framework.name:指定 MapReduce 框架的名称为yarn,表示将 MapReduce 任务运行在 YARN 上。
mapreduce.jobhistory.address配置 JobHistoryServer 的地址和端口
mapreduce.jobhistory.webapp.address
5、调节namenode 、2nn的内存大小
在 Hadoop 中,NameNode 的内存大小可以调节。NameNode 管理着集群里面所有文件的信息,其内存大小的取值需要根据具体情况进行估算和调整。
Hadoop 配置文件hadoop-env.sh中有个选项HADOOP_NAMENODE_OPTS,此 JVM 选项可用于设置 NameNode 内存的大小。例如,可以通过以下设置将 NameNode 内存分配为 2000MB:
xml
复制
HADOOP_NAMENODE_OPTS=-Xmx2000m
如果改变了 NameNode 的内存大小,那么 SecondaryNameNode 的内存大小同样也要改变,其选项是HADOOP_SECONDARYNAMENODE_OPTS。
看出数据倾斜的方法
- 查看任务进度
o 在 Hive 任务执行过程中,可以通过 Hive 的日志或者任务监控工具(如 YARN 的监控页面)查看任务的进度。如果发现某些任务进度明显慢于其他任务,可能存在数据倾斜。
o 例如,在 YARN 的 ResourceManager 页面上,可以看到各个任务的进度条,如果某些任务的进度长时间停留在较低水平,而其他任务已经接近完成,就可能是数据倾斜导致。 - 分析任务日志
o 查看 Hive 任务的日志文件,寻找与数据倾斜相关的错误信息或警告。常见的提示可能包括 “Container killed by YARN for exceeding memory limits”(容器因超出内存限制被 YARN 杀死)或者 “Task took x times longer than average”(任务花费的时间比平均时间长 x 倍)等。
o 例如,在日志中发现某个 reduce 任务出现内存溢出错误,而其他 reduce 任务正常完成,这很可能是由于该任务处理的数据量过大,即数据倾斜导致。 - 使用 Hive 命令查看任务统计信息
o 在 Hive 命令行中,可以使用 EXPLAIN 命令查看 SQL 语句的执行计划,从中可以了解到数据的分布情况和任务的并行度。如果发现某些 reduce 任务的输入数据量远大于其他任务,可能存在数据倾斜。
o 例如:
EXPLAIN SELECT * FROM big_table JOIN small_table ON big_table.key = small_table.key;
• 执行计划中会显示各个阶段的任务信息,包括 map 任务和 reduce 任务的数量、输入输出数据量等。通过分析这些信息,可以初步判断是否存在数据倾斜。
二、分析数据倾斜原因的方法
- 检查连接操作和分组操作
o 数据倾斜通常在连接(JOIN)和分组(GROUP BY)操作中比较容易出现。如果 SQL 语句中包含连接操作,检查连接键是否存在数据分布不均匀的情况。例如,如果连接键的值非常集中,可能会导致某些 reduce 任务处理的数据量过大。
o 对于分组操作,同样检查分组键是否存在数据倾斜。如果某些分组键的值出现频率非常高,可能会导致对应的 reduce 任务处理的数据量过大。 - 分析数据特点
o 了解数据的特点和分布情况。如果数据本身存在严重的不平衡,例如某些值出现的频率远高于其他值,那么在进行某些操作时就容易出现数据倾斜。
o 例如,在一个用户行为日志表中,如果某些用户的行为记录数量远多于其他用户,那么在按照用户 ID 进行分组或连接操作时,就可能出现数据倾斜。 - 检查数据类型和转换
o 数据类型的不匹配或者不正确的类型转换也可能导致数据倾斜。例如,如果连接键的类型在两个表中不一致,可能会导致 Hive 在进行连接操作时无法正确地进行数据分发,从而引起数据倾斜。
o 检查 SQL 语句中是否存在类型转换操作,如果有,确保这些转换是正确的,并且不会导致数据分布的变化。 - 考虑业务逻辑
o 从业务逻辑的角度分析数据倾斜的原因。例如,如果数据是按照时间维度进行分区的,而某些时间段的数据量远大于其他时间段,那么在进行涉及时间维度的操作时,就可能出现数据倾斜。
o 考虑是否可以通过调整业务逻辑或者数据预处理的方式来避免数据倾斜。例如,可以对数据进行随机采样或者分区调整,使数据更加均匀地分布。
1、什么是数据倾斜
数据倾斜主要表现在, map/reduce程序执行时, reduce节点大部分执行完毕,但是有一个或者几个
reduce节点运行很慢,导致整个程序的处理时间很长,这是因为某一个key的条数比其他key多很多(有时 是百倍或者千倍之多),这条Key所在的reduce节点所处理的数据量比其他节点就大很多,从而导致某几 个节点迟迟运行不完。
2、数据倾斜的原因
一些操作:
关键词 情形 后果
Join 其中一个表较小,但是 key集中 分发到某一个或几个Reduce上的 数据远高于平均值
大表与大表,但是分桶的判断 字段0值或空值过多 这些空值都由一个
reduce处理,灰常慢
group by group by 维度过小,某 值的数量过多 处理某值的reduce灰常耗时
Count Distinct 某特殊值过多 处理此特殊值的reduce耗时
原因:
key分布不均匀
业务数据本身的特性
建表时考虑不周
某些SQL语句本身就有数据倾斜
现象:
任务进度长时间维持在99% (或100%),查看任务监控页面,发现只有少量(1个或几个) reduce 子任务未完成。因为其处理的数据量和其他reduce差异过大。
单一reduce的记录数与平均记录数差异过大,通常可能达到3倍甚至更多。 最长时长远大于平均时 长。
3、数据倾斜的解决方案
1)参数调节
Map 端部分聚合,相当于Combiner
有数据倾斜的时候进行负载均衡,当选项设定为true,生成的查询计划会有两个MR Job。第一个MR Job 中, Map 的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理 的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的; 第二个MR
Job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key 被分布到同一个Reduce中),最后完成最终的聚合操作。
2)SQL语句调节
如何join:
关于驱动表的选取,选用join key分布最均匀的表作为驱动表,做好列裁剪和filter操作,以达到两表做 join的时候,数据量相对变小的效果。
大小表Join:
使用map join让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce。
大表Join大表:
把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处 理后并不影响最终结果。
count distinct大量相同特殊值:
count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在 最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计 算结果进行union。
group by维度过小:
采用sum() group by的方式来替换count(distinct)完成计算。
特殊情况特殊处理:
在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后union回去。
4、典型的业务场景
1)空值产生的数据倾斜
场景:如日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和 用户表中的 user_id 关联,会碰到数据倾斜的问题。
解决方法一:user_id为空的不参与关联
1 select * from log a
2 join users b
3 on a .user_id is not null
4 and a .user_id = b.user_id
5 union all
6 select * from log a
7 where a .user_id is null ;
解决方法二:赋与空值分新的key值
1 select *
2 from log a
3 left outer join users b
4 on case when a .user_id is b.user_id ; null then concat(‘hive’, rand() ) else a .user_id end =
结论:
方法2比方法1效率更好,不但io少了,而且作业数也少了。解决方法一中log读取两次, jobs是2。解决 方法二job数是1。这个优化适合无效id (比如 -99 , ’’, null 等) 产生的倾斜问题。把空值的key变成一个字符 串加上随机数,就能把倾斜的数据分到不同的reduce上,解决数据倾斜问题。
2)不同数据类型关联产生数据倾斜
场景:用户表中user_id字段为int ,log表中user_id字段既有string类型也有int类型。当按照user_id进行两 个表的Join操作时,默认的Hash操作会按int型的id来进行分配,这样会导致所有string类型id的记录都分 配到一个Reducer中。
解决方法:把数字类型转换成字符串类型
3)小表不小不大,怎么用 map join 解决倾斜问题
使用map join解决小表(记录数少)关联大表的数据倾斜问题,这个方法使用的频率非常高,但如果小 表很大,大到map join会出现bug或异常,这时就需要特别的处理。
例子:
1 select * from log a
2 left outer join users b
3 on a .user_id = b.user_id ;
users表有600w+的记录,把users分发到所有的map上也是个不小的开销,而且map join不支持这么大的 小表。如果用普通的join,又会碰到数据倾斜的问题。
解决方法:
1 select /+mapjoin(x)/* from log a
2 left outer join (
3 select /+mapjoin(c)/d.*
4 from ( select distinct user_id from log ) c
5 join users d
6 on c .user_id = d.user_id
7 ) x
8 on a .user_id = b.user_id ;
假如, log里user_id有上百万个,这就又回到原来map join问题。所幸,每日的会员uv不会太多,有交易
的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景 下的数据倾斜问题。
2.23. 什么时候适合用外部表,内部表
数据的来源和管理:
• 如果数据是由 Hive 流程生成并且完全由 Hive 来管理和控制,内部表可能更合适。因为内部表的数据与表定义紧密关联,删除表时会自动清理数据。
• 当数据来自外部系统,并且在 Hive 之外可能还有其他用途或需要被其他系统访问和操作时,外部表是更好的选择,因为删除表不会影响原始数据。
数据的生命周期:
• 对于短期使用、临时计算或测试目的的数据,内部表可能更方便,因为可以方便地进行删除和清理。
• 如果数据具有较长的生命周期,需要长期保存且可能被多个不同的任务或流程使用,外部表更合适,以确保数据的独立性和持久性。
数据的更新和维护:
• 如果数据经常需要更新和修改,内部表在数据管理和一致性方面可能更具优势。
• 对于相对静态、不常修改的数据,外部表可以减少数据迁移和重新加载的成本。
数据的安全性和共享:
• 当数据需要严格的安全控制和权限管理,内部表可以更容易与 Hive 的权限体系集成。
• 如果数据需要在不同的系统或团队之间共享,外部表可以保持数据的原始位置,方便其他系统直接访问。
例如,一个数据仓库中,从业务系统导入的原始数据通常会创建为外部表,以保留数据的原始状态。而在数据清洗和转换过程中生成的中间结果表,如果只在当前数据处理流程中使用,可能会创建为内部表。
又如,对于一些关键的指标计算结果,如果需要长期保存并严格控制其修改和删除操作,会选择内部表。但如果是从外部数据源获取的参考数据,如行业标准数据,可能更适合创建为外部表
当将 Hive 中的内部表转换为外部表时,存储路径通常不会自动改变。
如果您希望更改存储路径,可以在转换为外部表的语句中进行指定,如下所示:
ALTER TABLE internal_table SET TBLPROPERTIES(‘EXTERNAL’=’TRUE’);
ALTER TABLE internal_table SET LOCATION ‘/new_path’;
这样,表就被转换为外部表,并且数据的存储路径被更改为 /new_path
Hive中实现oracle中的merge操作
select A.* from A
left join B
on A.id = B.id
where B.id is null – 找出A表有,B表没有的数据.
union
select * from B; – 找出B表的所有数据(最新数据)进行去重合并.
当然在Hive中我们是极少使用insert操作的,所以要把上面的结果写到指定HDFS路径下,并建立一张Hive外部表去指向它,这样就可以对数据进行再操作了。
2.24. 怎么去执行hive-sql
在 Hive 中执行 SQL 语句可以通过以下几种方式:
一、Hive 命令行界面(CLI)
打开终端或命令提示符,进入 Hive 安装目录。
运行 hive 命令启动 Hive 命令行界面。
在 Hive CLI 中,可以直接输入 Hive SQL 语句并按回车键执行。例如:
SELECT * FROM my_table;
SHOW TABLES;
可以使用分号(;)来分隔多条 SQL 语句,一次性执行多个查询。
二、Hive 脚本文件
使用文本编辑器创建一个包含 Hive SQL 语句的脚本文件,例如 my_script.hql。
在脚本文件中编写 Hive SQL 语句,可以包含多个查询、创建表、插入数据等操作。
在终端或命令提示符中,使用以下命令执行脚本文件:
hive -f /path/to/my_script.hql
其中 /path/to/my_script.hql 是脚本文件的完整路径。
三、通过 Hive JDBC/ODBC 连接
使用支持 JDBC 或 ODBC 连接的数据库客户端工具,如 SQuirreL SQL Client、DBeaver 等。
在客户端工具中配置 Hive 的 JDBC 或 ODBC 连接信息,包括连接 URL、用户名、密码等。
连接成功后,可以在客户端工具的 SQL 编辑器中输入 Hive SQL 语句并执行。
四、在 Hive 交互式 shell 中执行(如 Beeline)
打开终端或命令提示符,进入 Hive 安装目录的 bin 目录。
运行 beeline 命令启动 Beeline 交互式 shell。
在 Beeline 中,可以使用以下命令连接到 Hive 服务器:
beeline -u jdbc:hive2://localhost:10000/default -n username -p password
其中 jdbc:hive2://localhost:10000/default 是 Hive 服务器的连接 URL,username 和 password 是登录用户名和密码。
连接成功后,可以在 Beeline 中输入 Hive SQL 语句并执行。
五、在大数据处理框架中执行(如 Spark、Hadoop 等)
如果使用 Spark 或其他大数据处理框架,可以通过集成 Hive 来执行 Hive SQL 语句。
在 Spark 中,可以使用 SparkSession 对象的 sql 方法执行 Hive SQL 语句。例如:
spark.sql(“SELECT * FROM my_table”).show();
在 Hadoop MapReduce 作业中,可以使用 Hive 的 HiveContext 或 HiveSession 来执行 Hive SQL 查询。
无论使用哪种方式执行 Hive SQL,都需要确保 Hive 服务已经正确安装和配置,并且具有执行 SQL 语句所需的权限。同时,根据具体的执行方式,可能需要了解相应工具的使用方法和配置要求
以下是在 Hive 中知道表名后获取 HDFS 上对应文件路径和大小的步骤:
一、查询表的存储路径
使用 Hive 命令行客户端或者通过其他与 Hive 交互的工具连接到 Hive。
执行以下命令查询表的详细信息,从中获取存储路径:
DESCRIBE FORMATTED table_name;
在输出结果中查找 “Location” 字段,其值就是该表在 HDFS 上的存储路径。
二、获取文件大小
确定 Hive 数据存储在 HDFS 的路径后,可以使用 HDFS 命令来获取该路径下文件的大小。
例如,使用以下命令可以列出指定路径下的文件信息,包括文件大小:
hdfs dfs -du -h /path/to/table_location/
该命令会显示每个文件和目录的大小,以人类可读的格式(如 K、M、G 等)表示。
需要注意的是:
确保你有足够的权限来执行这些操作,包括访问 Hive 元数据和 HDFS 文件系统。
不同版本的 Hive 和 HDFS 可能在命令和输出格式上略有不同,但基本原理是相似的。
最后编辑:严锋 更新时间:2024-10-05 14:25