Skip to content

Commit c0d0a6a

Browse files
committed
3_2
1 parent a5a873d commit c0d0a6a

File tree

7 files changed

+417
-0
lines changed

7 files changed

+417
-0
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ example `1_2` assumes you are already familiar with `1_1`. Currently the followi
7979
2.3: Video pipeline with ImageCroppingCalculator (dynamic crop)
8080
2.4: Video pipeline with FeatureDetectorCalculator and custom image processing
8181

82+
3.1: Why MediaPipe is not real-time?
83+
3.2: Packet loss with FlowLimiterCalculator
8284

8385
Why Bazel?
8486
--------
@@ -102,5 +104,6 @@ Finally, is there any IDE for Bazel C++ projects? CLion plugin is advertized, bu
102104
VS code also has Bazel plugins, which did not work for me either. However, if I open the entire MP tree as a single
103105
VS Code project, VS Code can (mostly) find MP headers, and thus the "Show symbol definition" function works.
104106
To me this is the main reason to use IDE (as opposed to a text editor). So I edited the code in VS Code, and executed it from the terminal.
107+
Downside: VS code runs `cpptools` almost non-stop, taking computer resources.
105108

106109
I wonder what IDE do they use in Google?

first_steps/3_1/BUILD

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
cc_binary(
2+
name="3_1",
3+
srcs=["main.cpp", "slow_calculator.cpp"],
4+
deps=[
5+
"//mediapipe/framework:calculator_framework",
6+
"//mediapipe/framework/formats:image_frame",
7+
"//mediapipe/framework/formats:image_frame_opencv",
8+
"//mediapipe/framework/port:opencv_highgui",
9+
"//mediapipe/framework/port:opencv_imgproc",
10+
"//mediapipe/framework/port:parse_text_proto",
11+
"//mediapipe/framework/port:status",
12+
],
13+
)

