Python技术栈与Spark大数据平台整合实战--大数据ML样本集案例实战

844 阅读4分钟

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

1 Python技术栈与Spark大数据数据平台整合

  • 下载Anaconda3 Linux版本

      Anaconda3-5.3.1-Linux-x86_64.sh
    
  • 安装Anaconda3

      bash Anaconda3-5.3.1-Linux-x86_64.sh -b 
    
  • 环境变量配置PYSPARK_DRIVER_PYTHON以及PYSPARK_PYTHON配置

      export SCALA_HOME=/usr/local/install/scala-2.11.8
      export JAVA_HOME=/usr/lib/java/jdk1.8.0_45
      export HADOOP_HOME=/usr/local/install/hadoop-2.7.3
      export SPARK_HOME=/usr/local/install/spark-2.3.0-bin-hadoop2.7
      export FLINK_HOME=/usr/local/install/flink-1.6.1
      
      export ANACONDA_PATH=/root/anaconda3
      export PYSPARK_DRIVER_PYTHON=$ANACONDA_PATH/bin/ipython
      export PYSPARK_PYTHON=$ANACONDA_PATH/bin/python
      
      
      export JRE_HOME=${JAVA_HOME}/jre
      export CLASS_PATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
      export PATH=:${JAVA_HOME}/bin:${SCALA_HOME}/bin:${HADOOP_HOME}/bin:${SPARK_HOME}/bin:$PATH
      export PATH=/root/anaconda3/bin:$PATH
    
  • 启动Saprk

  • 启动jupyter notebook

      老版本
      PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root" pyspark
      
      未来版本
      PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=`jupyter notebook --allow-root` pyspark
    

  • jupyter远程访问

      jupyter notebook --generate-config
    
      vi ~/.jupyter/jupyter_notebook_config.py
      c.NotebookApp.ip = '*' # 允许访问此服务器的 IP,星号表示任意 IP
      c.NotebookApp.open_browser = False # 运行时不打开本机浏览器
      c.NotebookApp.port = 12035 # 使用的端口,随意设置
      c.NotebookApp.enable_mathjax = True # 启用 MathJax
      c.NotebookApp.allow_remote_access = True
    
  • jupyter NoteBook开发界面

  • spark程序调试

    lines=sc.textFile("/LICENSE")
    pairs = lines.map(lambda s: (s, 1))
    counts = pairs.reduceByKey(lambda a, b: a + b)
    
    counts.count()
    243
    
    counts.first()
    ('                                 Apache License', 1)
  • Standalone模式启动

      PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root" MASTER=spark://SparkMaster:7077 pyspark
    

2 Spark转换运算

2.1 scala 操作

    val intRDD=sc.parallelize(List(1,2,3))
    intRDD.collect
    Array[Int] = Array(1, 2, 3)

