我们的程序会从Table1中取出数据,对每一行将其中的几列(这里用A和B)算出一个新的rowkey,并对整个数据做一些转换,之后以rowkey为key插入Table2中去。如果Table1中一行的数据不同时包含A和B,我们会丢弃这条数据。经过一段时间的运行,我们发现程序会出现丢数据的情况,大概30000万行会丢失1万行左右。并且该情况可以复现。
开始时我们怀疑是程序转换过程中出的问题,但是经过排查,发现在数据从hbase中load出来时一行数据就已经可能缺失。我在load时判断几个关键的列A,B,C,D是否存在,不存在时就将该列补充为“empty”,然后不作处理直接将load的数据写入hbase的新表,发现新表中的确有empty值出现。于是怀疑是不是spark读取hbase的接口有问题,我们使用的是newAPIHadoopRDD
这个接口。可是作为一个很基础的接口,不应该出现类似的问题。
为了验证这个接口是不是有问题,我们做了这样一个实验,将一个表的数据读出,对于每一条数据,不做转换,以它本身的rowkey作为rowkey直接写入新表,发现没有问题。于是怀疑还是我们使用方式的问题。这个实验和上一个实验不同的地方就是一个是我们自己算rowkey,一个是数据本身的rowkey。难道是因为一条数据被分开读了导致我们自己算rowkey出现了问题丢了数据。
带着这个问题,我们进一步追踪了load的过程,对于有错的一行数据row X,system.out了所有load时key为row X的情况,果然这一行数据被分成两次读取,并且两次数据合并的结果是该行的最终结果。这导致如果恰好A,B两列被分开读取,这个数据就被我们丢弃了,其余分开读取的情况也会丢一部分数据。
针对这个问题,我们首先做了紧急修复,在load之后使用reduceByKey对数据进行merge,这样虽然可以修复问题,但是无疑会增加一次shuffle,拖慢程序的效率。我们猜想应该是由于hbase读取时设置不当导致数据被分开读取。
我们的代码中是这样读取hbase的
|
|
这里设置了hbase SCAN_BATCHSIZE这个值,会设置scan的batchsize。这个设置的文档是这样说的:
Set the maximum number of values to return for each call to next()
之前一直以为这里是设置一次读取多少行,但是经过询问得知这里的values貌似是读取多少列,并且开启了这个值会导致hbase scan时返回一行的部分结果。于是我们写了demo,将这个值设为1,查看每个key在spark出现的次数,果然他们被读出了和自身column数量一样次。
于是将这个设置注释掉,程序即可正常运行。
进一步的,我们从hbase端代码看看这个设置。hbase的scan会两个成员变量:
private boolean allowPartialResults = false;
private int batch = -1;
allowPartialResult这个很明显就是会返回部分结果的设置,那么这个batch呢?setBatch()时并不会设置allowPartialResult。但是在Scan的getResultsToAddToCache()函数中,如果batch值大于0,会设置isBatch=true。之后会有这段代码
|
|
这里就得出了结论,设置batchSize会导致结果返回时出现一次只返回一行的部分值,一行数据分成多条被返回的情况。
本文采用创作共用保留署名-非商业-禁止演绎4.0国际许可证,欢迎转载,但转载请注明来自http://thousandhu.github.io,并保持转载后文章内容的完整。本人保留所有版权相关权利。
本文链接:http://thousandhu.github.io/2016/11/03/spark读取hbase遇到的问题/