Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.rdoc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ The plugin supports the following parameters:
This parameter is only applicable to structured (JSON) log streams.
Default: ''.

[exception] Optional set of configuration to modify tag of the *matched*
exception event
[remove_tag_prefix] Prefix to remove from the input tag
[remove_tag_suffix] Suffix to remove from the input tag
[add_tag_prefix] Prefix to add to the input tag
[add_tag_suffix] Suffix to add to the input tag

Example configuration:

<match **>
Expand All @@ -103,6 +110,12 @@ Example configuration:
message log
languages java, python
multiline_flush_interval 0.1

# optionally modify tag of the matched exception
<exception>
remove_tag_prefix app
add_tag_suffix crash
</exception>
</match>

== Extending language support
Expand Down
3 changes: 2 additions & 1 deletion lib/fluent/plugin/exception_detector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ def flush
output_record = @first_record
output_record[@message_field] = combined_message
end
@emit.call(@first_timestamp, output_record)

@emit.call(@first_timestamp, output_record, true)
end
@messages = []
@first_record = nil
Expand Down
43 changes: 37 additions & 6 deletions lib/fluent/plugin/out_detect_exceptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ class DetectExceptionsOutput < Output
desc 'Separate log streams by this field in the input JSON data.'
config_param :stream, :string, default: ''

config_section :exception, multi: false do
config_param :remove_tag_prefix, :string, default: nil
config_param :add_tag_prefix, :string, default: nil
config_param :remove_tag_suffix, :string, default: nil
config_param :add_tag_suffix, :string, default: nil
end

Fluent::Plugin.register_output('detect_exceptions', self)

def configure(conf)
Expand Down Expand Up @@ -90,18 +97,42 @@ def process_record(tag, time_sec, record)
log_id.push(record.fetch(@stream, '')) unless @stream.empty?
unless @accumulators.key?(log_id)
out_tag = tag.sub(/^#{Regexp.escape(@remove_tag_prefix)}\./, '')
@accumulators[log_id] =
Fluent::TraceAccumulator.new(@message, @languages,
max_lines: @max_lines,
max_bytes: @max_bytes) do |t, r|
router.emit(out_tag, t, r)
end
@accumulators[log_id] = Fluent::TraceAccumulator.new(
@message, @languages,
max_lines: @max_lines,
max_bytes: @max_bytes
) { |t, r, e = false| modify_and_emit(out_tag, t, r, e) }
end

@accumulators[log_id].push(time_sec, record)
end
end

def modify_and_emit(tag, t, r, is_exception)
if is_exception && [email protected]?

if @exception.remove_tag_suffix
s = '.' + @exception.remove_tag_suffix
l = s.length
tag = tag[0...-l] if tag.end_with?(s) && tag.length > l
end

if @exception.remove_tag_prefix
s = @exception.remove_tag_prefix + '.'
l = s.length
tag = tag[l..-1] if tag.start_with?(s) && tag.length > l
end

if @exception.add_tag_prefix
tag.prepend(@exception.add_tag_prefix + '.')
end

tag.concat('.' + @exception.add_tag_suffix) if @exception.add_tag_suffix
end

router.emit(tag, t, r)
end

def flush_buffers
synchronize do
@stop_check = true
Expand Down
22 changes: 22 additions & 0 deletions test/plugin/test_out_detect_exceptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -284,4 +284,26 @@ def test_separate_streams
make_logs(t, 'something else', stream: 'java')
assert_equal(expected, d.events)
end

def test_execption_tag_modification
cfg = <<-CFG
languages all

<exception>
remove_tag_prefix prefix
remove_tag_suffix suffix
add_tag_suffix new_suffix
add_tag_prefix new_prefix
</exception>
CFG

tag = 'prefix.test.suffix'

d = create_driver(cfg, tag)
run_driver(d, ARBITRARY_TEXT, JAVA_EXC, ARBITRARY_TEXT)

expected = [tag, 'new_prefix.test.new_suffix', tag]

d.emits.map.with_index { |e, i| assert_equal(e[0], expected[i]) }
end
end