Skip to content

Commit 9885e6f

Browse files
committed
Low Level Design repository
1 parent a01f44a commit 9885e6f

26 files changed

+760
-0
lines changed

.gitignore

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
### IntelliJ IDEA ###
2+
out/
3+
!**/src/main/**/out/
4+
!**/src/test/**/out/
5+
6+
### Eclipse ###
7+
.apt_generated
8+
.classpath
9+
.factorypath
10+
.project
11+
.settings
12+
.springBeans
13+
.sts4-cache
14+
bin/
15+
!**/src/main/**/bin/
16+
!**/src/test/**/bin/
17+
18+
### NetBeans ###
19+
/nbproject/private/
20+
/nbbuild/
21+
/dist/
22+
/nbdist/
23+
/.nb-gradle/
24+
25+
### VS Code ###
26+
.vscode/
27+
28+
### Mac OS ###
29+
.DS_Store

.idea/.gitignore

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/misc.xml

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/modules.xml

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/vcs.xml

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

LowLevelDesign.iml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<module type="JAVA_MODULE" version="4">
3+
<component name="NewModuleRootManager" inherit-compiler-output="true">
4+
<exclude-output />
5+
<content url="file://$MODULE_DIR$">
6+
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
7+
</content>
8+
<orderEntry type="inheritedJdk" />
9+
<orderEntry type="sourceFolder" forTests="false" />
10+
</component>
11+
</module>

src/lld/kafka/MainApplication.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package lld.kafka;
2+
3+
import lld.kafka.model.Consumer;
4+
import lld.kafka.model.ConsumerGroup;
5+
import lld.kafka.model.Producer;
6+
import lld.kafka.model.Topic;
7+
import lld.kafka.service.ConsumerService;
8+
import lld.kafka.service.TopicService;
9+
import lld.kafka.service.impl.ConsumerServiceImpl;
10+
import lld.kafka.service.impl.TopicServiceImpl;
11+
import lld.kafka.worker.ConsumerWorker;
12+
import lld.kafka.worker.ProducerWorker;
13+
14+
import java.util.concurrent.ExecutorService;
15+
import java.util.concurrent.Executors;
16+
17+
public class MainApplication {
18+
19+
private static void test1() {
20+
TopicService topicService = new TopicServiceImpl();
21+
Topic topic = topicService.createTopic("my-topic", 3);
22+
ExecutorService executorService = Executors.newFixedThreadPool(10);
23+
24+
Producer producer = new Producer();
25+
executorService.execute(new ProducerWorker(topic, producer, topicService));
26+
27+
ConsumerGroup consumerGroup = new ConsumerGroup();
28+
ConsumerService consumerService = new ConsumerServiceImpl();
29+
30+
Consumer consumer1 = new Consumer(consumerGroup);
31+
consumerService.subscribe(topic, consumer1);
32+
executorService.execute(new ConsumerWorker(consumer1, consumerService));
33+
34+
Consumer consumer2 = new Consumer(consumerGroup);
35+
consumerService.subscribe(topic, consumer2);
36+
executorService.execute(new ConsumerWorker(consumer2, consumerService));
37+
38+
Consumer consumer3 = new Consumer(consumerGroup);
39+
consumerService.subscribe(topic, consumer3);
40+
executorService.execute(new ConsumerWorker(consumer3, consumerService));
41+
42+
// Consumer 4 should remain idle as we have only 3 partitions in the topic
43+
Consumer consumer4 = new Consumer(consumerGroup);
44+
consumerService.subscribe(topic, consumer4);
45+
executorService.execute(new ConsumerWorker(consumer4, consumerService));
46+
47+
executorService.shutdown();
48+
}
49+
50+
public static void main(String[] args) {
51+
test1();
52+
}
53+
}

src/lld/kafka/README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<b> Design a message broker which has the following properties: </b>
2+
3+
1. Producers should be able to produce messages.
4+
2. Producers can be divided into multiple partitions.
5+
3. There can be multiple consumers in a consumer group.
6+
4. A single consumer can consume from multiple topics.
7+
5. In a consumer group, at a time, a consumer can consume from a single partition. In a consumer group, a
8+
6. single consumer always maps to a single partition.
9+
7. If possible, messages should be persisted.

