I recently built a real-time data warehouse using Spark Streaming and Kudu. Documentation on this combination is scarce, and it took a fair amount of fiddling to get things working, so I'm recording the pitfalls I ran into along the way.
First, create a Kudu table through Impala:
create table kudu_appbind_test(
  md5 string,
  userid string,
  datetime_ string,
  time_ string,
  cardno string,
  flag string,
  cardtype string,
  primary key(md5, userid, datetime_)
)
stored as kudu;
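Once the table exists, it can be loaded back into Spark as a DataFrame for a quick sanity check. A minimal sketch, assuming the same `server:7051` master address used later in this post; note that tables created through Impala are exposed to Kudu under the `impala::` prefix:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kudu-read-demo")
  .getOrCreate()

// Load the Impala-managed Kudu table as a DataFrame.
val df = spark.read
  .format("org.apache.kudu.spark.kudu")
  .option("kudu.master", "server:7051")
  .option("kudu.table", "impala::kudu_appbind_test")
  .load()

// Verify the schema matches the CREATE TABLE statement above.
df.printSchema()
```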
Choosing the dependency
See the Kudu docs: kudu.apache.org/docs/develo… The docs call out a few key points:
- Use the kudu-spark_2.10 artifact if using Spark with Scala 2.10. Note that Spark 1 is no longer supported in Kudu starting from version 1.6.0. So in order to use Spark 1 integrated with Kudu, version 1.5.0 is the latest to go to.
- Use kudu-spark2_2.11 artifact if using Spark 2 with Scala 2.11.
- kudu-spark versions 1.8.0 and below have slightly different syntax.
- Spark 2.2+ requires Java 8 at runtime even though Kudu Spark 2.x integration is Java 7 compatible. Spark 2.2 is the default dependency version as of Kudu 1.5.0.
I'm using Spark 2.4.0, Scala 2.11, and Kudu 1.8.0, so the right artifact is kudu-spark2_2.11-1.8.0.jar. The Maven configuration:
<!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-spark2_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
However, the following write statement fails:
kuduDF.write.format("kudu")
  .mode("append")
  .option("kudu.master", "server:7051")
  .option("kudu.table", "impala::kudu_appbind_test")
  .save()
java.lang.ClassNotFoundException: Failed to find data source: kudu. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:649)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:194)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
... 49 elided
Caused by: java.lang.ClassNotFoundException: kudu.DefaultSource
at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:628)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:628)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:628)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:628)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:628)
... 51 more
From the error message, Spark does not recognize kudu as a data source. After some searching, I found a suggestion to bump the jar to version 1.9.0, i.e. kudu-spark2_2.11-1.9.0.jar. It still failed:
# using kudu-spark2_2.11-1.9.0.jar
java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.kudu.spark.kudu.DefaultSource not a subtype
at java.util.ServiceLoader.fail(ServiceLoader.java:239)
at java.util.ServiceLoader.access$300(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:624)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:194)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
... 49 elided
Digging into the Kudu source, the org.apache.kudu.spark.kudu package object defines:
implicit class KuduDataFrameWriter[T](writer: DataFrameWriter[T]) {
  def kudu = writer.format("org.apache.kudu.spark.kudu").save
}
Its format string differs from the docs' format("kudu"). After switching to the fully qualified name, the write finally succeeded:
kuduDF.write.format("org.apache.kudu.spark.kudu")
  .mode("append")
  .option("kudu.master", "server:7051")
  .option("kudu.table", "impala::kudu_appbind_test")
  .save()
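Equivalently, importing the package object brings the implicit class shown above into scope, so the write can use the `.kudu` shorthand instead of spelling out the format string. A sketch, assuming the same master address and table name:

```scala
import org.apache.kudu.spark.kudu._

kuduDF.write
  .mode("append")
  .option("kudu.master", "server:7051")
  .option("kudu.table", "impala::kudu_appbind_test")
  .kudu // expands to format("org.apache.kudu.spark.kudu").save via the implicit
```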
A few limitations of the Spark integration with Kudu
- Kudu tables with a name containing upper case or non-ascii characters must be assigned an alternate name when registered as a temporary table.
- Kudu tables with a column name containing upper case or non-ascii characters may not be used with SparkSQL. Columns may be renamed in Kudu to work around this issue.
- <> and OR predicates are not pushed to Kudu, and instead will be evaluated by the Spark task. Only LIKE predicates with a suffix wildcard are pushed to Kudu, meaning that LIKE "FOO%" is pushed down but LIKE "FOO%BAR" isn’t.
- Kudu does not support every type supported by Spark SQL. For example, Date and complex types are not supported.
- Kudu tables may only be registered as temporary tables in SparkSQL. Kudu tables may not be queried using HiveContext.
- Additionally, when writing a DataFrame to a Kudu table, the DataFrame's column names must match the Kudu table's column names exactly.
- When writing to a partitioned Kudu table, the DataFrame's rows must fall into existing partitions; you cannot insert data into a partition that does not exist.
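To guard against the column-name pitfall above, one option is to explicitly select the columns in the Kudu table's schema before writing. A minimal sketch; `rawDF` is a hypothetical source DataFrame, and the column list comes from the table created at the top of this post:

```scala
// rawDF is a hypothetical upstream DataFrame (e.g. from Spark Streaming).
// Project exactly the columns the Kudu table defines, in its schema order,
// so the write does not fail on extra or mismatched column names.
val aligned = rawDF.select(
  "md5", "userid", "datetime_", "time_", "cardno", "flag", "cardtype"
)

aligned.write
  .format("org.apache.kudu.spark.kudu")
  .mode("append")
  .option("kudu.master", "server:7051")
  .option("kudu.table", "impala::kudu_appbind_test")
  .save()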
Originally published at: blog.csdn.net/lzw2016/art…
More big-data tips at: github.com/josonle/Cod… and github.com/josonle/Big…