@@ -18,10 +18,29 @@ case class exportDf(file: String, content: String)
 
 object CSVTool extends LazyLogging {
 
-  def apply(spark: SparkSession, path: String, schema: StructType, delimiter: Option[String] = None, escape: Option[String] = None, multiline: Option[Boolean] = None, dateFormat: Option[String] = None, timestampFormat: Option[String] = None, removeNullRows: Option[String] = None, isCast: Boolean = true): Dataset[Row] = {
+  def apply(
+      spark: SparkSession,
+      path: String,
+      schema: StructType,
+      delimiter: Option[String] = None,
+      escape: Option[String] = None,
+      multiline: Option[Boolean] = None,
+      dateFormat: Option[String] = None,
+      timestampFormat: Option[String] = None,
+      removeNullRows: Option[String] = None,
+      isCast: Boolean = true
+  ): Dataset[Row] = {
 
     val mandatoryColumns = DFTool.getMandatoryColumns(schema)
     val optionalColumns = DFTool.getOptionalColumns(schema)
-    val df = read(spark: SparkSession, path: String, delimiter, escape, multiline, dateFormat, timestampFormat)
+    val df = read(
+      spark: SparkSession,
+      path: String,
+      delimiter,
+      escape,
+      multiline,
+      dateFormat,
+      timestampFormat
+    )
 
     DFTool.existColumns(df, mandatoryColumns)
     val dfWithCol = DFTool.addMissingColumns(df, optionalColumns)
@@ -37,8 +56,13 @@ object CSVTool extends LazyLogging {
     dfNull
   }
 
-  def write(df: DataFrame, path: String, mode: org.apache.spark.sql.SaveMode): Unit = {
-    df.write.format("csv")
+  def write(
+      df: DataFrame,
+      path: String,
+      mode: org.apache.spark.sql.SaveMode
+  ): Unit = {
+    df.write
+      .format("csv")
       .option("delimiter", ",")
       .option("header", value = false)
       .option("nullValue", null)
@@ -51,18 +75,30 @@ object CSVTool extends LazyLogging {
       .save(path)
   }
 
-  def read(spark: SparkSession, path: String, delimiter: Option[String] = None, escape: Option[String], multiline: Option[Boolean] = None, dateFormat: Option[String], timestampFormat: Option[String] = None): Dataset[Row] = {
+  def read(
+      spark: SparkSession,
+      path: String,
+      delimiter: Option[String] = None,
+      escape: Option[String],
+      multiline: Option[Boolean] = None,
+      dateFormat: Option[String],
+      timestampFormat: Option[String] = None
+  ): Dataset[Row] = {
     val headers = getCsvHeaders(spark, path, delimiter)
     val schemaSimple = getStringStructFromArray(headers)
     logger.info(schemaSimple.prettyJson)
-    val csvTmp = spark.read.format("csv")
+    val csvTmp = spark.read
+      .format("csv")
       .schema(schemaSimple)
       .option("multiline", multiline.getOrElse(false))
       .option("delimiter", delimiter.getOrElse(","))
       .option("header", value = true)
       .option("quote", "\"")
       .option("escape", escape.getOrElse("\""))
-      .option("timestampFormat", timestampFormat.getOrElse("yyyy-MM-dd HH:mm:ss"))
+      .option(
+        "timestampFormat",
+        timestampFormat.getOrElse("yyyy-MM-dd HH:mm:ss")
+      )
       .option("dateFormat", dateFormat.getOrElse("yyyy-MM-dd"))
       .option("columnNameOfCorruptRecord", "_corrupt_record")
       .option("mode", "PERMISSIVE")
@@ -82,7 +118,11 @@ object CSVTool extends LazyLogging {
 
   }
 
-  def getCsvHeaders(spark: SparkSession, path: String, delimiter: Option[String]): Array[String] = {
+  def getCsvHeaders(
+      spark: SparkSession,
+      path: String,
+      delimiter: Option[String]
+  ): Array[String] = {
     val data = spark.sparkContext.textFile(path)
     val header = data.first()
     val headers = header.split(delimiter.getOrElse(","))
@@ -100,50 +140,69 @@ object CSVTool extends LazyLogging {
   }
 
   /**
-   * Exports local files
-   *
-   * @param df a dataframe with
-   * @param fileColumn shall be a string
-   * @param contentColumn shall be a string
-   * @param folder
-   */
-  def writeDfToLocalFiles(df: DataFrame, fileColumn: String, contentColumn: String, folder: String) = {
+    * Exports local files
+    *
+    * @param df a dataframe with
+    * @param fileColumn shall be a string
+    * @param contentColumn shall be a string
+    * @param folder
+    */
+  def writeDfToLocalFiles(
+      df: DataFrame,
+      fileColumn: String,
+      contentColumn: String,
+      folder: String
+  ) = {
 
     // validate folder exists
     if (!Files.exists(Paths.get(folder)))
       throw new InvalidPathException(folder, folder)
 
     import df.sparkSession.implicits._
     val ds = df
-      .select(col(fileColumn).cast(StringType) as "file", col(contentColumn).cast(StringType) as "content")
+      .select(
+        col(fileColumn).cast(StringType) as "file",
+        col(contentColumn).cast(StringType) as "content"
+      )
       .as[exportDf]
 
-    ds.collect().foreach(
-      p => {
+    ds.collect()
+      .foreach(p => {
         val fileName = folder + "/" + p.file + ".txt"
         val writerAnn = new java.io.PrintWriter(fileName, "UTF-8")
         if (p.content != null)
           writerAnn.write(p.content)
         writerAnn.close()
-      }
-    )
+      })
     logger.info(s"Exported ${ds.count} files")
 
   }
 
-  def writeCsvLocal(df: DataFrame, tempPath: String, localPath: String, options: Map[String, String] = Map(), format: String = "csv") = {
+  def writeCsvLocal(
+      df: DataFrame,
+      tempPath: String,
+      localPath: String,
+      options: Map[String, String] = Map(),
+      format: String = "csv"
+  ) = {
     val hdfs = FileSystem.get(new Configuration())
     val hdfsPath = new Path(tempPath)
     val targetFile = new File(localPath)
-    val fileWDot = new File(targetFile.getPath.substring(0, targetFile.getPath.length - targetFile.getName.length) + "." + targetFile.getName)
+    val fileWDot = new File(
+      targetFile.getPath.substring(
+        0,
+        targetFile.getPath.length - targetFile.getName.length
+      ) + "." + targetFile.getName
+    )
     logger.warn(s"writing to temp file ${fileWDot.getAbsolutePath}")
     val mime = format match {
-      case "csv" => ".csv"
+      case "csv"  => ".csv"
       case "text" => ".txt"
-      case _ => throw new Exception("only text and csv")
+      case _      => throw new Exception("only text and csv")
     }
     try {
-      df.write.mode(SaveMode.Overwrite)
+      df.write
+        .mode(SaveMode.Overwrite)
         .format(format)
         .options(options)
         .save(tempPath)
@@ -165,12 +224,13 @@ object CSVTool extends LazyLogging {
           outStream.close()
         }
       }
-    }
-    finally {
+    } finally {
       hdfs.delete(hdfsPath, true)
       logger.warn(s"deleting hdfs path $hdfsPath")
       fileWDot.renameTo(targetFile)
-      logger.warn(s"renaming ${fileWDot.getAbsolutePath} to ${targetFile.getAbsolutePath}")
+      logger.warn(
+        s"renaming ${fileWDot.getAbsolutePath} to ${targetFile.getAbsolutePath}"
+      )
     }
   }
 }
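
For orientation, here is a minimal usage sketch of the `CSVTool.apply` entry point reformatted above. It is not part of this commit: the schema fields, input path, and delimiter are invented, and it assumes a local SparkSession plus the DFTool helpers referenced by the method are available on the classpath.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical call site; column names and option values are examples only.
val spark = SparkSession.builder().master("local[*]").getOrCreate()

val schema = StructType(
  Seq(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = true)
  )
)

// Options left as None fall back to the defaults visible in read():
// comma delimiter, '"' escape, yyyy-MM-dd dates, yyyy-MM-dd HH:mm:ss timestamps.
val df = CSVTool(
  spark,
  path = "data/input.csv",
  schema = schema,
  delimiter = Some(";"),
  multiline = Some(true)
)
df.show()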
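
Similarly, a hedged sketch of calling `writeDfToLocalFiles`, whose scaladoc above is terse: each row becomes one UTF-8 text file named `<folder>/<file>.txt`. The dataframe contents and target folder below are invented; the folder must already exist, otherwise the method throws InvalidPathException.

// Reuses the SparkSession from the sketch above.
import spark.implicits._

// Hypothetical data: one output file per row.
val docs = Seq(
  ("report-1", "first document body"),
  ("report-2", "second document body")
).toDF("doc_id", "body")

// Writes /tmp/exports/report-1.txt and /tmp/exports/report-2.txt.
CSVTool.writeDfToLocalFiles(
  docs,
  fileColumn = "doc_id",
  contentColumn = "body",
  folder = "/tmp/exports"
)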