spark读取hbase遇到的问题

我们的程序会从Table1中取出数据,对每一行将其中的几列(这里用A和B)算出一个新的rowkey,并对整个数据做一些转换,之后以rowkey为key插入Table2中去。如果Table1中一行的数据不同时包含A和B,我们会丢弃这条数据。经过一段时间的运行,我们发现程序会出现丢数据的情况,大概30000万行会丢失1万行左右。并且该情况可以复现。

开始时我们怀疑是程序转换过程中出的问题,但是经过排查,发现在数据从hbase中load出来时一行数据就已经可能缺失。我在load时判断几个关键的列A,B,C,D是否存在,不存在时就将该列补充为“empty”,然后不作处理直接将load的数据写入hbase的新表,发现新表中的确有empty值出现。于是怀疑是不是spark读取hbase的接口有问题,我们使用的是newAPIHadoopRDD这个接口。可是作为一个很基础的接口,不应该出现类似的问题。

为了验证这个接口是不是有问题,我们做了这样一个实验,将一个表的数据读出,对于每一条数据,不做转换,以它本身的rowkey作为rowkey直接写入新表,发现没有问题。于是怀疑还是我们使用方式的问题。这个实验和上一个实验不同的地方就是一个是我们自己算rowkey,一个是数据本身的rowkey。难道是因为一条数据被分开读了导致我们自己算rowkey出现了问题丢了数据。

带着这个问题,我们进一步追踪了load的过程,对于有错的一行数据row X,system.out了所有load时key为row X的情况,果然这一行数据被分成两次读取,并且两次数据合并的结果是该行的最终结果。这导致如果恰好A,B两列被分开读取,这个数据就被我们丢弃了,其余分开读取的情况也会丢一部分数据。

针对这个问题,我们首先做了紧急修复,在load之后使用reduceByKey对数据进行merge,这样虽然可以修复问题,但是无疑会增加一次shuffle,拖慢程序的效率。我们猜想应该是由于hbase读取时设置不当导致数据被分开读取。

我们的代码中是这样读取hbase的

1
2
3
4
conf.set(TableInputFormat.INPUT_TABLE, tableName);
conf.setLong(TableInputFormat.SCAN_BATCHSIZE, 500);
conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 300000);
JavaPairRDD<ImmutableBytesWritable, Result> raw = jsc.newAPIHadoopRDD(conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

这里设置了hbase SCAN_BATCHSIZE这个值,会设置scan的batchsize。这个设置的文档是这样说的:

Set the maximum number of values to return for each call to next()

之前一直以为这里是设置一次读取多少行,但是经过询问得知这里的values貌似是读取多少列,并且开启了这个值会导致hbase scan时返回一行的部分结果。于是我们写了demo,将这个值设为1,查看每个key在spark出现的次数,果然他们被读出了和自身column数量一样次。

于是将这个设置注释掉,程序即可正常运行。

进一步的,我们从hbase端代码看看这个设置。hbase的scan会两个成员变量:

  • private boolean allowPartialResults = false;
  • private int batch = -1;

allowPartialResult这个很明显就是会返回部分结果的设置,那么这个batch呢?setBatch()时并不会设置allowPartialResult。但是在Scan的getResultsToAddToCache()函数中,如果batch值大于0,会设置isBatch=true。之后会有这段代码

1
2
3
4
5
6
7
8
9
10
11
// If the caller has indicated in their scan that they are okay with seeing partial results,
// then simply add all results to the list. Note that since scan batching also returns results
// for a row in pieces we treat batch being set as equivalent to allowing partials. The
// implication of treating batching as equivalent to partial results is that it is possible
// the caller will receive a result back where the number of cells in the result is less than
// the batch size even though it may not be the last group of cells for that row.
if (allowPartials || isBatchSet) {
addResultsToList(resultsToAddToCache, resultsFromServer, 0,
(null == resultsFromServer ? 0 : resultsFromServer.length));
return resultsToAddToCache;
}

这里就得出了结论,设置batchSize会导致结果返回时出现一次只返回一行的部分值,一行数据分成多条被返回的情况。


本文采用创作共用保留署名-非商业-禁止演绎4.0国际许可证,欢迎转载,但转载请注明来自http://thousandhu.github.io,并保持转载后文章内容的完整。本人保留所有版权相关权利。

本文链接:http://thousandhu.github.io/2016/11/03/spark读取hbase遇到的问题/