// 转化成RDD
val lines = sc.textFile("../../full-server.log")
val defualtRDD = lines.filter(line => line.contains("org.pbsframework.ebs.impl.DefaultDataBus"))
val defaultMes = lines.filter(line => line.contains("org.springframework.jms.listener.DefaultMessageListenerContainer"))
// 对RDD直接操作(Actions)
val unionAll = defualtRDD.union(defaultMes)
println("Input defualtRDD:count:" + defualtRDD.count() + "concerning lines")
println("Here are 10 example:")
defualtRDD.take(10).foreach(println)
// save as file
lines.saveAsTextFile()
- 键值对RDD通常用来进行聚合计算
-
文件格式与文件系统
格式名称 结构化 文本文件 否 JSON 半结构化 CSV 是 SequenceFiles 是 Protocol Buffers 是 对象文件 是 -
Spark SQL中的结构化数据源
-
数据库与键值存储
- java数据库连接
- 提交到任务中
./spark-submit --class "SimpleApp" --master local[4] ~/phwang/job/spark-job-1.0-SNAPSHOT.jar 其中 local[4]的意思是:使用四个核心
- 把用户程序转换成任务
- 读取
// load jdbc driver
Class.forName("com.mysql.jdbc.Driver").newInstance
val prop = new Properties()
prop.put("user","test")
prop.put("password","test")
// create DataFrame
val df: DataFrame = sqlContext.read.jdbc("jdbc:mysql://localhsot:3306", "user", prop)
// show schema
df.schema()
// show data table row count
df.count()
// print table content
df.show()
- 写数据至数据库
// load jdbc driver
Class.forName("com.mysql.jdbc.Driver").newInstance
val url = "jdbc:mysql://localhost:3306/test"
val prop = new Properties()
prop.put("user","test")
prop.put("password","test")
// init data
case class user :(id: String, name :String)
val users = sparkContext.parallelize(1 to 10).map(i => User("id_" + i, "name_" + i)).map(i => Row(i.id, i.name))
// create schema
val schema = StructType(Array(StructFiled("id", StringType, true),StructFiled("name", StringType, true)))
// create dataFrame
val df = sqlContext.createDataFrame(users, schema)
// write data for mysql
df.write.mode(SaveMode.Append).jdbc(url, "user", prop)
// when the database is oracle. use follow code
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable(df, url, "user", prop)
- spark-submit方式提交的时候,会出现找不到驱动,直接--jars也无法找到驱动 使用下面这种方式就可以:
./spark-submit --master local[2] --driver-class-path /opt/users/ojdbc14.jar --class "DBShow" /opt/users/jobs/spark-read-db-jobs-SNAPSHOT.jar
- 或者直接把驱动加载
conf/spark-env.sh
这个文件里也可以
export $SPARK_CLASSPAHT=$SPARK_CLASSPAHT:/opt/users/ojdbc14.jar