Skip to content

Commit 47f6013

Browse files
Zikenkeon
authored and committed
Add huffman coding (encode and decode file), add tests for huffman coding (keon#500)
1 parent 4d65694 commit 47f6013

File tree

4 files changed

+368
-0
lines changed

4 files changed

+368
-0
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ If you want to uninstall algorithms, it is as simple as:
113113
- [binary_gap](algorithms/bit/binary_gap.py)
114114
- [calculator](algorithms/calculator)
115115
- [math_parser](algorithms/calculator/math_parser.py)
116+
- [compression](algorithms/compression)
117+
- [huffman_coding](algorithms/compression/huffman_coding.py)
116118
- [dfs](algorithms/dfs)
117119
- [all_factors](algorithms/dfs/all_factors.py)
118120
- [count_islands](algorithms/dfs/count_islands.py)

algorithms/compression/__init__.py

Whitespace-only changes.
Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
"""
2+
Huffman coding is an efficient method of compressing data without losing information.
3+
This algorithm analyzes the symbols that appear in a message.
4+
Symbols that appear more often will be encoded as a shorter-bit string
5+
while symbols that aren't used as much will be encoded as longer strings.
6+
"""
7+
8+
from collections import defaultdict, deque
9+
import heapq
10+
11+
12+
class Node:
    """Node of a Huffman tree.

    A leaf stores the value it represents in ``sign``; a node created without
    a sign keeps ``sign`` as ``None``. ``left`` and ``right`` link to the
    child nodes (``None`` when absent).

    All rich comparisons look at ``frequency`` only, so two nodes with
    different signs but equal frequencies compare equal. Comparing against an
    object that has no ``frequency`` attribute yields ``NotImplemented``
    instead of raising ``AttributeError``, which makes ``node == None``
    evaluate to ``False`` and ``node < 1`` raise a regular ``TypeError``.
    """

    # Defining __eq__ already makes instances unhashable; spelling it out
    # documents that nodes must not be used as dict keys or set members
    # (their equality depends on a mutable field).
    __hash__ = None

    def __init__(self, frequency=0, sign=None, left=None, right=None):
        """Create a node.

        :param frequency: weight used for ordering nodes
        :param sign: value stored in the node, ``None`` when there is none
        :param left: left child or ``None``
        :param right: right child or ``None``
        """
        self.frequency = frequency
        self.sign = sign
        self.left = left
        self.right = right

    def __lt__(self, other):
        try:
            other_frequency = other.frequency
        except AttributeError:
            return NotImplemented
        return self.frequency < other_frequency

    def __gt__(self, other):
        try:
            other_frequency = other.frequency
        except AttributeError:
            return NotImplemented
        return self.frequency > other_frequency

    def __eq__(self, other):
        try:
            other_frequency = other.frequency
        except AttributeError:
            return NotImplemented
        return self.frequency == other_frequency

    def __repr__(self):
        return "<ch: {0}: {1}>".format(self.sign, self.frequency)

    def __str__(self):
        # Same text as repr(); kept as an explicit method like the original.
        return repr(self)
33+
34+
35+
class HuffmanReader:
    """Bit-level reader over a binary file object.

    Bits are held in ``self.buffer`` as a list of ``"0"``/``"1"`` characters
    and are refilled from ``self.file`` one byte at a time.

    End-of-file protocol: ``get_bit`` returns ``-1`` as soon as a refill is
    needed and the file has no more bytes, *without* consuming what is still
    buffered. With the default ``buff_limit`` of 8 this leaves the final
    bits of the file in ``self.buffer`` for the caller to post-process.
    """

    def __init__(self, file):
        """
        :param file: binary file object opened for reading
        """
        self.file = file
        self.buffer = []
        # Not read or updated anywhere in this class; kept so existing
        # attribute access keeps working.
        self.is_last_byte = False

    def get_number_of_additional_bits_in_the_last_byte(self) -> int:
        """
        Read the next three bits and interpret them as an unsigned integer

        :return: value of the three bits (0-7)
        :raises ValueError: if the input ends before three bits were read
        """
        bits = [self.get_bit() for _ in range(3)]
        if -1 in bits:
            # Previously this surfaced as an unrelated TypeError.
            raise ValueError("Unexpected end of file while reading the header bits")
        return int("".join(bits), 2)

    def load_tree(self) -> "Node":
        """
        Load tree from file

        A "0" bit turns the current node into an internal node with two new
        children and descends to the left one; any other bit marks the
        current node as a leaf and continues with the most recently postponed
        right child. Reading stops when a leaf is reached and no right child
        is pending. The leaf values are then read with ``_fill_tree``.

        :return: root of the loaded tree
        :raises ValueError: if the input ends before the tree is complete
        """
        pending_right = deque()
        leaves = deque()
        root = Node()

        current = root
        while True:
            bit = self.get_bit()
            if bit == -1:
                raise ValueError("Unexpected end of file while reading the tree")

            if bit == "0":
                current.left = Node()
                current.right = Node()
                pending_right.append(current.right)  # going left, right is handled later
                current = current.left
                continue

            leaves.append(current)
            if not pending_right:
                break
            current = pending_right.pop()

        self._fill_tree(leaves)

        return root

    def _fill_tree(self, leaves_queue):
        """
        Load values to tree after reading tree

        One byte is read per leaf, in the order the leaves were queued, and
        stored in the leaf's ``sign``. The queue is emptied.

        :param leaves_queue: leaves in the order they were discovered
        :return:
        :raises ValueError: if the input ends before every leaf got a value
        """
        for leaf in list(leaves_queue):
            byte = self.get_byte()
            if byte == -1:
                raise ValueError("Unexpected end of file while reading the leaf values")
            leaf.sign = int(byte, 2)
        leaves_queue.clear()

    def _load_byte(self, buff_limit=8) -> bool:
        """
        Load the next byte if the buffer holds no more than buff_limit bits

        :param buff_limit: refill threshold, in bits
        :return: False if a refill was needed but the file is exhausted,
                 True otherwise
        """
        if len(self.buffer) <= buff_limit:
            byte = self.file.read(1)

            if not byte:
                return False

            self.buffer.extend("{0:08b}".format(byte[0]))

        return True

    def get_bit(self, buff_limit=8):
        """
        Take the next bit from the buffer

        :param buff_limit: refill threshold passed to ``_load_byte``
        :return: "0" or "1", or -1 when a refill was needed and the file is
                 exhausted (bits still buffered are left untouched)
        """
        if not self._load_byte(buff_limit):
            return -1
        return self.buffer.pop(0)

    def get_byte(self):
        """
        Take the next eight bits from the buffer

        :return: string of eight "0"/"1" characters, or -1 when fewer than
                 eight bits are available
        """
        self._load_byte()
        # Decide on what is actually buffered: a full byte may still be
        # available even though the refill attempt hit the end of the file.
        if len(self.buffer) < 8:
            return -1

        byte_bits = self.buffer[:8]
        self.buffer = self.buffer[8:]
        return "".join(byte_bits)
119+
120+
121+
class HuffmanWriter:
    """Bit-level writer over a binary file object.

    Bits are passed around as strings of ``"0"``/``"1"`` characters. Complete
    bytes are written to ``self.file`` immediately; the remaining (fewer than
    eight) bits wait in ``self.buffer``. ``close`` pads the last byte and
    stores the amount of padding in the first three bits of the file, so the
    file object must be seekable and readable as well as writable.
    """

    def __init__(self, file):
        """
        :param file: binary file object opened for reading and writing
        """
        self.file = file
        self.buffer = ""
        self.saved_bits = 0

    def write_char(self, char):
        """
        Write the code point of a single character as eight bits

        :param char: one-character string
        :return:
        """
        self.write_int(ord(char))

    def write_int(self, num):
        """
        Write an integer as binary, zero-padded to at least eight bits

        :param num: integer to write
        :return:
        """
        self.write_bits("{0:08b}".format(num))

    def write_bits(self, bits):
        """
        Append bits and flush every complete byte to the file

        :param bits: string of "0"/"1" characters
        :return:
        """
        self.saved_bits += len(bits)

        pending = self.buffer + bits
        whole = len(pending) - len(pending) % 8
        if whole:
            self.file.write(bytes(int(pending[i:i + 8], 2) for i in range(0, whole, 8)))
        self.buffer = pending[whole:]

    def save_tree(self, tree):
        """
        Generate and save tree code to file

        The tree is walked in pre-order: "0" is emitted before descending
        into a left child and "1" before descending into a right child. A
        final "1" marks the end of the tree. The signs met during the walk
        are written afterwards, in the same order, with ``write_int``.

        :param tree: root node exposing ``sign``, ``left`` and ``right``
        :return:
        """
        signs = []
        shape = []

        def walk(node):
            if node.sign is not None:
                signs.append(node.sign)
            if node.left is not None:
                shape.append("0")
                walk(node.left)
            if node.right is not None:
                shape.append("1")
                walk(node.right)

        walk(tree)
        # "1" indicates that tree ended (it will be needed to load the tree)
        self.write_bits("".join(shape) + "1")
        for sign in signs:
            self.write_int(sign)

    def _save_information_about_additional_bits(self, additional_bits: int):
        """
        Overwrite first three bits in the file

        :param additional_bits: number of bits that were appended to fill last byte
        :return:
        :raises ValueError: if additional_bits does not fit in three bits
        """
        if not 0 <= additional_bits <= 7:
            # A larger value needs more than three bits and would overwrite
            # the data that follows the header.
            raise ValueError("additional_bits must be in range 0-7")

        self.file.seek(0)
        first_byte_raw = self.file.read(1)
        self.file.seek(0)
        first_byte = "{0:08b}".format(int.from_bytes(first_byte_raw, "big"))
        # Keep the five low bits, replace the three high ones.
        header = "{0:03b}".format(additional_bits) + first_byte[3:]

        # Written directly so bits waiting in self.buffer cannot leak into
        # the header; the counter is still advanced as write_bits would do.
        self.saved_bits += len(header)
        self.file.write(bytes([int(header, 2)]))
        # Return to the end so a later write cannot clobber the data.
        self.file.seek(0, 2)

    def close(self):
        """
        Pad the last byte with "0" bits and record the amount of padding

        :return:
        """
        # "% 8" turns the byte-aligned case (empty buffer) into 0 padding
        # bits; the unreduced value 8 does not fit in the 3-bit header.
        additional_bits = (8 - len(self.buffer)) % 8
        if additional_bits:
            self.write_bits("0" * additional_bits)
        self._save_information_about_additional_bits(additional_bits)
190+
191+
192+
class TreeFinder:
    """
    Class to help find signs in tree

    The finder keeps a cursor (``current_node``) that starts at the root and
    moves one level per call to ``find``. When a node carrying a sign is
    reached, the sign is stored in ``found`` and the cursor returns to the
    root.
    """

    def __init__(self, tree):
        """
        :param tree: root node exposing ``sign``, ``left`` and ``right``
        """
        self.root = tree
        self.current_node = tree
        self.found = None

    def find(self, bit):
        """
        Find sign in tree

        :param bit: "0" moves to the left child, "1" to the right child; any
                    other value rewinds the finder, sets ``found`` to "" and
                    reports True
        :return: True if sign is found
        :raises ValueError: if the requested child does not exist
        """
        if bit == "0":
            next_node = self.current_node.left
        elif bit == "1":
            next_node = self.current_node.right
        else:
            self._reset()
            return True

        if next_node is None:
            # Previously failed with AttributeError on ``None.sign``; rewind
            # first so the finder remains usable after the error.
            self.current_node = self.root
            raise ValueError("Bit sequence does not match any code in the tree")

        self.current_node = next_node
        if next_node.sign is None:
            return False

        self._reset(next_node.sign)
        return True

    def _reset(self, found=""):
        """
        Store the result and move the cursor back to the root

        :param found: value to expose through ``found``
        :return:
        """
        self.found = found
        self.current_node = self.root
225+
226+
227+
class HuffmanCoding:
    """Encode a file with Huffman coding and decode it back.

    Layout of an encoded file, as written by ``encode_file``:

    1. three bits holding the number of padding bits in the last byte,
    2. the tree shape followed by the leaf values (``HuffmanWriter.save_tree``),
    3. the code of every input byte,
    4. "0" bits padding the last byte.

    An empty input is encoded as an empty file.
    """

    # Number of bytes requested per read while scanning the input.
    _CHUNK_SIZE = 65536

    def __init__(self):
        pass

    @staticmethod
    def decode_file(file_in_name, file_out_name):
        """
        Decode file_in_name (written by ``encode_file``) into file_out_name

        :param file_in_name: path of the encoded file
        :param file_out_name: path of the file to create
        :return:
        """
        with open(file_in_name, "rb") as file_in, open(file_out_name, "wb") as file_out:
            # An empty encoded file stands for an empty original: there is
            # no header or tree to read.
            if file_in.read(1):
                file_in.seek(0)
                reader = HuffmanReader(file_in)
                additional_bits = reader.get_number_of_additional_bits_in_the_last_byte()
                tree = reader.load_tree()

                HuffmanCoding._decode_and_write_signs_to_file(file_out, reader, tree, additional_bits)

        print("File decoded.")

    @staticmethod
    def _decode_and_write_signs_to_file(file, reader: "HuffmanReader", tree: "Node", additional_bits: int):
        """
        Decode the remaining bits of reader and write the signs to file

        :param file: binary file object opened for writing
        :param reader: reader positioned right after the tree
        :param tree: root of the Huffman tree
        :param additional_bits: number of padding bits at the end of the input
        :return:
        :raises ValueError: if the input ends in the middle of a code
        """
        tree_finder = TreeFinder(tree)

        while True:
            bit = reader.get_bit()
            if bit == -1:
                break

            # Read the rest of the code without refilling (buff_limit=0),
            # so a code may extend into the last byte.
            while not tree_finder.find(bit):
                bit = reader.get_bit(0)

            if tree_finder.found == "":
                # The finder was fed the end-of-file marker (-1) mid-code.
                raise ValueError("Unexpected end of file in the middle of a code")
            file.write(bytes([tree_finder.found]))

        # The reader leaves the final bits in its buffer; drop the padding.
        # An explicit end index is required: "[:-0]" would be an empty slice
        # and lose every remaining bit when there is no padding.
        tail = reader.buffer
        tail = tail[:max(0, len(tail) - additional_bits)]
        for bit in tail:
            if tree_finder.find(bit):
                file.write(bytes([tree_finder.found]))

    @staticmethod
    def encode_file(file_in_name, file_out_name):
        """
        Encode file_in_name into file_out_name

        :param file_in_name: path of the file to compress
        :param file_out_name: path of the file to create
        :return:
        """
        with open(file_in_name, "rb") as file_in, open(file_out_name, mode="wb+") as file_out:
            signs_frequency = HuffmanCoding._get_char_frequency(file_in)
            # Nothing to encode for an empty input: leave the output empty.
            if signs_frequency:
                file_in.seek(0)
                tree = HuffmanCoding._create_tree(signs_frequency)
                codes = HuffmanCoding._generate_codes(tree)

                writer = HuffmanWriter(file_out)
                writer.write_bits("000")  # leave space to save how many bits will be appended to fill the last byte
                writer.save_tree(tree)
                HuffmanCoding._encode_and_write_signs_to_file(file_in, writer, codes)
                writer.close()

        print("File encoded.")

    @staticmethod
    def _encode_and_write_signs_to_file(file, writer: "HuffmanWriter", codes: dict):
        """
        Write the code of every byte of file to writer

        :param file: binary file object positioned at the data to encode
        :param writer: object with a ``write_bits`` method
        :param codes: mapping of byte value to its code string
        :return:
        """
        while True:
            chunk = file.read(HuffmanCoding._CHUNK_SIZE)
            if not chunk:
                break
            writer.write_bits("".join(codes[byte] for byte in chunk))

    @staticmethod
    def _get_char_frequency(file) -> dict:
        """
        Count how many times each byte value occurs in file

        :param file: binary file object; it is read until the end
        :return: mapping of byte value (int) to number of occurrences
        """
        signs_frequency = defaultdict(int)
        while True:
            chunk = file.read(HuffmanCoding._CHUNK_SIZE)
            if not chunk:
                break
            for byte in chunk:
                signs_frequency[byte] += 1

        return signs_frequency

    @staticmethod
    def _generate_codes(tree: "Node") -> dict:
        """
        Build the code table of a tree

        :param tree: root of the Huffman tree
        :return: mapping of sign to its code string
        """
        codes = dict()
        HuffmanCoding._go_through_tree_and_create_codes(tree, "", codes)
        return codes

    @staticmethod
    def _create_tree(signs_frequency: dict) -> "Node":
        """
        Build the Huffman tree by repeatedly merging the two rarest nodes

        :param signs_frequency: mapping of byte value to number of occurrences
        :return: root of the tree
        :raises ValueError: if signs_frequency is empty
        """
        nodes = [Node(frequency=frequency, sign=char_int) for char_int, frequency in signs_frequency.items()]
        if not nodes:
            raise ValueError("Cannot build a Huffman tree without any sign")

        if len(nodes) == 1:
            # A lone leaf would get the empty code, so the encoded data
            # would contain no bits at all and the number of repetitions
            # would be lost. Pair it with an unused filler leaf so the only
            # real sign gets the one-bit code "0".
            only = nodes[0]
            filler = Node(frequency=0, sign=(only.sign + 1) % 256)
            return Node(frequency=only.frequency, left=only, right=filler)

        heapq.heapify(nodes)

        while len(nodes) > 1:
            left = heapq.heappop(nodes)
            right = heapq.heappop(nodes)
            new_node = Node(frequency=left.frequency + right.frequency, left=left, right=right)
            heapq.heappush(nodes, new_node)

        return nodes[0]  # root

    @staticmethod
    def _go_through_tree_and_create_codes(tree: "Node", code: str, dict_codes: dict):
        """
        Walk the tree and record the path to every node that carries a sign

        :param tree: node to start from
        :param code: path from the root to tree ("0" = left, "1" = right)
        :param dict_codes: mapping that receives sign -> code
        :return:
        """
        if tree.sign is not None:
            dict_codes[tree.sign] = code

        if tree.left is not None:
            HuffmanCoding._go_through_tree_and_create_codes(tree.left, code + "0", dict_codes)

        if tree.right is not None:
            HuffmanCoding._go_through_tree_and_create_codes(tree.right, code + "1", dict_codes)

tests/test_compression.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from algorithms.compression.huffman_coding import HuffmanCoding
2+
3+
import unittest
4+
5+
6+
class TestHuffmanCoding(unittest.TestCase):
    """Round-trip test: encoding and then decoding must restore the input.

    Every test works inside its own temporary directory, which is removed
    through ``addCleanup`` even when the test fails part-way. Nothing is left
    in the working directory, and a missing output file cannot turn a real
    failure into a ``FileNotFoundError`` during cleanup.
    """

    @classmethod
    def setUpClass(cls):
        # Base names only; setUp turns them into paths in a temp directory.
        cls.file_in_name = "huffman_coding_in.txt"
        cls.file_out_bin_name = "huffman_coding_out.bin"
        cls.file_out_name = "huffman_coding_out.txt"

    def setUp(self):
        import os
        import random
        import tempfile

        tmp_dir = tempfile.TemporaryDirectory()
        self.addCleanup(tmp_dir.cleanup)

        # Instance attributes shadow the class-level base names for this test.
        cls = type(self)
        self.file_in_name = os.path.join(tmp_dir.name, cls.file_in_name)
        self.file_out_bin_name = os.path.join(tmp_dir.name, cls.file_out_bin_name)
        self.file_out_name = os.path.join(tmp_dir.name, cls.file_out_name)

        # A private generator gives the same reproducible data as seeding
        # the module-level one, without touching global random state.
        rng = random.Random(1951)
        with open(self.file_in_name, "wb") as file_in:
            file_in.write(bytes(rng.randrange(0, 256) for _ in range(10000)))

    def test_huffman_coding(self):
        HuffmanCoding.encode_file(self.file_in_name, self.file_out_bin_name)
        HuffmanCoding.decode_file(self.file_out_bin_name, self.file_out_name)

        with open(self.file_in_name, "rb") as file_1, open(self.file_out_name, "rb") as file_2:
            content_1 = file_1.read()
            content_2 = file_2.read()

            self.assertEqual(content_1, content_2)
35+
36+
37+
# Run the tests when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()

0 commit comments

Comments
 (0)