first_steps/3_1/main.cpp

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/// Example 3.1 : Why MediaPipe is not real-time?
2+
/// By Oleksiy Grechnyev, IT-JIM
3+
/// Here we use SlowCalculator which simulates some slow image-processing operation
4+
/// By default, MP is not real-time. It queues all received package until
5+
/// they are eventually processed by SlowCalculator.
6+
/// This is the idea at least. No packages are ever lost.
7+
/// Since input packets arrive in real-time, and their number is unlimited,
8+
/// it causes an ever-increased lag until the program fills all RAM and crashes.
9+
10+
#include <iostream>
11+
#include <string>
12+
#include <memory>
13+
#include <atomic>
14+
#include <mutex>
15+
#include <cmath>
16+
17+
#include "mediapipe/framework/calculator_framework.h"
18+
#include "mediapipe/framework/formats/image_frame.h"
19+
#include "mediapipe/framework/formats/image_frame_opencv.h"
20+
#include "mediapipe/framework/port/parse_text_proto.h"
21+
#include "mediapipe/framework/port/status.h"
22+
23+
#include "mediapipe/framework/port/opencv_highgui_inc.h"
24+
#include "mediapipe/framework/port/opencv_imgproc_inc.h"
25+
26+
//==============================================================================
27+
mediapipe::Status run() {
28+
using namespace std;
29+
using namespace mediapipe;
30+
31+
// A graph with SlowCalculator and nothing else
32+
string protoG = R"(
33+
input_stream: "in"
34+
output_stream: "out"
35+
node {
36+
calculator: "SlowCalculator"
37+
input_stream: "IMAGE:in"
38+
output_stream: "IMAGE:out"
39+
}
40+
)";
41+
42+
// Parse config and create graph
43+
CalculatorGraphConfig config;
44+
if (!ParseTextProto<mediapipe::CalculatorGraphConfig>(protoG, &config)) {
45+
return absl::InternalError("Cannot parse the graph config !");
46+
}
47+
CalculatorGraph graph;
48+
MP_RETURN_IF_ERROR(graph.Initialize(config));
49+
50+
// Mutex protecting imshow() and the stop flag
51+
mutex mutexImshow;
52+
atomic_bool flagStop(false);
53+
54+
// Add observer to "out", then start the graph
55+
// This callback displays the frame on the screen
56+
auto cb = [&mutexImshow, &flagStop](const Packet &packet)->Status{
57+
58+
// Get cv::Mat from the packet
59+
const ImageFrame & outputFrame = packet.Get<ImageFrame>();
60+
cv::Mat ofMat = formats::MatView(&outputFrame);
61+
cv::Mat frameOut;
62+
cvtColor(ofMat, frameOut, cv::COLOR_RGB2BGR);
63+
cout << packet.Timestamp() << ": RECEIVED VIDEO PACKET size = " << frameOut.size() << endl;
64+
65+
{
66+
lock_guard<mutex> lock(mutexImshow);
67+
// Display frame on screen and quit on ESC
68+
cv::imshow("frameOut", frameOut);
69+
if (27 == cv::waitKey(1)){
70+
cout << "It's time to QUIT !" << endl;
71+
flagStop = true;
72+
}
73+
}
74+
return OkStatus();
75+
};
76+
MP_RETURN_IF_ERROR(graph.ObserveOutputStream("out", cb));
77+
graph.StartRun({});
78+
79+
// Start the camera and check that it works
80+
cv::VideoCapture cap(cv::CAP_ANY);
81+
if (!cap.isOpened())
82+
return absl::NotFoundError("CANNOT OPEN CAMERA !");
83+
cv::Mat frameIn, frameInRGB;
84+
85+
// Camera loop, runs until we get flagStop == true
86+
for (int i=0; !flagStop ; ++i){
87+
// Read next frame from camera
88+
cap.read(frameIn);
89+
if (frameIn.empty())
90+
return absl::NotFoundError("CANNOT OPEN CAMERA !");
91+
92+
cout << "SIZE_IN = " << frameIn.size() << endl;
93+
{
94+
lock_guard<mutex> lock(mutexImshow);
95+
cv::imshow("frameIn", frameIn);
96+
}
97+
98+
// Convert it to a packet and send
99+
cv::cvtColor(frameIn, frameInRGB, cv::COLOR_BGR2RGB);
100+
ImageFrame *inputFrame = new ImageFrame(
101+
ImageFormat::SRGB, frameInRGB.cols, frameInRGB.rows, ImageFrame::kDefaultAlignmentBoundary
102+
);
103+
frameInRGB.copyTo(formats::MatView(inputFrame));
104+
Timestamp ts(i);
105+
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream("in",
106+
Adopt(inputFrame).At(ts)
107+
));
108+
109+
}
110+
// Close the input streams, Wait for the graph to finish
111+
graph.CloseInputStream("in");
112+
MP_RETURN_IF_ERROR(graph.WaitUntilDone());
113+
return OkStatus();
114+
}
115+
116+
//==============================================================================
117+
int main(int argc, char** argv){
118+
using namespace std;
119+
120+
FLAGS_alsologtostderr = 1;
121+
google::SetLogDestination(google::GLOG_INFO, ".");
122+
google::InitGoogleLogging(argv[0]);
123+
124+
cout << "Example 3.1 : Why MediaPipe is not real-time? " << endl;
125+
mediapipe::Status status = run();
126+
cout << "status =" << status << endl;
127+
cout << "status.ok() = " << status.ok() << endl;
128+
return 0;
129+
}