2.2 python 操作

  • python基础RDD操作

      #parallelize
      intRDD=sc.parallelize([1,2,3])
      intRDD.collect()
      [1, 2, 3]
      
      StringRDD=sc.parallelize(["Apple","Orange"])
      StringRDD.collect()
      ['Apple', 'Orange']
      
      #具名函数
      def addOne(x):
          return x+1
      intRDD.map(addOne).collect()
     
     #匿名函数
     intRDD=sc.parallelize([1,2,3])
     intRDD.map(lambda x:x+1).collect() 
     [2, 3, 4]
     
     #过滤器
     intRDD.filter(lambda x:1< x and x<5).collect()
     [2, 3]
     
     #in
     stringRDD =sc.parallelize(["apple","blue"])
     stringRDD.filter(lambda x:"apple" in x).collect()
     ['apple']
     
     #distinct
     intRDD=sc.parallelize([1,2,3,2,7])
     intRDD.distinct().collect()
     [1, 2, 3, 7]
     
     #randomSplit
     sRDD=intRDD.randomSplit([0.4,0.6])
     sRDD[0].collect()
     [1, 2]
     
     #groupBy
     group=intRDD.groupBy(lambda x:"even" if(x%2==0) else "odd").collect()
     print(group)
     
     [('odd', <pyspark.resultiterable.ResultIterable object at 0x7f2186897978>), ('even', <pyspark.resultiterable.ResultIterable object at 0x7f21868978d0>)] 
    
     print (sorted(group[0][1]))
     [1, 3, 7]
    
     print (sorted(group[1][1]))
     [2, 2]
    
  • python多个RDD转换操作

     intRDD1=sc.parallelize(["apple","blue"])
     intRDD2=sc.parallelize([1,2])
     intRDD3=sc.parallelize(["apple","blue"])
     
     #合并运算
     intRDD1.union(intRDD2).union(intRDD3).collect()
     
     ['apple', 'blue', 1, 2, 'apple', 'blue']
    
     #交集运算
     intRDD1=sc.parallelize([3,1,2,5,5])
     intRDD2=sc.parallelize([5,6])
     intRDD3=sc.parallelize([2,7])
     intRDD1.intersection(intRDD2).collect()
     [5]
     
     intRDD1=sc.parallelize([3,1,2,5,5])
     intRDD2=sc.parallelize([5,6])
     intRDD3=sc.parallelize([2,7])
     intRDD1.subtract(intRDD2).collect()
     [2, 3, 1]
     
     intRDD1.first()
     intRDD1.take(3)
     
     intRDD1.takeOrdered(3)
     [1, 2, 3]
     
     intRDD1.takeOrdered(3,lambda x:-x)
     [5, 5, 3]
    
  • Python RDD基于Key-Value转换

      kvRDD1=sc.parallelize([(3,4),[3,6],[5,6],[1,2]])
      kvRDD1.collect()
      [(3, 4), [3, 6], [5, 6], [1, 2]]
      
      kvRDD1.keys().collect()
      [3, 3, 5, 1]
      
      kvRDD1.values().collect()
      [4, 6, 6, 2]
      
      kvRDD1.filter(lambda keyvalue :keyvalue[0]<5).collect()
      [(3, 4), [3, 6], [1, 2]]
      
      kvRDD1.mapValues(lambda x:x*x).collect()
      [(3, 16), (3, 36), (5, 36), (1, 4)]
      
      kvRDD1.sortByKey(ascending=False).collect()
      [[1, 2], (3, 4), [3, 6], [5, 6]]
      
      kvRDD1.reduceByKey(lambda x,y:x+y).collect()
      [(3, 10), (5, 6), (1, 2)]
    
  • Python 多个RDD 转换操作

      #join
      kvRDD1=sc.parallelize([(3,4),[3,6],[5,6],[1,2]])
      kvRDD2=sc.parallelize([(3,8)])  
      kvRDD1.join(kvRDD2).collect() 
      
      [(3, (4, 8)), (3, (6, 8))]
      
      #左连接
      kvRDD1.leftOuterJoin(kvRDD2).collect()
      [(3, (4, 8)), (3, (6, 8)), (5, (6, None)), (1, (2, None))]
      
      #右连接
      kvRDD1.rightOuterJoin(kvRDD2).collect()
      [(3, (4, 8)), (3, (6, 8))]
      
      #去除掉相同的key
      kvRDD1.subtractByKey(kvRDD2).collect()
      [(5, 6), (1, 2)]
      
      kvRDD1.countByKey()
      defaultdict(int, {3: 2, 5: 1, 1: 1})
      
      
      #创建字典,对于Key=3的以value=6为输出
      KV1=kvRDD1.collectAsMap()
      {3: 6, 5: 6, 1: 2}
      KV1[3]
      6
      
      kvRDD1.lookup(3)
      [4, 6]
    
  • Python 的广播变量

      kvFruit = sc.parallelize([(1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")])
      FruitMap=kvFruit.collectAsMap()
      
      print(FruitMap)
      
      #广播
      broadcastFruitMap=sc.broadcast(FruitMap)
      print(broadcastFruitMap.value)
      {1: 'apple', 2: 'orange', 3: 'banana', 4: 'grape'}
    
      #取出广播
      fruitIds =sc.parallelize([2,4,3,1])
      fruitNames =fruitIds.map(lambda x:broadcastFruitMap.value[x]).collect()
      print ("水果名称" +str(fruitNames))
      
      水果名称['orange', 'grape', 'banana', 'apple']
    
  • Python 的累加器

      intRDD=sc.parallelize([1,2,3])
      
      total=sc.accumulator(0.0)
      num=sc.accumulator(0)
      
      intRDD.foreach(lambda i:[total.add(i),num.add(1)])
      avg=total.value/num.value
      print (str(total.value )+" "+ str(num.value) + " "+ str(avg))
      
      6.0 3 2.0
    
  • Python持久化操作

      intRDD=sc.parallelize([1,2,3])
      intRDD.persist()
      
      intRDD.is_cached
      
       #没有执行成功
      intRDD.persist(StorageLevel.MEMORY_AND_DISK)
    
  • python 综合案例

      textFile=sc.textFile("/LICENSE")
      
      stringRDD = textFile.flatMap(lambda line:line.split(" ")).map(lambda x: (x,1)).reduceByKey(lambda x,y:x+y)
      print(stringRDD.take(10))
      stringRDD.saveAsTextFile("/pythonWordCount")
      
      [('', 1445), ('Apache', 6), ('License', 9), ('Version', 2), ('2.0,', 1), ('January', 1), ('2004', 1), ('http://www.apache.org/licenses/', 1), ('TERMS', 2), ('AND', 3)]
    

3 总结

通过Python技术栈与Spark大数据数据平台整合,我们将实现python生态最完善的计算和可视化体系。

秦凯新 于深圳 201812132319

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。