From 1a1c0f07c814e8de64e0c36a22bd3ff4a0f70695 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E7=BA=A2?= Date: Tue, 24 Jul 2018 19:13:12 +0800 Subject: [PATCH 1/2] parse error return data --- mgr/dataflow.go | 16 +++++++--------- parser/syslog/syslog.go | 38 ++++++++++++++++++++++---------------- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/mgr/dataflow.go b/mgr/dataflow.go index c1df664ca..b2c59f439 100644 --- a/mgr/dataflow.go +++ b/mgr/dataflow.go @@ -130,12 +130,11 @@ func ParseData(parserConfig conf.MapConf) (parsedData []Data, err error) { } parsedData, err = logParser.Parse(sampleData) - err = checkErr(err, logParser.Name()) if err != nil { - return nil, err + return parsedData, checkErr(err) } - return + return parsedData, nil } func TransformData(transformerConfig map[string]interface{}) ([]Data, error) { @@ -365,21 +364,20 @@ func checkSampleData(sampleData []string, logParser parser.Parser) ([]string, er return sampleData, nil } -func checkErr(err error, parserName string) error { +func checkErr(err error) error { se, ok := err.(*StatsError) var errorCnt int64 if ok { errorCnt = se.Errors err = se.ErrorDetail - } else if err != nil { + } else { errorCnt = 1 } + if err != nil { - errMsg := fmt.Sprintf("parser %s, error %v ", parserName, err.Error()) - err = fmt.Errorf("%v parse line errors occured, same as %v", errorCnt, errors.New(errMsg)) + return fmt.Errorf("%v parse line errors occured, error %v ", errorCnt, err.Error()) } - - return err + return nil } func getTransformerCreator(transformerConfig map[string]interface{}) (transforms.Creator, error) { diff --git a/parser/syslog/syslog.go b/parser/syslog/syslog.go index 7804ed15d..48a84722a 100644 --- a/parser/syslog/syslog.go +++ b/parser/syslog/syslog.go @@ -2,7 +2,6 @@ package syslog import ( "bytes" - "errors" "fmt" "strconv" "strings" @@ -150,11 +149,10 @@ func (p *SyslogParser) Parse(lines []string) ([]Data, error) { se.AddErrors() se.ErrorDetail = err se.LastError = err.Error() - if !p.disableRecordErrData { - errData := make(Data) - errData[KeyPandoraStash] = line - datas = append(datas, errData) - } else { + if d != nil { + datas = append(datas, d) + } + if p.disableRecordErrData { se.DatasourceSkipIndex = append(se.DatasourceSkipIndex, idx) } continue @@ -176,24 +174,29 @@ func (p *SyslogParser) parse(line string) (data Data, err error) { if p.buff.Len() > 0 { if line == parser.PandoraParseFlushSignal { return p.Flush() - } else if p.curline >= p.maxline || p.format.IsNewLine([]byte(line)) { + } + + if p.curline >= p.maxline || p.format.IsNewLine([]byte(line)) { data, err = p.Flush() + if err != nil { + return data, err + } } else { p.curline++ } } - var serr error + if line != parser.PandoraParseFlushSignal { - _, serr = p.buff.Write([]byte(line)) - } - if serr != nil { + _, err = p.buff.Write([]byte(line)) if err != nil { - err = errors.New(err.Error() + serr.Error()) - } else { - err = serr + if !p.disableRecordErrData { + data = Data{KeyPandoraStash: string(p.buff.Bytes())} + } + return data, err } } - return + + return data, nil } func (p *SyslogParser) Flush() (data Data, err error) { @@ -206,10 +209,13 @@ func (p *SyslogParser) Flush() (data Data, err error) { if p.curline == p.maxline { err = fmt.Errorf("syslog meet max line %v, try to parse err %v, check if this is standard rfc3164/rfc5424 syslog", p.maxline, err) } + if !p.disableRecordErrData { + data = Data{KeyPandoraStash: string(p.buff.Bytes())} + } } p.curline = 0 p.buff.Reset() - return + return data, err } type RFC6587 struct{} From 15e91864462baccac1b153ee74be107295065a49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E7=BA=A2?= Date: Tue, 24 Jul 2018 20:45:22 +0800 Subject: [PATCH 2/2] fix test error --- mgr/dataflow.go | 18 +----------------- mgr/metric_runner_test.go | 18 ++++++++++++------ parser/syslog/syslog_test.go | 18 ++++++++++++++++++ reader/sql/sql_test.go | 11 +++++++---- utils/models/utils.go | 16 ++++++++++++++++ 5 files changed, 54 insertions(+), 27 deletions(-) diff --git a/mgr/dataflow.go b/mgr/dataflow.go index b2c59f439..648dd9c1e 100644 --- a/mgr/dataflow.go +++ b/mgr/dataflow.go @@ -131,7 +131,7 @@ func ParseData(parserConfig conf.MapConf) (parsedData []Data, err error) { parsedData, err = logParser.Parse(sampleData) if err != nil { - return parsedData, checkErr(err) + return parsedData, CheckErr(err) } return parsedData, nil @@ -364,22 +364,6 @@ func checkSampleData(sampleData []string, logParser parser.Parser) ([]string, er return sampleData, nil } -func checkErr(err error) error { - se, ok := err.(*StatsError) - var errorCnt int64 - if ok { - errorCnt = se.Errors - err = se.ErrorDetail - } else { - errorCnt = 1 - } - - if err != nil { - return fmt.Errorf("%v parse line errors occured, error %v ", errorCnt, err.Error()) - } - return nil -} - func getTransformerCreator(transformerConfig map[string]interface{}) (transforms.Creator, error) { transformKeyType, ok := transformerConfig[transforms.KeyType] if !ok { diff --git a/mgr/metric_runner_test.go b/mgr/metric_runner_test.go index ec7a5cbd3..ef2250843 100644 --- a/mgr/metric_runner_test.go +++ b/mgr/metric_runner_test.go @@ -5,6 +5,7 @@ import ( "io" "log" "net/http" + "net/http/httptest" "os" "path/filepath" "testing" @@ -507,6 +508,11 @@ func metricHttpTest(p *testParam) { t.Fatalf("mkdir test path error %v", err) } time.Sleep(1 * time.Second) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + io.WriteString(w, "七牛云") + })) + defer ts.Close() mc := []MetricConfig{ { MetricType: "http", @@ -515,7 +521,7 @@ func metricHttpTest(p *testParam) { "http_data": false, }, Config: map[string]interface{}{ - "http_datas": `[{"method":"GET", "url":"https://www.qiniu.com", "expect_code":200, "expect_data":"七牛云"}]`, + "http_datas": `[{"method":"GET", "url":"` + ts.URL + `", "expect_code":200, "expect_data":"七牛云"}]`, }, }, } @@ -572,7 +578,7 @@ func metricHttpTest(p *testParam) { assert.Equal(t, len(httpAttr)+2, len(result[0]), string(str)) assert.Equal(t, float64(200), result[0]["http__status_code_1"]) - assert.Equal(t, "https://www.qiniu.com", result[0]["http__target_1"]) + assert.Equal(t, ts.URL, result[0]["http__target_1"]) assert.Equal(t, float64(1), result[0]["http__err_state_total"]) assert.Equal(t, "", result[0]["http__err_msg_total"]) assert.Equal(t, float64(1), result[0]["http__err_state_total"]) @@ -587,7 +593,7 @@ func metricHttpTest(p *testParam) { "http_time_cost": false, }, Config: map[string]interface{}{ - "http_datas": `[{"method":"GET", "url":"https://www.qiniu.com", "expect_code":200, "expect_data":"七牛云"},{"method":"GET", "url":"https://www.logkit-pandora.com", "expect_code":200, "expect_data":"七牛云"}]`, + "http_datas": `[{"method":"GET", "url":"` + ts.URL + `", "expect_code":200, "expect_data":"七牛云"},{"method":"GET", "url":"https://www.logkit-pandora.com", "expect_code":200, "expect_data":"七牛云"}]`, }, }, } @@ -642,7 +648,7 @@ func metricHttpTest(p *testParam) { } assert.Equal(t, float64(200), result[0]["http__status_code_1"]) - assert.Equal(t, "https://www.qiniu.com", result[0]["http__target_1"]) + assert.Equal(t, ts.URL, result[0]["http__target_1"]) assert.Equal(t, float64(1), result[0]["http__err_state_1"]) assert.Equal(t, float64(-1), result[0]["http__status_code_2"]) assert.Equal(t, "https://www.logkit-pandora.com", result[0]["http__target_2"]) @@ -660,7 +666,7 @@ func metricHttpTest(p *testParam) { "http_err_msg": false, }, Config: map[string]interface{}{ - "http_datas": `[{"method":"GET", "url":"https://www.qiniu.com", "expect_code":200, "expect_data":"潘多拉"}]`, + "http_datas": `[{"method":"GET", "url":"` + ts.URL + `", "expect_code":200, "expect_data":"潘多拉"}]`, }, }, } @@ -715,7 +721,7 @@ func metricHttpTest(p *testParam) { } assert.Equal(t, float64(200), result[0]["http__status_code_1"]) - assert.Equal(t, "https://www.qiniu.com", result[0]["http__target_1"]) + assert.Equal(t, ts.URL, result[0]["http__target_1"]) assert.Equal(t, float64(0), result[0]["http__err_state_1"]) assert.Equal(t, float64(0), result[0]["http__err_state_total"]) assert.Equal(t, "don't contain: 潘多拉", result[0]["http__err_msg_total"]) diff --git a/parser/syslog/syslog_test.go b/parser/syslog/syslog_test.go index cc7f0fe7d..8c7b01cd0 100644 --- a/parser/syslog/syslog_test.go +++ b/parser/syslog/syslog_test.go @@ -88,6 +88,24 @@ func Test_SyslogParser(t *testing.T) { } } +func Test_SyslogParserError(t *testing.T) { + c := conf.MapConf{} + c[parser.KeyParserType] = "syslog" + p, err := NewParser(c) + line := "Test my syslog CRON[000]: (root) CMD" + lines := []string{ + line, + "!@#pandora-EOF-line#@!", + } + parsedData, err := p.Parse(lines) + st, ok := err.(*StatsError) + assert.True(t, ok) + assert.Equal(t, "No start char found for priority", st.LastError, st.LastError) + assert.Equal(t, int64(1), st.Errors) + assert.Equal(t, 1, len(parsedData)) + assert.Equal(t, line, parsedData[0]["pandora_stash"]) +} + func TestSyslogParser5424(t *testing.T) { fpas := &RFC5424{} pas := fpas.GetParser([]byte("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47\n- BOM'su root' failed for lonvick on /dev/pts/8")) diff --git a/reader/sql/sql_test.go b/reader/sql/sql_test.go index 7697496c3..7b29e4979 100644 --- a/reader/sql/sql_test.go +++ b/reader/sql/sql_test.go @@ -1498,7 +1498,7 @@ func TestMySql(t *testing.T) { mrHistoryAll2.Close() // test cron - minDataTestsLine, secondAdd3, err := setMinute() + minDataTestsLine, secondAdd3, err := setMinute(time.Now()) if err != nil { t.Errorf("prepare mysql database failed: %v", err) } @@ -1529,10 +1529,14 @@ func TestMySql(t *testing.T) { mrCron.SyncMeta() mrCron.Close() - minDataTestsLine, secondAdd3, err = setMinute() + now := time.Now() + minDataTestsLine, secondAdd3, err = setMinute(now) if err != nil { t.Errorf("prepare mysql database failed: %v", err) } + if now.Second() >= 57 { + minDataTestsLine++ + } // cron task, exec on start runnerName = "mrCronExecOnStart" mrCronExecOnStart, err := getMySqlReader(false, false, false, runnerName, CronInfo{true, secondAdd3, false}) @@ -1748,9 +1752,8 @@ func batchTimeout(before time.Time, interval float64) bool { return false } -func setMinute() (int, string, error) { +func setMinute(nowCron time.Time) (int, string, error) { var ( - nowCron = time.Now() secondAdd3 = getDateStr((nowCron.Second() + 3) % 60) minute = getDateStr(nowCron.Minute()) ) diff --git a/utils/models/utils.go b/utils/models/utils.go index 793a88743..094e6ec5f 100644 --- a/utils/models/utils.go +++ b/utils/models/utils.go @@ -864,6 +864,22 @@ func DeepConvertKeyWithCache(data map[string]interface{}, cache map[string]KeyIn return data } +func CheckErr(err error) error { + se, ok := err.(*StatsError) + var errorCnt int64 + if ok { + errorCnt = se.Errors + err = se.ErrorDetail + } else { + errorCnt = 1 + } + + if err != nil { + return fmt.Errorf("%v parse line errors occured, error %v ", errorCnt, err.Error()) + } + return nil +} + type KeyInfo struct { Valid bool NewKey string