Skip to content

Commit d6a990d

Browse files
cun8cun8dianfu
authored andcommitted
[FLINK-26444][python] Move all window tests into test_window.py
This closes apache#18957.
1 parent eac5b97 commit d6a990d

File tree

1 file changed

+182
-0
lines changed

1 file changed

+182
-0
lines changed
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
from typing import Iterable
20+
21+
from pyflink.common.time import Time
22+
from pyflink.common.typeinfo import Types
23+
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
24+
from pyflink.datastream.data_stream import DataStream
25+
from pyflink.datastream.functions import (ProcessWindowFunction, WindowFunction)
26+
from pyflink.datastream.window import (TumblingEventTimeWindows,
27+
SlidingEventTimeWindows, EventTimeSessionWindows,
28+
CountSlidingWindowAssigner, SessionWindowTimeGapExtractor,
29+
CountWindow)
30+
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
31+
from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
32+
33+
34+
class WindowTests(PyFlinkStreamingTestCase):
35+
36+
def setUp(self) -> None:
37+
super(WindowTests, self).setUp()
38+
self.test_sink = DataStreamTestSinkFunction()
39+
40+
def tearDown(self) -> None:
41+
self.test_sink.clear()
42+
43+
def assert_equals_sorted(self, expected, actual):
44+
expected.sort()
45+
actual.sort()
46+
self.assertEqual(expected, actual)
47+
48+
def test_event_time_tumbling_window(self):
49+
data_stream = self.env.from_collection([
50+
('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 5), ('hi', 8), ('hi', 9),
51+
('hi', 15)],
52+
type_info=Types.TUPLE([Types.STRING(), Types.INT()])) # type: DataStream
53+
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
54+
.with_timestamp_assigner(SecondColumnTimestampAssigner())
55+
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
56+
.key_by(lambda x: x[0], key_type=Types.STRING()) \
57+
.window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
58+
.process(CountWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
59+
.add_sink(self.test_sink)
60+
61+
self.env.execute('test_event_time_tumbling_window')
62+
results = self.test_sink.get_results()
63+
expected = ['(hi,4)', '(hi,3)', '(hi,1)']
64+
self.assert_equals_sorted(expected, results)
65+
66+
def test_count_tumbling_window(self):
67+
data_stream = self.env.from_collection([
68+
(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'),
69+
(6, 'hello')],
70+
type_info=Types.TUPLE([Types.INT(), Types.STRING()])) # type: DataStream
71+
data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
72+
.count_window(3) \
73+
.apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
74+
.add_sink(self.test_sink)
75+
76+
self.env.execute('test_count_tumbling_window')
77+
results = self.test_sink.get_results()
78+
expected = ['(hi,9)', '(hello,12)']
79+
self.assert_equals_sorted(expected, results)
80+
81+
def test_event_time_sliding_window(self):
82+
data_stream = self.env.from_collection([
83+
('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 5), ('hi', 8), ('hi', 9),
84+
('hi', 15)],
85+
type_info=Types.TUPLE([Types.STRING(), Types.INT()])) # type: DataStream
86+
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
87+
.with_timestamp_assigner(SecondColumnTimestampAssigner())
88+
89+
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
90+
.key_by(lambda x: x[0], key_type=Types.STRING()) \
91+
.window(SlidingEventTimeWindows.of(Time.milliseconds(5), Time.milliseconds(2))) \
92+
.process(CountWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
93+
.add_sink(self.test_sink)
94+
95+
self.env.execute('test_event_time_sliding_window')
96+
results = self.test_sink.get_results()
97+
expected = ['(hi,2)', '(hi,4)', '(hi,4)', '(hi,3)', '(hi,2)', '(hi,2)', '(hi,1)', '(hi,1)']
98+
self.assert_equals_sorted(expected, results)
99+
100+
def test_count_sliding_window(self):
101+
data_stream = self.env.from_collection([
102+
(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')],
103+
type_info=Types.TUPLE([Types.INT(), Types.STRING()])) # type: DataStream
104+
data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
105+
.window(CountSlidingWindowAssigner(2, 1)) \
106+
.apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
107+
.add_sink(self.test_sink)
108+
109+
self.env.execute('test_count_sliding_window')
110+
results = self.test_sink.get_results()
111+
expected = ['(hello,6)', '(hi,8)', '(hi,4)', '(hello,10)']
112+
self.assert_equals_sorted(expected, results)
113+
114+
def test_event_time_session_window(self):
115+
data_stream = self.env.from_collection([
116+
('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 8), ('hi', 9), ('hi', 15)],
117+
type_info=Types.TUPLE([Types.STRING(), Types.INT()])) # type: DataStream
118+
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
119+
.with_timestamp_assigner(SecondColumnTimestampAssigner())
120+
121+
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
122+
.key_by(lambda x: x[0], key_type=Types.STRING()) \
123+
.window(EventTimeSessionWindows.with_gap(Time.milliseconds(5))) \
124+
.process(CountWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
125+
.add_sink(self.test_sink)
126+
127+
self.env.execute('test_event_time_session_window')
128+
results = self.test_sink.get_results()
129+
expected = ['(hi,1)', '(hi,6)']
130+
self.assert_equals_sorted(expected, results)
131+
132+
def test_event_time_dynamic_gap_session_window(self):
133+
self.env.set_parallelism(1)
134+
data_stream = self.env.from_collection([
135+
('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 9), ('hi', 9), ('hi', 15)],
136+
type_info=Types.TUPLE([Types.STRING(), Types.INT()])) # type: DataStream
137+
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
138+
.with_timestamp_assigner(SecondColumnTimestampAssigner())
139+
140+
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
141+
.key_by(lambda x: x[0], key_type=Types.STRING()) \
142+
.window(EventTimeSessionWindows.with_dynamic_gap(MySessionWindowTimeGapExtractor())) \
143+
.process(CountWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
144+
.add_sink(self.test_sink)
145+
146+
self.env.execute('test_event_time_dynamic_gap_session_window')
147+
results = self.test_sink.get_results()
148+
expected = ['(hi,3)', '(hi,4)']
149+
self.assert_equals_sorted(expected, results)
150+
151+
152+
class SecondColumnTimestampAssigner(TimestampAssigner):
153+
154+
def extract_timestamp(self, value, record_timestamp) -> int:
155+
return int(value[1])
156+
157+
158+
class MySessionWindowTimeGapExtractor(SessionWindowTimeGapExtractor):
159+
160+
def extract(self, element: tuple) -> int:
161+
return element[1]
162+
163+
164+
class SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):
165+
166+
def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):
167+
result = 0
168+
for i in inputs:
169+
result += i[0]
170+
return [(key, result)]
171+
172+
173+
class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, CountWindow]):
174+
175+
def process(self,
176+
key: str,
177+
content: ProcessWindowFunction.Context,
178+
elements: Iterable[tuple]) -> Iterable[tuple]:
179+
return [(key, len([e for e in elements]))]
180+
181+
def clear(self, context: ProcessWindowFunction.Context) -> None:
182+
pass

0 commit comments

Comments
 (0)