first_steps/3_1/slow_calculator.cpp

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#include <chrono>
2+
#include <thread>
3+
4+
#include "mediapipe/framework/calculator_framework.h"
5+
#include "mediapipe/framework/formats/image_frame.h"
6+
#include "mediapipe/framework/formats/image_frame_opencv.h"
7+
#include "mediapipe/framework/port/status.h"
8+
9+
#include "mediapipe/framework/port/opencv_highgui_inc.h"
10+
#include "mediapipe/framework/port/opencv_imgproc_inc.h"
11+
12+
//==============================================================================
13+
namespace mediapipe {
14+
/// A custom image-processing calculator
15+
/// It applies photo-negative to the central 1/9 of the image
16+
/// The catch: we slow it down deliberately with a 0.2s delay (5 ~fps)
17+
/// To simulate the effect of a slow image-processing calcualtor
18+
class SlowCalculator : public CalculatorBase {
19+
public:
20+
static Status GetContract(CalculatorContract *cc) {
21+
using namespace std;
22+
// 1 input: image and keypoints
23+
cc->Inputs().Tag("IMAGE").Set<ImageFrame>();
24+
// 1 output: image with keypoints painted
25+
cc->Outputs().Tag("IMAGE").Set<ImageFrame>();
26+
return OkStatus();
27+
}
28+
29+
Status Process(CalculatorContext *cc) override {
30+
using namespace std;
31+
using namespace cv;
32+
// Get input packets
33+
Packet pIn = cc->Inputs().Tag("IMAGE").Value();
34+
35+
// Create a new ImageFrame by copying the one from from pIn, then modify the copy
36+
ImageFrame *iFrame = new ImageFrame();
37+
iFrame->CopyFrom(pIn.Get<ImageFrame>(), 1);
38+
Mat img = formats::MatView(iFrame);
39+
40+
// Apply photo negative to img central 1/9
41+
int nc = img.cols / 3, nr = img.rows / 3;
42+
Rect r(nc, nr, nc, nr);
43+
Mat m(img, r);
44+
bitwise_not(m, m);
45+
46+
// Slow down artificially: wait for 200 ms !
47+
this_thread::sleep_for(chrono::milliseconds(200));
48+
49+
// Create output packet from iFrame and send it
50+
Packet pOut = Adopt<ImageFrame>(iFrame).At(cc->InputTimestamp());
51+
cc->Outputs().Tag("IMAGE").AddPacket(pOut);
52+
return OkStatus();
53+
}
54+
};
55+
REGISTER_CALCULATOR(SlowCalculator);
56+
}
57+
//==============================================================================

first_steps/3_2/BUILD

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
cc_binary(
2+
name="3_2",
3+
srcs=["main.cpp", "slow_calculator.cpp"],
4+
deps=[
5+
"//mediapipe/framework:calculator_framework",
6+
"//mediapipe/calculators/core:flow_limiter_calculator",
7+
"//mediapipe/framework/formats:image_frame",
8+
"//mediapipe/framework/formats:image_frame_opencv",
9+
"//mediapipe/framework/port:opencv_highgui",
10+
"//mediapipe/framework/port:opencv_imgproc",
11+
"//mediapipe/framework/port:parse_text_proto",
12+
"//mediapipe/framework/port:status",
13+
],
14+
)

