帮助文档
搜索文档…
导出数据处理建议

数据格式

导出数据皆为csv格式:
    分隔符: ,
    包围符:"
    转义符:\

处理建议

Spark处理
Scala/Java程序处理示例
Python处理建议示例
建议下载数据后,将下载的压缩文件放于hdfs的以日期建立目录结构,同一小时或者同一天的数据放在同一目录下,然后通过spark streaming的fileStream接口监控根目录,读取变动的文件内容。
1
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Copied!
在依赖中添加:
1
groupId: com.databricksartifactId: spark-csv_2.10version: 1.4.0
Copied!
具体数据操作参考spark-csv(https://github.com/databricks/spark-csv)
推荐使用 org.apache.commons.csv 来处理下载到本地的数据文件,如下面的例子:
1
import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
2
import org.apache.commons.csv.{CSVFormat, CSVParser, QuoteMode}
3
import scala.collection.JavaConverters._
4
5
object Test extends App {
6
val file = new File("xxx")
7
val br = new BufferedReader(new InputStreamReader(new FileInputStream(file)))
8
val csvFileFormat = CSVFormat.DEFAULT.withEscape('\\').withQuote('"')
9
val csvParser = new CSVParser(br, csvFileFormat)
10
val records = csvParser.getRecords
11
12
for (record <- records.asScala) {
13
val sb = new StringBuilder()
14
val length = record.size()
15
(0 until length).foreach(i => {
16
sb.append(record.get(i))
17
sb.append(",")
18
})
19
println(sb.toString)
20
}
21
}
Copied!
1
import csv
2
3
with open(FILE_NAME, "rb") as f:
4
reader = csv.reader(f, quotechar='"', escapechar='\\')
5
for line in reader:
6
print(line)
Copied!

导入到数据仓库示例

导入到Hive
利用Spark导入到其他数据仓库
    1.
    使用Hive新建外部表,如下图所示。
    2.
    使用hadoop fs -put /xx.csv /tmp/test_export/ 将csv放到外部表目录下。
1
CREATE EXTERNAL TABLE TEST_EXPORT
2
(
3
sessionId STRING,
4
time BIGINT,
5
sendTime BIGINT,
6
pageTime BIGINT,
7
domain STRING,
8
page STRING,
9
queryParameters STRING,
10
eventName STRING,
11
eventNumber DOUBLE,
12
eventVariable map<string, string>,
13
loginUserId STRING
14
)
15
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
16
STORED AS TEXTFILE
17
location '/tmp/test_export'
18
tblproperties ("skip.header.line.count"="1", "quote.delim"="\"", "escape.delim"="\\")
Copied!
    1.
    获取DataFrame,代码如下。
    2.
    使用databricks提供的函数Load到Hive数据库(这里省去spark和hive的配置),如: df.select("sessionId", "time", "sendTime", "pageTime", "domain", "page", "queryParameters", "eventName", "eventNumber", "eventVariable", "loginUserId").write.mode("append").insertInto("TEXT_EXPORT")
1
val df = spark.read
2
.option("header","true")
3
.option("escape", "\\")
4
.option("quote", "\"")
5
.csv("filePath")
Copied!

使用Content-Length进行文件完整性校验

生效日期 : 2020/10/29 21:00:00 开始

用户如果对文件完整性有担心,可以对原始数据导出 API第三步下载时response的headers中的value值Content-Length和下载文件的大小进行校验。若校验未通过,可重启第三步,轮询获取,若校验通过,可以解压缩,如果解压出现异常,可重启第三步,轮询获取。
最近更新 11mo ago