利用MapReduce从hbase导出数据到hive

最近在做一个从hbase导数据到hive的工作。主要涉及这么几个知识点

  1. MR如何从hbase里拿数据
  2. 如何将数据写成hive的指定格式,我主要用了rcfile

Hbase端

MR中读取hbase主要使用了
这个类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
//public static int Run里设置
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(
scan,
MapTask.class,
Text.class,
LongWritable.class,
job);
//mapper继承 TableMapper
public static class MapTask extends TableMapper<Text, LongWritable> {
//其中map的输入类型是
protected void map(ImmutableBytesWritable row, Result value, Context context) {
//拿到指定family指定列的数据
Cell c = value.getColumnLatestCell(byte[] family,byte[] qualifier);
String tmp = Bytes.toString(CellUtil.cloneValue(c));
}
}

scan可以加filter,指定只拿那些数据。同时可以用addcolume指定只拿哪列。但是这两个是冲突的,addcolumn和fiter的域不同时,filter就失效了。

1
2
3
4
5
6
7
8
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
SingleColumnValueFilter filter1 = new SingleColumnValueFilter(
byte[] family,
byte[] qualifier,
CompareFilter.CompareOp.EQUAL,
byte[] value);
filterList.addFilter(filter1);
scan.setFilter(filterList);

具体冲突的讨论可以见这篇blog的分析

hive端

MR把数据写出去以后,一般有四种格式可以被hive load。分别是 text, sequencefile,rcfile和orcfile。本来我是打算用orcfile的,结果写完程序才发现我们的hive是0.10,orcfile是0.11才支持,只能用rcfile - -。

orcfile和rcfile这篇文章讲的比较概括,初步了解不错。orcfile可以看看这两个文档。
http://hadoopcraft.blogspot.com/2014/07/generating-orc-files-using-mapreduce.html
https://github.com/mayanhui/hive-orc-mr/blob/master/src/main/java/com/adintellig/hive/orc/mapred/ORCMapper.java

rcfile主要思想就是混合储存模式。先按行划分,再按列划分。并且保证同一行数据位于同一个节点。同时按照列储存可以在读取时跳过不需要的列,跳过不需要的列,减小数据读入。具体可以见这篇文章

MR写rcfile的代码主要有这么几部分,首先是在run里面的设置

1
2
3
4
5
6
7
8
job.setMapOutputKeyClass(LongWritable.class);//key无所谓 我是传了random int 防止数据倾斜
job.setMapOutputValueClass(BytesRefArrayWritable.class);//value一定要是这个
job.setOutputFormatClass(RCFileMapReduceOutputFormat.class);//这个也是定死的
RCFileMapReduceOutputFormat.setColumnNumber(job.getConfiguration(), elementNum);//设置列的number
RCFileMapReduceOutputFormat.setOutputPath(job, new Path(output));
RCFileMapReduceOutputFormat.setCompressOutput(job, true);
job.setNumReduceTasks(reduceNum);//reduce number设置多个亲测没问题

然后这个不需要写reduce,map端

1
2
3
4
5
6
7
8
9
bytes = new BytesRefArrayWritable(elementNum);
int pos=0;
//按照位置把数据依次写进去
for(object item : itemlist)
bytes.set(pos++, getBytesRefWritable(val));
//这个new最好写成成员变量,然后在setup的时候初始一次就好
rn = new Random();
int random = rn.nextInt(reduceNum);
context.write(new LongWritable(random), bytes);

之后在hive中使用

1
2
3
4
5
create external table table(
col1 string,
col2 long,
col3 timestamp
) stored as rcfile location "/hdfs/path";

即可。

最后专门说一下timestamp格式。
这个在写入的时候按照string写入,格式是yyyy-mm-dd hh:mm:ss[.f...],然后create table的时候用timestamp格式就可以。它支持各种处理时间的udf,比如year()就可以把时间拿出来。相关文档的链接,udf的链接

文档中关于timestamp格式的说明最重要的一段我摘出来放在这里了

Timestamps are interpreted to be timezoneless and stored as an offset from the UNIX epoch. Convenience UDFs for conversion to and from timezones are provided (to_utc_timestamp, from_utc_timestamp).
All existing datetime UDFs (month, day, year, hour, etc.) work with the TIMESTAMP data type.

最后,hive有一个开源机器学习库,hivemall:https://github.com/myui/hivemall先做个备忘。


本文采用创作共用保留署名-非商业-禁止演绎4.0国际许可证,欢迎转载,但转载请注明来自http://thousandhu.github.io,并保持转载后文章内容的完整。本人保留所有版权相关权利。

本文链接:http://thousandhu.github.io/2015/11/30/利用MapReduce从hbase导出数据到hive/