Skip to content

Commit 8030593

Browse files
committed
[Kayrock] Add response parser & send describe groups request
With single group now only
1 parent e446074 commit 8030593

File tree

6 files changed

+185
-5
lines changed

6 files changed

+185
-5
lines changed

lib/kafka_ex/new/client.ex

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ defmodule KafkaEx.New.Client do
1414
alias KafkaEx.Config
1515
alias KafkaEx.NetworkClient
1616

17+
alias KafkaEx.New.Client.RequestBuilder
18+
alias KafkaEx.New.Client.ResponseParser
1719
alias KafkaEx.New.Structs.Broker
1820
alias KafkaEx.New.Structs.ClusterMetadata
1921
alias KafkaEx.New.Structs.NodeSelector
@@ -163,6 +165,17 @@ defmodule KafkaEx.New.Client do
163165
{:reply, {:ok, topic_metadata}, updated_state}
164166
end
165167

168+
def handle_call({:describe_groups, [consumer_group_name]}, _from, state) do
169+
if KafkaEx.valid_consumer_group?(consumer_group_name) do
170+
{response, updated_state} =
171+
describe_group_request(consumer_group_name, state)
172+
173+
{:reply, response, updated_state}
174+
else
175+
{:reply, {:error, :invalid_consumer_group}, state}
176+
end
177+
end
178+
166179
def handle_call({:kayrock_request, request, node_selector}, _from, state) do
167180
{response, updated_state} = kayrock_network_request(request, node_selector, state)
168181

@@ -245,6 +258,72 @@ defmodule KafkaEx.New.Client do
245258
end
246259
end
247260

261+
defp describe_group_request(consumer_group_name, state) do
262+
node_selector = NodeSelector.consumer_group(consumer_group_name)
263+
264+
[consumer_group_name]
265+
|> RequestBuilder.describe_groups_request(state)
266+
|> handle_describe_group_request(node_selector, state)
267+
end
268+
269+
defp handle_describe_group_request(
270+
_,
271+
_,
272+
_,
273+
retry_count \\ @retry_count,
274+
_last_error \\ nil
275+
)
276+
277+
defp handle_describe_group_request(_, _, state, 0, last_error) do
278+
{{:error, last_error}, state}
279+
end
280+
281+
defp handle_describe_group_request(
282+
request,
283+
node_selector,
284+
state,
285+
retry_count,
286+
_last_error
287+
) do
288+
case kayrock_network_request(request, node_selector, state) do
289+
{{:ok, response}, state_out} ->
290+
case ResponseParser.describe_groups_response(response) do
291+
{:ok, [consumer_group]} ->
292+
{{:ok, consumer_group}, state_out}
293+
294+
{:error, [error | _]} ->
295+
consumer_group = request.groups[0]
296+
297+
Logger.warn(
298+
"Unable to fetch consumer group metadata for #{consumer_group.group_id}"
299+
)
300+
301+
handle_describe_group_request(
302+
request,
303+
node_selector,
304+
state,
305+
retry_count - 1,
306+
error
307+
)
308+
end
309+
310+
{_, _state_out} ->
311+
consumer_group = request.groups[0]
312+
313+
Logger.warn(
314+
"Unable to fetch consumer group metadata for #{consumer_group.group_id}"
315+
)
316+
317+
handle_describe_group_request(
318+
request,
319+
node_selector,
320+
state,
321+
retry_count - 1,
322+
:unknown
323+
)
324+
end
325+
end
326+
248327
defp maybe_connect_broker(broker, state) do
249328
case Broker.connected?(broker) do
250329
true ->
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
defmodule KafkaEx.New.Client.ResponseParser do
2+
@moduledoc """
3+
This module is used to parse response from KafkaEx.New.Client.
4+
It's main decision point which protocol to use for parsing response
5+
"""
6+
alias KafkaEx.New.Structs.ConsumerGroup
7+
8+
@protocol Application.get_env(
9+
:kafka_ex,
10+
:protocol,
11+
KafkaEx.New.Protocols.KayrockProtocol
12+
)
13+
14+
@doc """
15+
Parses response for Describe Groups API
16+
"""
17+
@spec describe_groups_response(term) ::
18+
{:ok, [ConsumerGroup.t()]} | {:error, term}
19+
def describe_groups_response(response) do
20+
@protocol.parse_response(:describe_groups, response)
21+
end
22+
end

