Commit 29c839c

ch03: Streams basic improved. Parallel section improved added.
1 parent c8830dd commit 29c839c

7 files changed

+279
-17
lines changed

03-streams.md

Lines changed: 66 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -1,17 +1,21 @@
1-
## Streams
1+
Streams
2+
------
23

3-
On [day 1](http://shekhargulati.com/2015/07/25/day-1-lets-learn-about-lambdas/), we learnt how lambdas can help us write clean concise code by allowing us to pass behavior without the need to create a class. Lambdas is a very simple language construct that helps developer express their intent on the fly by using functional interfaces. The real power of lambdas can be experienced when an API is designed keeping lambdas in mind i.e. a fluent API that makes use of Functional interfaces(we discussed them on day 1).
4+
In [chapter 2](./02-lambdas.md), we learnt how lambdas help us write clean, concise code by allowing us to pass behavior without the need to create a class. Lambdas are a very simple language construct that helps developers express their intent on the fly by using functional interfaces. The real power of lambdas can be experienced when an API is designed with lambdas in mind, i.e. a fluent API that makes use of functional interfaces (we discussed them in the [lambdas chapter](./02-lambdas.md#do-i-need-to-write-my-own-functional-interfaces)).
45

5-
One such API that makes heavy use of lambdas is Stream API introduced in JDK 8. Streams provide a higher level abstraction to express computations on Java collections in a declarative way similar to how SQL helps you declaratively query data in the database. Declarative means developers write what they want to do rather than how it should be done. Almost every Java developer has used `Collection` API for storing, accessing, and manipulating data. In this blog, we will discuss why need a new API, difference between Collection and Stream, and how to use Stream API in your applications.
6+
One such API that makes heavy use of lambdas is the Stream API introduced in JDK 8. Streams provide a higher level abstraction to express computations on Java collections in a declarative way, similar to how SQL helps you declaratively query data in a database. Declarative means developers write what they want to do rather than how it should be done. In this chapter, we will discuss why we need a new data processing API, the difference between Collection and Stream, and how to use the Stream API in your applications.
7+
8+
> Code for this section is inside [ch03 package](https://github.com/shekhargulati/java8-the-missing-tutorial/tree/master/code/src/main/java/com/shekhargulati/java8_tutorial/ch03).
69
710
## Why we need a new data processing abstraction?
811

9-
1. Collection API is too low level: Collection API does not provide higher level constructs to query the data so developers are forced to write a lot of boilerplate code for the most trivial task.
12+
In my opinion, there are two reasons:
1013

11-
2. Limited language support to process Collections in parallel
14+
1. Collection API does not provide higher level constructs to query the data so developers are forced to write a lot of boilerplate code for the most trivial task.
1215

16+
2. It has limited language support to process Collection data in parallel. It is left to the developer to use Java language concurrency constructs and process data effectively and efficiently in parallel.
1317

14-
### Data processing before Java 8
18+
## Data processing before Java 8
1519

1620
Look at the code shown below and try to predict what the code does.
1721

@@ -42,25 +46,28 @@ public class Example1_Java7 {
4246

4347
The code shown above prints all the reading task titles sorted by their title length. All Java developers write this kind of code every day. To write such a simple program we had to write 15 lines of Java code. The bigger problem with this code is not the number of lines a developer has to write but that it hides the developer's intent, i.e. filtering reading tasks, sorting by title length, and transforming to a List of String.
4448

45-
### Data processing in Java 8
49+
## Data processing in Java 8
4650

47-
The above mentioned code can be simplified using Java 8 streams as shown below.
51+
The above mentioned code can be simplified using Java 8 streams API as shown below.
4852

4953
```java
5054
public class Example1_Stream {
55+
5156
public static void main(String[] args) {
5257
List<Task> tasks = getTasks();
58+
5359
List<String> readingTasks = tasks.stream()
5460
.filter(task -> task.getType() == TaskType.READING)
5561
.sorted((t1, t2) -> t1.getTitle().length() - t2.getTitle().length())
5662
.map(Task::getTitle)
5763
.collect(Collectors.toList());
64+
5865
readingTasks.forEach(System.out::println);
5966
}
6067
}
6168
```
6269

63-
The line `tasks.stream().filter(task ->task.getType() == TaskType.READING).sorted((t1, t2) -> t1.getTitle().length() - t2.getTitle().length()).map(Task::getTitle).collect(Collectors.toList())` constructs a stream pipeline composing of multiple stream operations as discussed below.
70+
The code shown above constructs a pipeline composed of multiple stream operations, as discussed below.
6471

6572
* **stream()** - You created a stream pipeline by invoking the `stream()` method on the source collection i.e. `tasks` `List<Task>`.
6673

@@ -76,22 +83,31 @@ The line `tasks.stream().filter(task ->task.getType() == TaskType.READING).sorte
7683

7784
In my opinion, the Java 8 code is better for the following reasons:
7885

79-
1. Java 8 code clearly reflect developer intent
80-
2. Developer expressed what they want to do rather than how they want do it
81-
3. Stream API provides a unified language for data processing
82-
4. No boilerplate code
86+
1. Java 8 code clearly reflects the developer's intent of filtering, sorting, etc.
87+
88+
2. Developers express what they want to do rather than how they want to do it by using higher level functions like filter, map, etc.
89+
90+
3. The Stream API provides a unified language for data processing. Developers now share a common vocabulary when they talk about data processing. When two developers talk about the `filter` function, you can be sure that they both mean a data filtering operation.
91+
92+
4. No boilerplate code is required to express data processing. Developers no longer have to write explicit for loops or create temporary collections to store data. All of that is taken care of by the Stream API itself.
8393

84-
### What is a Stream?
94+
## What is a Stream?
8595

86-
Stream is a sequence of elements where elements are computed on demand. Streams are lazy by nature and they are only computed when accessed. This allows us to produce infinite streams. In Java before version 8, there was no way to produce infinite elements. With Java 8, you can very easily write a Stream that will produce infinite unique identifiers as shown below.
96+
Stream is an abstract view over some data. For example, Stream can be a view over a list or lines in a file or any other sequence of elements. Stream API provides aggregate operations that can be performed sequentially or in parallel. Streams are lazy by nature and they are only computed when accessed. This allows us to produce infinite streams of data. In Java 8, you can very easily write a Stream that will produce infinite unique identifiers as shown below.
8797

8898
```
8999
public static void main(String[] args) {
90100
Stream<String> uuidStream = Stream.generate(() -> UUID.randomUUID().toString());
91101
}
92102
```
93103

94-
The code shown above will create a Stream that can produce infinite UUID's. If you run this program nothing will happen as Streams are lazy and until they are accessed nothing will be computed. If we update the program to the one shown below we will see UUID printing to the console. The program will never terminate.
104+
There are various static factory methods like `of`, `generate`, and `iterate` in the Stream interface that one can use to create Stream instances. The `generate` method shown above takes a `Supplier`. `Supplier` is a functional interface that describes a function that takes no input and produces a value. We passed the `generate` method a supplier that, when invoked, generates a unique identifier.
105+
106+
```java
107+
Supplier<String> uuids = () -> UUID.randomUUID().toString();
108+
```
109+
110+
If you run this program nothing will happen, as Streams are lazy and nothing is computed until they are accessed. If we update the program to the one shown below, we will see UUIDs printed to the console. The program will never terminate.
95111

96112
```java
97113
public static void main(String[] args) {
@@ -439,9 +455,42 @@ Arrays.stream(tags, 1, 3).map(String::toUpperCase).forEach(System.out::println);
439455

440456
### Parallel Streams
441457

442-
One advantage that you get by using Stream abstraction is that now library can effectively manage parallelism as iteration is internal. So, to process a stream in parallel it is as easy as shown below.
458+
One advantage that you get by using the Stream abstraction is that the library can now manage parallelism effectively, because iteration is internal. You can make a stream parallel by calling the `parallel` method on it. Underneath, `parallel` uses the fork-join API introduced in JDK 7. By default, it runs on the common fork-join pool, whose parallelism is one less than the number of available processors, and the calling thread also takes part in the work. In the code shown below, we group numbers by the thread that processed them. You will learn about the `collect` and `groupingBy` functions in chapter 4. For now, just understand that they allow you to group elements based on a key.
459+
460+
```java
461+
public class ParallelStreamExample {
462+
463+
public static void main(String[] args) {
464+
Map<String, List<Integer>> numbersPerThread = IntStream.rangeClosed(1, 160)
465+
.parallel()
466+
.boxed()
467+
.collect(groupingBy(i -> Thread.currentThread().getName()));
468+
469+
numbersPerThread.forEach((k, v) -> System.out.println(String.format("%s >> %s", k, v)));
470+
}
471+
}
472+
```
473+
474+
The output of the above program on my machine is shown below.
475+
476+
```
477+
ForkJoinPool.commonPool-worker-7 >> [46, 47, 48, 49, 50]
478+
ForkJoinPool.commonPool-worker-1 >> [41, 42, 43, 44, 45, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130]
479+
ForkJoinPool.commonPool-worker-2 >> [146, 147, 148, 149, 150]
480+
main >> [106, 107, 108, 109, 110]
481+
ForkJoinPool.commonPool-worker-5 >> [71, 72, 73, 74, 75]
482+
ForkJoinPool.commonPool-worker-6 >> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160]
483+
ForkJoinPool.commonPool-worker-3 >> [21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 76, 77, 78, 79, 80]
484+
ForkJoinPool.commonPool-worker-4 >> [91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145]
485+
```
486+
487+
Not every thread processes the same number of elements. You can control the size of the fork-join thread pool by setting a system property, for example `System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2")`. The property has to be set before the common pool is first used.
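
To see the effect of that property, here is a minimal sketch. The class name `CommonPoolParallelismSketch` and the helper `threadsUsed` are made up for illustration and are not part of the tutorial code. It assumes nothing else in the JVM has touched the common pool before `main` runs, because the pool reads the property only once, when it is created.

```java
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CommonPoolParallelismSketch {

    // Hypothetical helper: names of the threads that processed the range 1..upTo.
    static Set<String> threadsUsed(int upTo) {
        return IntStream.rangeClosed(1, upTo)
                .parallel()
                .mapToObj(i -> Thread.currentThread().getName())
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // Set the property before the common pool is first used;
        // changing it later has no effect on an already created pool.
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2");

        System.out.println("common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());

        // The calling thread may also take part, so "main" can show up here.
        System.out.println("threads used: " + threadsUsed(160));
    }
}
```

The same value can also be passed on the command line with `-Djava.util.concurrent.ForkJoinPool.common.parallelism=2`, which avoids depending on the order in which classes are initialized.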
488+
489+
Another example where you can use `parallel` operation is when you are processing a list of URLs as shown below.
443490

444491
```java
445492
String[] urls = {"https://www.google.co.in/", "https://twitter.com/", "http://www.facebook.com/"};
446493
Arrays.stream(urls).parallel().map(url -> getUrlContent(url)).forEach(System.out::println);
447494
```
495+
496+
If you want to understand when to use parallel streams, I recommend reading this article by Doug Lea and a few other Java folks: [http://gee.cs.oswego.edu/dl/html/StreamParallelGuidance.html](http://gee.cs.oswego.edu/dl/html/StreamParallelGuidance.html).
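
Going back to the lazy UUID stream from the "What is a Stream?" section, the sketch below is one way to observe the laziness directly. It counts how often the supplier is invoked and uses `limit` to bound the otherwise infinite stream. The class name `LazyStreamSketch`, the `CALLS` counter, and the `uuidStream` helper are assumptions introduced only for this illustration.

```java
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyStreamSketch {

    // Counts how many times the supplier has actually been invoked.
    static final AtomicInteger CALLS = new AtomicInteger();

    // Hypothetical helper: an infinite stream of UUID strings.
    static Stream<String> uuidStream() {
        return Stream.generate(() -> {
            CALLS.incrementAndGet();
            return UUID.randomUUID().toString();
        });
    }

    public static void main(String[] args) {
        Stream<String> uuids = uuidStream();
        // No terminal operation has run yet, so the supplier has not been called.
        System.out.println("supplier calls after creating the stream: " + CALLS.get());

        // limit(5) makes the pipeline finite; collect is the terminal operation
        // that finally pulls elements from the supplier.
        List<String> firstFive = uuids.limit(5).collect(Collectors.toList());
        System.out.println("supplier calls after collecting: " + CALLS.get());

        firstFive.forEach(System.out::println);
    }
}
```

Without `limit`, a terminal operation such as `forEach` on this stream would keep running forever, which is the behaviour the chapter describes.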
Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
1+
package com.shekhargulati.java8_tutorial.ch03;
2+
3+
4+
import com.shekhargulati.java8_tutorial.domain.Task;
5+
import com.shekhargulati.java8_tutorial.domain.TaskType;
6+
7+
import java.util.ArrayList;
8+
import java.util.Collections;
9+
import java.util.Comparator;
10+
import java.util.List;
11+
12+
import static com.shekhargulati.java8_tutorial.utils.DataUtils.getTasks;
13+
14+
15+
public class Example1_Java7 {
16+
17+
public static void main(String[] args) {
18+
List<Task> tasks = getTasks();
19+
List<Task> readingTasks = new ArrayList<>();
20+
for (Task task : tasks) {
21+
if (task.getType() == TaskType.READING) {
22+
readingTasks.add(task);
23+
}
24+
}
25+
Collections.sort(readingTasks, new Comparator<Task>() {
26+
@Override
27+
public int compare(Task t1, Task t2) {
28+
return t1.getTitle().length() - t2.getTitle().length();
29+
}
30+
});
31+
for (Task readingTask : readingTasks) {
32+
System.out.println(readingTask.getTitle());
33+
}
34+
}
35+
}
Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
1+
package com.shekhargulati.java8_tutorial.ch03;
2+
3+
4+
import com.shekhargulati.java8_tutorial.domain.Task;
5+
import com.shekhargulati.java8_tutorial.domain.TaskType;
6+
7+
import java.util.List;
8+
import java.util.stream.Collectors;
9+
10+
import static com.shekhargulati.java8_tutorial.utils.DataUtils.getTasks;
11+
12+
13+
public class Example1_Stream {
14+
15+
public static void main(String[] args) {
16+
List<Task> tasks = getTasks();
17+
18+
List<String> readingTasks = tasks.stream()
19+
.filter(task -> task.getType() == TaskType.READING)
20+
.sorted((t1, t2) -> t1.getTitle().length() - t2.getTitle().length())
21+
.map(Task::getTitle)
22+
.collect(Collectors.toList());
23+
24+
readingTasks.forEach(System.out::println);
25+
}
26+
}
Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
1+
package com.shekhargulati.java8_tutorial.ch03;
2+
3+
import java.util.List;
4+
import java.util.Map;
5+
import java.util.stream.IntStream;
6+
7+
import static java.util.stream.Collectors.groupingBy;
8+
9+
public class ParallelStreamExample {
10+
11+
public static void main(String[] args) {
12+
Map<String, List<Integer>> numbersPerThread = IntStream.rangeClosed(1, 160)
13+
.parallel()
14+
.boxed()
15+
.collect(groupingBy(i -> Thread.currentThread().getName()));
16+
17+
numbersPerThread.forEach((k, v) -> System.out.println(String.format("%s >> %s", k, v)));
18+
}
19+
}
Lines changed: 78 additions & 0 deletions
@@ -0,0 +1,78 @@
1+
package com.shekhargulati.java8_tutorial.domain;
2+
3+
import java.time.LocalDate;
4+
import java.util.Collections;
5+
import java.util.HashSet;
6+
import java.util.Objects;
7+
import java.util.Set;
8+
9+
public class Task {
10+
11+
private final String title;
12+
private final String description;
13+
private final TaskType type;
14+
private LocalDate createdOn;
15+
private Set<String> tags = new HashSet<>();
16+
17+
public Task(final String title, final TaskType type) {
18+
this(title, title, type, LocalDate.now());
19+
}
20+
21+
public Task(final String title, final TaskType type, final LocalDate createdOn) {
22+
this(title, title, type, createdOn);
23+
}
24+
25+
public Task(final String title, final String description, final TaskType type, final LocalDate createdOn) {
26+
this.title = title;
27+
this.description = description;
28+
this.type = type;
29+
this.createdOn = createdOn;
30+
}
31+
32+
public String getTitle() {
33+
return title;
34+
}
35+
36+
public String getDescription() {
37+
return description;
38+
}
39+
40+
public TaskType getType() {
41+
return type;
42+
}
43+
44+
public LocalDate getCreatedOn() {
45+
return createdOn;
46+
}
47+
48+
public Task addTag(String tag) {
49+
this.tags.add(tag);
50+
return this;
51+
}
52+
53+
public Set<String> getTags() {
54+
return Collections.unmodifiableSet(tags);
55+
}
56+
57+
@Override
58+
public String toString() {
59+
return "Task{" +
60+
"title='" + title + '\'' +
61+
", type=" + type +
62+
'}';
63+
}
64+
65+
@Override
66+
public boolean equals(Object o) {
67+
if (this == o) return true;
68+
if (o == null || getClass() != o.getClass()) return false;
69+
Task task = (Task) o;
70+
return Objects.equals(title, task.title) &&
71+
Objects.equals(type, task.type);
72+
}
73+
74+
@Override
75+
public int hashCode() {
76+
return Objects.hash(title, type);
77+
}
78+
}
Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
1+
package com.shekhargulati.java8_tutorial.domain;
2+
3+
public enum TaskType {
4+
5+
READING, CODING, BLOGGING
6+
}
Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
1+
package com.shekhargulati.java8_tutorial.utils;
2+
3+
4+
import com.shekhargulati.java8_tutorial.domain.Task;
5+
import com.shekhargulati.java8_tutorial.domain.TaskType;
6+
7+
import java.io.IOException;
8+
import java.nio.file.Files;
9+
import java.nio.file.Paths;
10+
import java.time.LocalDate;
11+
import java.time.Month;
12+
import java.util.List;
13+
import java.util.stream.IntStream;
14+
import java.util.stream.Stream;
15+
16+
import static java.util.stream.Collectors.toList;
17+
18+
19+
public class DataUtils {
20+
21+
public static Stream<String> lines() {
22+
return filePathToStream("src/main/resources/book.txt");
23+
}
24+
25+
public static Stream<String> negativeWords() {
26+
return filePathToStream("src/main/resources/negative-words.txt");
27+
}
28+
29+
public static Stream<String> filePathToStream(String path) {
30+
try {
31+
return Files.lines(Paths.get("training", path));
32+
} catch (IOException e) {
33+
throw new RuntimeException(e);
34+
}
35+
}
36+
37+
public static IntStream range(int start, int end) {
38+
return IntStream.rangeClosed(start, end);
39+
}
40+
41+
public static List<Task> getTasks() {
42+
Task task1 = new Task("Read Java 8 in action", TaskType.READING, LocalDate.of(2015, Month.SEPTEMBER, 20)).addTag("java").addTag("java8").addTag("books");
43+
Task task2 = new Task("Write factorial program in Haskell", TaskType.CODING, LocalDate.of(2015, Month.SEPTEMBER, 20)).addTag("program").addTag("haskell").addTag("functional");
44+
Task task3 = new Task("Read Effective Java", TaskType.READING, LocalDate.of(2015, Month.SEPTEMBER, 21)).addTag("java").addTag("books");
45+
Task task4 = new Task("Write a blog on Stream API", TaskType.BLOGGING, LocalDate.of(2015, Month.SEPTEMBER, 21)).addTag("writing").addTag("stream").addTag("java8");
46+
Task task5 = new Task("Write prime number program in Scala", TaskType.CODING, LocalDate.of(2015, Month.SEPTEMBER, 22)).addTag("scala").addTag("functional").addTag("program");
47+
return Stream.of(task1, task2, task3, task4, task5).collect(toList());
48+
}
49+
}