first_steps/3_2/main.cpp

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/// Example 3.2 : Packet loss with FlowLimiterCalculator
2+
/// By Oleksiy Grechnyev, IT-JIM
3+
/// In the previous example, we saw how a slow calculator breaks down a real-time pipeline.
4+
/// We need to lose some packets to prevent queue (buffer) ovverflow.
5+
/// This is a standard situation in real-time pipelines.
6+
/// Solution? FlowLimiterCalculator !
7+
/// It starts to lose packets if the buffer AFTER FlowLimiterCalculator fills up.
8+
/// Result: The output video has low FPS, but no ever-increasing lag, and
9+
/// does not fill all RAM
10+
11+
#include <iostream>
12+
#include <string>
13+
#include <memory>
14+
#include <atomic>
15+
#include <mutex>
16+
#include <cmath>
17+
18+
#include "mediapipe/framework/calculator_framework.h"
19+
#include "mediapipe/framework/formats/image_frame.h"
20+
#include "mediapipe/framework/formats/image_frame_opencv.h"
21+
#include "mediapipe/framework/port/parse_text_proto.h"
22+
#include "mediapipe/framework/port/status.h"
23+
24+
#include "mediapipe/framework/port/opencv_highgui_inc.h"
25+
#include "mediapipe/framework/port/opencv_imgproc_inc.h"
26+
27+
//==============================================================================
28+
mediapipe::Status run() {
29+
using namespace std;
30+
using namespace mediapipe;
31+
32+
// A graph with FlowLimiterCalculator followed by SlowCalculator
33+
// Note how FlowLimiterCalculator receives the output stream out, the final output
34+
// as a second input stream. A loopback!
35+
// It is used so that FlowLimiterCalculator can see the current buffer size by comparing
36+
// the timestamps of two input streams
37+
string protoG = R"(
38+
input_stream: "in"
39+
output_stream: "out"
40+
node {
41+
calculator: "FlowLimiterCalculator"
42+
input_stream: "in"
43+
input_stream: "FINISHED:out"
44+
input_stream_info: {
45+
tag_index: "FINISHED"
46+
back_edge: true
47+
}
48+
output_stream: "out1"
49+
}
50+
node {
51+
calculator: "SlowCalculator"
52+
input_stream: "IMAGE:out1"
53+
output_stream: "IMAGE:out"
54+
}
55+
)";
56+
57+
// Parse config and create graph
58+
CalculatorGraphConfig config;
59+
if (!ParseTextProto<mediapipe::CalculatorGraphConfig>(protoG, &config)) {
60+
return absl::InternalError("Cannot parse the graph config !");
61+
}
62+
CalculatorGraph graph;
63+
MP_RETURN_IF_ERROR(graph.Initialize(config));
64+
65+
// Mutex protecting imshow() and the stop flag
66+
mutex mutexImshow;
67+
atomic_bool flagStop(false);
68+
69+
// Add observer to "out", then start the graph
70+
// This callback displays the frame on the screen
71+
auto cb = [&mutexImshow, &flagStop](const Packet &packet)->Status{
72+
73+
// Get cv::Mat from the packet
74+
const ImageFrame & outputFrame = packet.Get<ImageFrame>();
75+
cv::Mat ofMat = formats::MatView(&outputFrame);
76+
cv::Mat frameOut;
77+
cvtColor(ofMat, frameOut, cv::COLOR_RGB2BGR);
78+
cout << packet.Timestamp() << ": RECEIVED VIDEO PACKET size = " << frameOut.size() << endl;
79+
80+
{
81+
lock_guard<mutex> lock(mutexImshow);
82+
// Display frame on screen and quit on ESC
83+
cv::imshow("frameOut", frameOut);
84+
if (27 == cv::waitKey(1)){
85+
cout << "It's time to QUIT !" << endl;
86+
flagStop = true;
87+
}
88+
}
89+
return OkStatus();
90+
};
91+
MP_RETURN_IF_ERROR(graph.ObserveOutputStream("out", cb));
92+
graph.StartRun({});
93+
94+
// Start the camera and check that it works
95+
cv::VideoCapture cap(cv::CAP_ANY);
96+
if (!cap.isOpened())
97+
return absl::NotFoundError("CANNOT OPEN CAMERA !");
98+
cv::Mat frameIn, frameInRGB;
99+
100+
// Camera loop, runs until we get flagStop == true
101+
for (int i=0; !flagStop ; ++i){
102+
// Read next frame from camera
103+
cap.read(frameIn);
104+
if (frameIn.empty())
105+
return absl::NotFoundError("CANNOT OPEN CAMERA !");
106+
107+
cout << "SIZE_IN = " << frameIn.size() << endl;
108+
{
109+
lock_guard<mutex> lock(mutexImshow);
110+
cv::imshow("frameIn", frameIn);
111+
}
112+
113+
// Convert it to a packet and send
114+
cv::cvtColor(frameIn, frameInRGB, cv::COLOR_BGR2RGB);
115+
ImageFrame *inputFrame = new ImageFrame(
116+
ImageFormat::SRGB, frameInRGB.cols, frameInRGB.rows, ImageFrame::kDefaultAlignmentBoundary
117+
);
118+
frameInRGB.copyTo(formats::MatView(inputFrame));
119+
Timestamp ts(i);
120+
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream("in",
121+
Adopt(inputFrame).At(ts)
122+
));
123+
124+
}
125+
// Close the input streams, Wait for the graph to finish
126+
graph.CloseInputStream("in");
127+
MP_RETURN_IF_ERROR(graph.WaitUntilDone());
128+
return OkStatus();
129+
}
130+
131+
//==============================================================================
132+
int main(int argc, char** argv){
133+
using namespace std;
134+
135+
FLAGS_alsologtostderr = 1;
136+
google::SetLogDestination(google::GLOG_INFO, ".");
137+
google::InitGoogleLogging(argv[0]);
138+
139+
cout << "Example 3.2 : Packet loss with FlowLimiterCalculator " << endl;
140+
mediapipe::Status status = run();
141+
cout << "status =" << status << endl;
142+
cout << "status.ok() = " << status.ok() << endl;
143+
return 0;
144+
}

0 commit comments

Comments
 (0)