lib/kafka_ex/new/protocols/kayrock_protocol.ex

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,29 @@ defmodule KafkaEx.New.Protocols.KayrockProtocol do
44
Once Kafka Ex v1.0 is released, this module will be renamed to KayrockProtocol
55
and will become a separated package.
66
"""
7+
@behaviour KafkaEx.New.Client.Protocol
78

89
alias KafkaEx.New.Protocols.Kayrock, as: KayrockProtocol
910

11+
# -----------------------------------------------------------------------------
1012
@doc """
1113
Builds request for Describe Groups API
1214
"""
15+
@impl KafkaEx.New.Client.Protocol
1316
def build_request(:describe_groups, api_version, opts) do
1417
group_names = Keyword.fetch!(opts, :group_names)
1518

1619
api_version
1720
|> Kayrock.DescribeGroups.get_request_struct()
1821
|> KayrockProtocol.DescribeGroups.Request.build_request(group_names)
1922
end
23+
24+
# -----------------------------------------------------------------------------
25+
@doc """
26+
Parses response for Describe Groups API
27+
"""
28+
@impl KafkaEx.New.Client.Protocol
29+
def parse_response(:describe_groups, response) do
30+
KayrockProtocol.DescribeGroups.Response.parse_response(response)
31+
end
2032
end
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
defmodule KafkaEx.New.Client.Protocol do
2+
@moduledoc """
3+
This module is responsible for defining the behaviour of a protocol.
4+
"""
5+
# ------------------------------------------------------------------------------
6+
@type api_version :: non_neg_integer
7+
@type params :: Keyword.t()
8+
9+
# ------------------------------------------------------------------------------
10+
@callback build_request(:describe_groups, integer, params) :: term
11+
12+
# ------------------------------------------------------------------------------
13+
@type consumer_group :: KafkaEx.New.Structs.ConsumerGroup
14+
15+
@callback parse_response(:describe_groups, term) ::
16+
{:ok, [consumer_group]} | {:error, term}
17+
end

test/integration/new_client_test.exs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
defmodule KafkaEx.New.Client.Test do
22
use ExUnit.Case
3+
import TestHelper
34

45
alias KafkaEx.New.Client
56

@@ -22,6 +23,39 @@ defmodule KafkaEx.New.Client.Test do
2223
{:ok, %{client: pid}}
2324
end
2425

26+
describe "describe_groups/1" do
27+
setup do
28+
consumer_group = generate_random_string()
29+
topic = "new_client_implementation"
30+
31+
{:ok, %{consumer_group: consumer_group, topic: topic}}
32+
end
33+
34+
test "returns group metadata for single consumer group", %{
35+
consumer_group: consumer_group,
36+
topic: topic,
37+
client: client
38+
} do
39+
join_to_group(client, topic, consumer_group)
40+
41+
{:ok, group_metadata} =
42+
GenServer.call(client, {:describe_groups, [consumer_group]})
43+
44+
assert group_metadata.group_id == consumer_group
45+
assert group_metadata.protocol_type == "consumer"
46+
assert group_metadata.protocol == ""
47+
assert length(group_metadata.members) == 1
48+
end
49+
50+
test "returns dead when consumer group does not exist", %{client: client} do
51+
{:ok, group_metadata} =
52+
GenServer.call(client, {:describe_groups, ["non-existing-group"]})
53+
54+
assert group_metadata.group_id == "non-existing-group"
55+
assert group_metadata.state == "Dead"
56+
end
57+
end
58+
2559
test "update metadata", %{client: client} do
2660
{:ok, updated_metadata} = GenServer.call(client, :update_metadata)
2761
%ClusterMetadata{topics: topics} = updated_metadata
@@ -177,4 +211,16 @@ defmodule KafkaEx.New.Client.Test do
177211

178212
assert Process.alive?(client)
179213
end
214+
215+
# ------------------------------------------------------------------------------------------------
216+
defp join_to_group(client, topic, consumer_group) do
217+
request = %KafkaEx.Protocol.JoinGroup.Request{
218+
group_name: consumer_group,
219+
member_id: "",
220+
topics: [topic],
221+
session_timeout: 6000
222+
}
223+
224+
KafkaEx.join_group(request, worker_name: client, timeout: 10000)
225+
end
180226
end
Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
defmodule KafkaEx.New.Client.RequestBuilderTest do
22
use ExUnit.Case, async: true
3+
34
alias KafkaEx.New.Client.RequestBuilder
45

56
describe "describe_groups_request/2" do
67
test "returns request for DescribeGroups API" do
78
state = %KafkaEx.New.Client.State{api_versions: %{describe_groups: 1}}
89
group_names = ["group1", "group2"]
910

10-
assert RequestBuilder.describe_groups_request(group_names, state) ==
11-
{:ok,
12-
%KafkaEx.New.Protocols.DescribeGroups.Request{
13-
group_names: group_names
14-
}}
11+
expected_request = %Kayrock.DescribeGroups.V1.Request{
12+
group_ids: group_names
13+
}
14+
15+
{:ok, request} =
16+
RequestBuilder.describe_groups_request(group_names, state)
17+
18+
assert expected_request == request
1519
end
1620
end
1721
end

0 commit comments

Comments
 (0)