Skip to content

syslog 出错时没有返回错误数据,大部分出错时候pandora_stash记录的数据不准确,应该为上一条数 #642

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 2 additions & 20 deletions mgr/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,11 @@ func ParseData(parserConfig conf.MapConf) (parsedData []Data, err error) {
}

parsedData, err = logParser.Parse(sampleData)
err = checkErr(err, logParser.Name())
if err != nil {
return nil, err
return parsedData, CheckErr(err)
}

return
return parsedData, nil
}

func TransformData(transformerConfig map[string]interface{}) ([]Data, error) {
Expand Down Expand Up @@ -365,23 +364,6 @@ func checkSampleData(sampleData []string, logParser parser.Parser) ([]string, er
return sampleData, nil
}

func checkErr(err error, parserName string) error {
se, ok := err.(*StatsError)
var errorCnt int64
if ok {
errorCnt = se.Errors
err = se.ErrorDetail
} else if err != nil {
errorCnt = 1
}
if err != nil {
errMsg := fmt.Sprintf("parser %s, error %v ", parserName, err.Error())
err = fmt.Errorf("%v parse line errors occured, same as %v", errorCnt, errors.New(errMsg))
}

return err
}

func getTransformerCreator(transformerConfig map[string]interface{}) (transforms.Creator, error) {
transformKeyType, ok := transformerConfig[transforms.KeyType]
if !ok {
Expand Down
18 changes: 12 additions & 6 deletions mgr/metric_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"log"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -507,6 +508,11 @@ func metricHttpTest(p *testParam) {
t.Fatalf("mkdir test path error %v", err)
}
time.Sleep(1 * time.Second)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
io.WriteString(w, "<html><body>七牛云</body></html>")
}))
defer ts.Close()
mc := []MetricConfig{
{
MetricType: "http",
Expand All @@ -515,7 +521,7 @@ func metricHttpTest(p *testParam) {
"http_data": false,
},
Config: map[string]interface{}{
"http_datas": `[{"method":"GET", "url":"https://www.qiniu.com", "expect_code":200, "expect_data":"七牛云"}]`,
"http_datas": `[{"method":"GET", "url":"` + ts.URL + `", "expect_code":200, "expect_data":"七牛云"}]`,
},
},
}
Expand Down Expand Up @@ -572,7 +578,7 @@ func metricHttpTest(p *testParam) {

assert.Equal(t, len(httpAttr)+2, len(result[0]), string(str))
assert.Equal(t, float64(200), result[0]["http__status_code_1"])
assert.Equal(t, "https://www.qiniu.com", result[0]["http__target_1"])
assert.Equal(t, ts.URL, result[0]["http__target_1"])
assert.Equal(t, float64(1), result[0]["http__err_state_total"])
assert.Equal(t, "", result[0]["http__err_msg_total"])
assert.Equal(t, float64(1), result[0]["http__err_state_total"])
Expand All @@ -587,7 +593,7 @@ func metricHttpTest(p *testParam) {
"http_time_cost": false,
},
Config: map[string]interface{}{
"http_datas": `[{"method":"GET", "url":"https://www.qiniu.com", "expect_code":200, "expect_data":"七牛云"},{"method":"GET", "url":"https://www.logkit-pandora.com", "expect_code":200, "expect_data":"七牛云"}]`,
"http_datas": `[{"method":"GET", "url":"` + ts.URL + `", "expect_code":200, "expect_data":"七牛云"},{"method":"GET", "url":"https://www.logkit-pandora.com", "expect_code":200, "expect_data":"七牛云"}]`,
},
},
}
Expand Down Expand Up @@ -642,7 +648,7 @@ func metricHttpTest(p *testParam) {
}

assert.Equal(t, float64(200), result[0]["http__status_code_1"])
assert.Equal(t, "https://www.qiniu.com", result[0]["http__target_1"])
assert.Equal(t, ts.URL, result[0]["http__target_1"])
assert.Equal(t, float64(1), result[0]["http__err_state_1"])
assert.Equal(t, float64(-1), result[0]["http__status_code_2"])
assert.Equal(t, "https://www.logkit-pandora.com", result[0]["http__target_2"])
Expand All @@ -660,7 +666,7 @@ func metricHttpTest(p *testParam) {
"http_err_msg": false,
},
Config: map[string]interface{}{
"http_datas": `[{"method":"GET", "url":"https://www.qiniu.com", "expect_code":200, "expect_data":"潘多拉"}]`,
"http_datas": `[{"method":"GET", "url":"` + ts.URL + `", "expect_code":200, "expect_data":"潘多拉"}]`,
},
},
}
Expand Down Expand Up @@ -715,7 +721,7 @@ func metricHttpTest(p *testParam) {
}

assert.Equal(t, float64(200), result[0]["http__status_code_1"])
assert.Equal(t, "https://www.qiniu.com", result[0]["http__target_1"])
assert.Equal(t, ts.URL, result[0]["http__target_1"])
assert.Equal(t, float64(0), result[0]["http__err_state_1"])
assert.Equal(t, float64(0), result[0]["http__err_state_total"])
assert.Equal(t, "don't contain: 潘多拉", result[0]["http__err_msg_total"])
Expand Down
38 changes: 22 additions & 16 deletions parser/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package syslog

