阅读 142

使用Spark批量导入数据到JanusGraph完整实践

序章

啰嗦一下

  图形数据库是一种非关系型数据库,它应用图理论存储实体之间的关系信息,在描述、存储、查询知识图谱的关联关系方面具有天生的优势。目前,常用的图形数据库有Neo4j,JanusGraph,Giraph, TigerGraph等。其中Neo4j由于采用自己的存储方式,性能优势突出,但是社区版不支持集群,可扩展性比较差;JanusGraph支持Hbase、Cassanda、Google Cloud Bigtable等作为底层存储,支持Elaticsearch、Apache Solr、Apache Lucene作为底层索引,实现tinkerpop标准图框架,可扩展性较强。

  开发层面通常使用嵌入式的Java应用程序或连接JanusGraph Server的模式与JanusGraph数据交互,但作为分布式图数据库的一个代表,不与Spark这样的分布式计算引擎产生一点瓜葛,心里有些芥蒂,本次实践基于最新的jg 0.4版本,记录了我在使用Spark On Yarn模式批量导入图数据到JanusGraph中遇到的错误和排除过程,分享,共勉。

交待背景

  • JanusGraph v0.4 (存储到HBase)
  • Spark 2.3.1 (HDP)
  • CentOS 7.4
  • IDEA社区版2019

出发吧

maven主要配置

满足功能前提下,jar包依赖越少越好

<dependencies>

        <dependency>
            <groupId>org.janusgraph</groupId>
            <artifactId>janusgraph-all</artifactId>
            <version>0.4.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.janusgraph</groupId>
                    <artifactId>janusgraph-berkeleyje</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-graphx -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.tinkerpop/spark-gremlin -->
        <dependency>
            <groupId>org.apache.tinkerpop</groupId>
            <artifactId>spark-gremlin</artifactId>
            <version>3.4.4</version>
        </dependency>

    </dependencies>
复制代码
  1. janusgraph-berkeleyje这个东西无法下载,但我们也用不到这个存储,直接扔
  2. httpclient这个包是es索引需要的库,之前专栏有讲
  3. 代码是从hdfs上的一个csv文件读取数据,通过spark写入到JanusGraph

贴代码

def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("test-load-data")
      .master("local[*]") //注意,打包提交时注释本行,使用yarn模式
      .getOrCreate()

    val rdd = spark.read.csv("hdfs://101.bigdata:8020/xxx/graphData.csv")
      .rdd.map(x => {
      (x.getString(0), x.getString(1).toInt, x.getString(2))
    })

    rdd.foreachPartition { x => {
      //var tx = janusGraph.newTransaction()
      val janusGraph = JanusGraphFactory.open(conf)
      val g = janusGraph.traversal()
      var counts = 0L
      try {
        x.foreach(y => {
          g.addV(y._1).property("property1", y._2).property("property2", y._3).next()
          counts += 1
          if (counts == 1000) {
            g.tx().commit()
            println("################################################")
            println("#################" + counts + "#################")
            println("################################################")
          }
        })
      } finally {
        g.tx().commit()
        janusGraph.close()
      }
    }
    }
    println("-------这表示结束--------")
  }
复制代码

Bug1 - LZ4BlockInputStream

Exception in thread "main" java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V

解决办法

jar包冲突所致,在依赖中排除一个低版本的相关依赖,如下lz4那个

 <dependency>
            <groupId>org.janusgraph</groupId>
            <artifactId>janusgraph-all</artifactId>
            <version>0.4.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.janusgraph</groupId>
                    <artifactId>janusgraph-berkeleyje</artifactId>
                </exclusion>
                <exclusion>
                    <artifactId>lz4</artifactId>
                    <groupId>net.jpountz.lz4</groupId>
                </exclusion>
            </exclusions>
        </dependency>
复制代码

如此,本地运行时就是ok的。

Bug2 - StopWatch

spark-submit提交时,指定master为yarn,cluster,报错: java.lang.NoSuchMethodError:com.google.common.base.StopWatch.createStarted()

一查,这个错倒是蛮常见,罪魁祸首是一个叫guava的包,捯饬了半天,最后搞定:

解决办法

注意一下jar包加载顺序:

错误的根本在于执行spark提交的任务过程中使用了guava-14这个包,而StopWatch方法出现于15版本以后。

spark2/jars路径下的guava包为程序中使用的版本是14,改为16或18都行,依据最小改动原则,我们改为版本16。只修改spark安装目录的包不行,因为提交yarn运行时,yarn还有一份spark的依赖,最终要改这里:把机器上spark2-hdp-yarn-archive.tar.gz目录下的guava包替换为16版本的jar包即可。

数据写好了。

下一步

然后,我打算继续看看Spark读取图数据的事情,使用Spark进行图数据的OLAP分析,再会吧。
复制代码

❀豪华的感谢❀

部分截图由我的小伙伴支援,感谢 @小样儿 基情兹吃!

关注下面的标签,发现更多相似文章
评论