Skip to content

Multiple Consumer Threads Share a Single Instance of the @codec #210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
original-brownbear opened this issue Jul 13, 2017 · 1 comment
Closed
Assignees
Labels

Comments

@original-brownbear
Copy link
Contributor

The @codec is not threadsafe and we're one instance of it across threads here:

private
  def thread_runner(logstash_queue, consumer)
    Thread.new do
      begin
        unless @topics_pattern.nil?
          nooplistener = org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener.new
          pattern = java.util.regex.Pattern.compile(@topics_pattern)
          consumer.subscribe(pattern, nooplistener)
        else
          consumer.subscribe(topics);
        end
        while !stop?
          records = consumer.poll(poll_timeout_ms)
          for record in records do
            @codec.decode(record.value.to_s) do |event|

we must clone the @codec once per thread like we do in tcp input for example to avoid concurrency issues.

@original-brownbear original-brownbear self-assigned this Jul 13, 2017
original-brownbear added a commit to original-brownbear/logstash-input-kafka that referenced this issue Jul 13, 2017
original-brownbear added a commit to original-brownbear/logstash-input-kafka that referenced this issue Jul 13, 2017
@original-brownbear
Copy link
Contributor Author

fixed in #211

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
1 participant