import (
"bytes"
"errors"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -150,11 +149,10 @@ func (p *SyslogParser) Parse(lines []string) ([]Data, error) {
se.AddErrors()
se.ErrorDetail = err
se.LastError = err.Error()
if !p.disableRecordErrData {
errData := make(Data)
errData[KeyPandoraStash] = line
datas = append(datas, errData)
} else {
if d != nil {
datas = append(datas, d)
}
if p.disableRecordErrData {
se.DatasourceSkipIndex = append(se.DatasourceSkipIndex, idx)
}
continue
Expand All @@ -176,24 +174,29 @@ func (p *SyslogParser) parse(line string) (data Data, err error) {
if p.buff.Len() > 0 {
if line == parser.PandoraParseFlushSignal {
return p.Flush()
} else if p.curline >= p.maxline || p.format.IsNewLine([]byte(line)) {
}

if p.curline >= p.maxline || p.format.IsNewLine([]byte(line)) {
data, err = p.Flush()
if err != nil {
return data, err
}
} else {
p.curline++
}
}
var serr error

if line != parser.PandoraParseFlushSignal {
_, serr = p.buff.Write([]byte(line))
}
if serr != nil {
_, err = p.buff.Write([]byte(line))
if err != nil {
err = errors.New(err.Error() + serr.Error())
} else {
err = serr
if !p.disableRecordErrData {
data = Data{KeyPandoraStash: string(p.buff.Bytes())}
}
return data, err
}
}
return

return data, nil
}

func (p *SyslogParser) Flush() (data Data, err error) {
Expand All @@ -206,10 +209,13 @@ func (p *SyslogParser) Flush() (data Data, err error) {
if p.curline == p.maxline {
err = fmt.Errorf("syslog meet max line %v, try to parse err %v, check if this is standard rfc3164/rfc5424 syslog", p.maxline, err)
}
if !p.disableRecordErrData {
data = Data{KeyPandoraStash: string(p.buff.Bytes())}
}
}
p.curline = 0
p.buff.Reset()
return
return data, err
}

type RFC6587 struct{}
Expand Down
18 changes: 18 additions & 0 deletions parser/syslog/syslog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,24 @@ func Test_SyslogParser(t *testing.T) {
}
}

func Test_SyslogParserError(t *testing.T) {
c := conf.MapConf{}
c[parser.KeyParserType] = "syslog"
p, err := NewParser(c)
line := "Test my syslog CRON[000]: (root) CMD"
lines := []string{
line,
"!@#pandora-EOF-line#@!",
}
parsedData, err := p.Parse(lines)
st, ok := err.(*StatsError)
assert.True(t, ok)
assert.Equal(t, "No start char found for priority", st.LastError, st.LastError)
assert.Equal(t, int64(1), st.Errors)
assert.Equal(t, 1, len(parsedData))
assert.Equal(t, line, parsedData[0]["pandora_stash"])
}

func TestSyslogParser5424(t *testing.T) {
fpas := &RFC5424{}
pas := fpas.GetParser([]byte("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47\n- BOM'su root' failed for lonvick on /dev/pts/8"))
Expand Down
11 changes: 7 additions & 4 deletions reader/sql/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1498,7 +1498,7 @@ func TestMySql(t *testing.T) {
mrHistoryAll2.Close()

// test cron
minDataTestsLine, secondAdd3, err := setMinute()
minDataTestsLine, secondAdd3, err := setMinute(time.Now())
if err != nil {
t.Errorf("prepare mysql database failed: %v", err)
}
Expand Down Expand Up @@ -1529,10 +1529,14 @@ func TestMySql(t *testing.T) {
mrCron.SyncMeta()
mrCron.Close()

minDataTestsLine, secondAdd3, err = setMinute()
now := time.Now()
minDataTestsLine, secondAdd3, err = setMinute(now)
if err != nil {
t.Errorf("prepare mysql database failed: %v", err)
}
if now.Second() >= 57 {
minDataTestsLine++
}
// cron task, exec on start
runnerName = "mrCronExecOnStart"
mrCronExecOnStart, err := getMySqlReader(false, false, false, runnerName, CronInfo{true, secondAdd3, false})
Expand Down Expand Up @@ -1748,9 +1752,8 @@ func batchTimeout(before time.Time, interval float64) bool {
return false
}

func setMinute() (int, string, error) {
func setMinute(nowCron time.Time) (int, string, error) {
var (
nowCron = time.Now()
secondAdd3 = getDateStr((nowCron.Second() + 3) % 60)
minute = getDateStr(nowCron.Minute())
)
Expand Down
16 changes: 16 additions & 0 deletions utils/models/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,22 @@ func DeepConvertKeyWithCache(data map[string]interface{}, cache map[string]KeyIn
return data
}

func CheckErr(err error) error {
se, ok := err.(*StatsError)
var errorCnt int64
if ok {
errorCnt = se.Errors
err = se.ErrorDetail
} else {
errorCnt = 1
}

if err != nil {
return fmt.Errorf("%v parse line errors occured, error %v ", errorCnt, err.Error())
}
return nil
}

type KeyInfo struct {
Valid bool
NewKey string
Expand Down