src/lld/kafka/model/Consumer.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package lld.kafka.model;
2+
3+
import java.util.UUID;
4+
5+
public class Consumer {
6+
String id;
7+
ConsumerGroup consumerGroup;
8+
9+
public Consumer(ConsumerGroup consumerGroup) {
10+
this.id = UUID.randomUUID().toString();
11+
this.consumerGroup = consumerGroup;
12+
this.consumerGroup.addConsumer(this);
13+
}
14+
15+
public ConsumerGroup getConsumerGroup() {
16+
return this.consumerGroup;
17+
}
18+
19+
@Override
20+
public int hashCode() {
21+
return id.hashCode();
22+
}
23+
24+
@Override
25+
public boolean equals(Object o) {
26+
if (o == null || this.getClass() != o.getClass()) {
27+
return false;
28+
}
29+
30+
if (o == this) {
31+
return true;
32+
}
33+
34+
Consumer other = (Consumer) o;
35+
return this.id.equals(other.id);
36+
}
37+
38+
@Override
39+
public String toString() {
40+
return this.id;
41+
}
42+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package lld.kafka.model;
2+
3+
import java.util.*;
4+
import java.util.concurrent.locks.Lock;
5+
import java.util.concurrent.locks.ReentrantLock;
6+
7+
public class ConsumerGroup {
8+
private String groupId;
9+
private Set<Consumer> consumers;
10+
private Map<Partition, Integer> currentOffset;
11+
private Map<Partition, Integer> committedOffset; // Can be used to contain the committed offsets just for crash recovery purpose
12+
private Map<Consumer, Partition> consumerPartitionMap;
13+
14+
private static ConsumerGroup defaultInstance;
15+
16+
private Lock lock;
17+
18+
public static ConsumerGroup getDefaultInstance() {
19+
if (defaultInstance == null) {
20+
defaultInstance = new ConsumerGroup();
21+
defaultInstance.groupId = "default-consumer-group";
22+
}
23+
24+
return defaultInstance;
25+
}
26+
27+
public ConsumerGroup() {
28+
this.groupId = UUID.randomUUID().toString();
29+
this.consumers = new HashSet<>();
30+
this.currentOffset = new HashMap<>();
31+
this.committedOffset = new HashMap<>();
32+
this.consumerPartitionMap = new HashMap<>();
33+
this.lock = new ReentrantLock();
34+
}
35+
36+
public void addConsumer(Consumer consumer) {
37+
Objects.requireNonNull(consumer);
38+
this.consumers.add(consumer); // Irrespective of partition assignment, add the consumer to the group
39+
}
40+
41+
public int getConsumerCount() {
42+
return this.consumers.size();
43+
}
44+
45+
public boolean isPartitionAllocated(Partition partition) {
46+
return new HashSet<>(this.consumerPartitionMap.values()).contains(partition);
47+
}
48+
49+
public Partition allocatePartition(Partition partition, Consumer consumer) {
50+
this.currentOffset.put(partition, -1);
51+
this.consumerPartitionMap.put(consumer, partition);
52+
return partition;
53+
}
54+
55+
public Message getMessageForConsumer(Consumer consumer) {
56+
Message message = null;
57+
Partition partition = this.consumerPartitionMap.get(consumer);
58+
if (partition == null) {
59+
return message;
60+
}
61+
62+
try {
63+
lock.lock(); // Check if we need this lock really as in our design, a single consumer is always
64+
// mapped to a single partition. So, there is no chance of overriding partition data by multiple threads
65+
66+
int lastOffset = this.currentOffset.get(partition);
67+
int nextOffset = lastOffset + 1;
68+
69+
message = partition.getMessageAtOffset(nextOffset);
70+
if (message == null) {
71+
// no more message
72+
return null;
73+
}
74+
75+
this.currentOffset.put(partition, nextOffset);
76+
} finally {
77+
lock.unlock();
78+
}
79+
80+
return message;
81+
}
82+
83+
@Override
84+
public int hashCode() {
85+
return groupId.hashCode();
86+
}
87+
88+
@Override
89+
public boolean equals(Object o) {
90+
if (o == null || this.getClass() != o.getClass()) {
91+
return false;
92+
}
93+
94+
if (o == this) {
95+
return true;
96+
}
97+
98+
ConsumerGroup other = (ConsumerGroup) o;
99+
return this.groupId.equals(other.groupId);
100+
}
101+
}

0 commit comments

Comments
 (0)