Skip to content

Commit 056611f

Browse files
committed
Add pipe support
1 parent 7287589 commit 056611f

File tree

4 files changed

+258
-6
lines changed

4 files changed

+258
-6
lines changed

src/Libuv.Tests/webserver.cs

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ static void Main ()
1717
uv_init();
1818

1919
var watch = new PrepareWatcher(() => {
20-
Console.WriteLine("Prepare Watcher Called");
20+
//Console.WriteLine("Prepare Watcher Called");
2121
});
2222
watch.Start();
2323
var server = new TcpServer((socket) => {
@@ -47,24 +47,51 @@ static void Main ()
4747
byte[] message = System.Text.Encoding.ASCII.GetBytes("Hello World\n");
4848
client.Write(message, message.Length);
4949
});
50+
var pipeserver = new PipeServer((socket) => {
51+
clientcount++;
52+
socket.Write(System.Text.Encoding.ASCII.GetBytes(clientcount.ToString()), 1);
53+
if (clientcount > 5) {
54+
socket.Close();
55+
}
56+
Console.WriteLine("Pipe Client Connected");
57+
socket.OnData += (data) => {
58+
Console.WriteLine("Pipe Data Recieved: {0}", System.Text.Encoding.ASCII.GetString(data, 0, data.Length));
59+
socket.Write(data, data.Length);
60+
};
61+
//socket.OnClose += () => {
62+
// Console.WriteLine("Client Disconnected");
63+
//};
64+
});
65+
pipeserver.Listen("libuv-csharp");
66+
var pipeclient = new PipeSocket();
67+
pipeclient.OnData += (data) => {
68+
Console.WriteLine("Pipe Client Recieved: {0}", System.Text.Encoding.ASCII.GetString(data, 0, data.Length));
69+
watch.Stop();
70+
watch.Dispose();
71+
pipeclient.Close();
72+
};
73+
pipeclient.Connect("libuv-csharp", () => {
74+
byte[] message = System.Text.Encoding.ASCII.GetBytes("Hello World\n");
75+
pipeclient.Write(message, message.Length);
76+
});
5077
var watch2 = new PrepareWatcher(() => {
51-
Console.WriteLine("Prepare Watcher 2 Called");
78+
//Console.WriteLine("Prepare Watcher 2 Called");
5279
});
5380
watch2.Start();
5481
var check = new CheckWatcher(() => {
55-
Console.WriteLine("Check Watcher Called");
82+
//Console.WriteLine("Check Watcher Called");
5683
});
5784
check.Start();
5885
var idle = new IdleWatcher(() => {
59-
Console.WriteLine("Idle Watcher Called");
86+
//Console.WriteLine("Idle Watcher Called");
6087
});
6188
idle.Start();
6289
var after = new TimerWatcher(new TimeSpan(0,0,5), new TimeSpan(1,0,0), () => {
63-
Console.WriteLine("After 5 Seconds");
90+
//Console.WriteLine("After 5 Seconds");
6491
});
6592
after.Start();
6693
var every = new TimerWatcher(new TimeSpan(0,0,5), () => {
67-
Console.WriteLine("Every 5 Seconds");
94+
//Console.WriteLine("Every 5 Seconds");
6895
// after.Stop();
6996
});
7097
every.Start();

src/Libuv/Libuv.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
<Reference Include="System" />
4141
</ItemGroup>
4242
<ItemGroup>
43+
<Compile Include="PipeSocket.cs" />
44+
<Compile Include="PipeServer.cs" />
4345
<Compile Include="TcpSocket.cs" />
4446
<Compile Include="TcpServer.cs" />
4547
<Compile Include="Watcher.cs" />

src/Libuv/PipeServer.cs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
using System;
2+
using System.Net;
3+
using System.Runtime.InteropServices;
4+
5+
namespace Libuv {
6+
public class PipeServer {
7+
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
8+
public delegate void uv_connection_cb(IntPtr server, int status);
9+
[DllImport("uv")]
10+
internal static extern int uv_pipe_init(IntPtr prepare);
11+
[DllImport("uv")]
12+
internal static extern int uv_pipe_bind(IntPtr prepare, string name);
13+
[DllImport("uv")]
14+
internal static extern int uv_listen(IntPtr stream, int backlog, uv_connection_cb cb);
15+
16+
private static uv_connection_cb unmanaged_callback;
17+
18+
static PipeServer()
19+
{
20+
unmanaged_callback = StaticCallback;
21+
}
22+
23+
private Action<PipeSocket> callback;
24+
private IntPtr _handle;
25+
private GCHandle me;
26+
27+
public PipeServer(Action<PipeSocket> callback)
28+
{
29+
this.callback = callback;
30+
this._handle = Marshal.AllocHGlobal(Sizes.PipeTSize);
31+
uv_pipe_init(this._handle);
32+
var handle = (uv_handle_t)Marshal.PtrToStructure(this._handle, typeof(uv_handle_t));
33+
this.me = GCHandle.Alloc(this);
34+
handle.data = GCHandle.ToIntPtr(this.me);
35+
Marshal.StructureToPtr(handle, this._handle, true);
36+
}
37+
public void Listen(string endpoint)
38+
{
39+
uv_pipe_bind(this._handle, endpoint);
40+
uv_listen(this._handle, 128, unmanaged_callback);
41+
}
42+
public static void StaticCallback(IntPtr server_ptr, int status)
43+
{
44+
var handle = (uv_handle_t)Marshal.PtrToStructure(server_ptr, typeof(uv_handle_t));
45+
var instance = GCHandle.FromIntPtr(handle.data);
46+
var server = (PipeServer)instance.Target;
47+
server.callback(new PipeSocket(server._handle));
48+
}
49+
}
50+
}

src/Libuv/PipeSocket.cs

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
using System;
2+
using System.Net;
3+
using System.Runtime.InteropServices;
4+
5+
namespace Libuv {
6+
public class PipeSocket {
7+
static uv_buf_t alloc_cb(IntPtr pipe, IntPtr size)
8+
{
9+
uv_buf_t buf;
10+
buf.data = Marshal.AllocHGlobal((int)size);
11+
#if __MonoCS__
12+
buf.len = size;
13+
#else
14+
buf.len = (ulong)size;
15+
#endif
16+
return buf;
17+
}
18+
static void unmanaged_read_cb(IntPtr stream, IntPtr nread, uv_buf_t buf)
19+
{
20+
int size = (int)nread;
21+
if (size < 0) {
22+
if ((int)buf.data != 0)
23+
Marshal.FreeHGlobal(buf.data);
24+
IntPtr shutdown = Marshal.AllocHGlobal(Sizes.ShutdownTSize);
25+
uv_shutdown(shutdown, stream, after_shutdown);
26+
return;
27+
}
28+
if (size == 0) {
29+
Marshal.FreeHGlobal(buf.data);
30+
return;
31+
}
32+
byte[] data = new byte[size];
33+
Marshal.Copy(buf.data, data, 0, size);
34+
var handle = (uv_handle_t)Marshal.PtrToStructure(stream, typeof(uv_handle_t));
35+
var instance = GCHandle.FromIntPtr(handle.data);
36+
var watcher_instance = (PipeSocket)instance.Target;
37+
watcher_instance.HandleRead(data, size);
38+
Marshal.FreeHGlobal(buf.data);
39+
}
40+
static void unmanaged_connect_cb(IntPtr connection, int status)
41+
{
42+
if (status != 0) {
43+
throw new Exception(uv_strerror(uv_last_error()));
44+
}
45+
var tmp = (uv_connect_t)Marshal.PtrToStructure(connection, typeof(uv_connect_t));
46+
var handle = (uv_handle_t)Marshal.PtrToStructure(tmp.handle, typeof(uv_handle_t));
47+
var instance = GCHandle.FromIntPtr(handle.data);
48+
var watcher_instance = (PipeSocket)instance.Target;
49+
watcher_instance.HandleConnect();
50+
uv_read_start(tmp.handle, alloc_cb, unmanaged_read_cb);
51+
}
52+
static void after_shutdown(IntPtr shutdown, int status)
53+
{
54+
// It'd be very difficult to get handle out of req
55+
// So we'll store it in data & cast to uv_req_t
56+
uv_shutdown_t tmp = (uv_shutdown_t)Marshal.PtrToStructure(shutdown, typeof(uv_shutdown_t));
57+
uv_close(tmp.handle, on_close);
58+
Marshal.FreeHGlobal(shutdown);
59+
}
60+
static void on_close(IntPtr socket)
61+
{
62+
var handle = (uv_handle_t)Marshal.PtrToStructure(socket, typeof(uv_handle_t));
63+
var instance = GCHandle.FromIntPtr(handle.data);
64+
var watcher_instance = (PipeSocket)instance.Target;
65+
//dont think this is what should happen here
66+
watcher_instance.me.Free();
67+
Marshal.FreeHGlobal(socket);
68+
}
69+
static void after_write(IntPtr write_req, int status)
70+
{
71+
var req = (uv_req_t)Marshal.PtrToStructure(write_req, typeof(uv_req_t));
72+
//var handle = GCHandle.FromIntPtr(req.data);
73+
//handle.Free();
74+
Marshal.FreeHGlobal(write_req);
75+
}
76+
private IntPtr _handle;
77+
public event Action<byte[]> OnData;
78+
public event Action OnConnect;
79+
private GCHandle me;
80+
private IntPtr connection;
81+
public PipeSocket()
82+
{
83+
this._handle = Marshal.AllocHGlobal(Sizes.PipeTSize);
84+
uv_pipe_init(this._handle);
85+
var handle = (uv_handle_t)Marshal.PtrToStructure(this._handle, typeof(uv_handle_t));
86+
this.me = GCHandle.Alloc(this);
87+
handle.data = GCHandle.ToIntPtr(this.me);
88+
Marshal.StructureToPtr(handle, this._handle, true);
89+
this.connection = Marshal.AllocHGlobal(Sizes.ConnectTSize);
90+
//can't attach anything to connect_t, it would get nulled
91+
}
92+
public void Connect(string path, Action OnConnect)
93+
{
94+
uv_pipe_connect(this.connection, this._handle, path, unmanaged_connect_cb);
95+
this.OnConnect += OnConnect;
96+
}
97+
public PipeSocket(IntPtr ServerHandle)
98+
{
99+
this._handle = Marshal.AllocHGlobal(Sizes.PipeTSize);
100+
uv_pipe_init(this._handle);
101+
uv_accept(ServerHandle, this._handle);
102+
var handle = (uv_handle_t)Marshal.PtrToStructure(this._handle, typeof(uv_handle_t));
103+
this.me = GCHandle.Alloc(this);
104+
handle.data = GCHandle.ToIntPtr(this.me);
105+
Marshal.StructureToPtr(handle, this._handle, true);
106+
uv_read_start(this._handle, alloc_cb, unmanaged_read_cb);
107+
}
108+
public void Write(byte[] data, int length)
109+
{
110+
IntPtr write_request = Marshal.AllocHGlobal(Sizes.WriteTSize);
111+
var dataptrhandle = GCHandle.Alloc(data, GCHandleType.Pinned);
112+
// This is not being freed, which needs to be fixed
113+
IntPtr dat = dataptrhandle.AddrOfPinnedObject();
114+
uv_buf_t[] buf = new uv_buf_t[1];
115+
buf[0].data = dat;
116+
#if __MonoCS__
117+
buf[0].len = (IntPtr)length;
118+
#else
119+
buf[0].len = (ulong)length;
120+
#endif
121+
var req = (uv_req_t)Marshal.PtrToStructure(write_request, typeof(uv_req_t));
122+
req.data = dat;
123+
Marshal.StructureToPtr(req, write_request, true);
124+
uv_write(write_request, this._handle, buf, 1, after_write);
125+
}
126+
private void HandleConnect()
127+
{
128+
if (OnConnect != null)
129+
{
130+
OnConnect();
131+
}
132+
}
133+
private void HandleRead(byte[] data, int nread)
134+
{
135+
if (OnData != null)
136+
{
137+
OnData(data);
138+
}
139+
}
140+
public void Close()
141+
{
142+
uv_close(this._handle, on_close);
143+
}
144+
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
145+
public delegate void uv_shutdown_cb(IntPtr req, int status);
146+
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
147+
public delegate uv_buf_t uv_alloc_cb(IntPtr stream, IntPtr suggested_size);
148+
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
149+
public delegate void uv_read_cb(IntPtr req, IntPtr nread, uv_buf_t buf);
150+
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
151+
public delegate void uv_write_cb(IntPtr req, int status);
152+
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
153+
public delegate void uv_connect_cb(IntPtr conn, int status);
154+
[DllImport("uv")]
155+
internal static extern int uv_pipe_init(IntPtr prepare);
156+
[DllImport ("uv")]
157+
internal static extern int uv_accept(IntPtr socket, IntPtr stream);
158+
[DllImport ("uv")]
159+
internal static extern int uv_read_start(IntPtr stream, uv_alloc_cb alloc_cb, uv_read_cb read);
160+
[DllImport ("uv")]
161+
internal static extern int uv_write(IntPtr req, IntPtr handle, uv_buf_t[] bufs, int bufcnt, uv_write_cb cb);
162+
[DllImport ("uv")]
163+
internal static extern int uv_shutdown(IntPtr shutdown, IntPtr handle, uv_shutdown_cb cb);
164+
[DllImport ("uv")]
165+
internal static extern int uv_close(IntPtr handle, uv_close_cb cb);
166+
[DllImport ("uv")]
167+
internal static extern int uv_pipe_connect(IntPtr connect, IntPtr tcp_handle, string path, uv_connect_cb cb);
168+
[DllImport ("uv")]
169+
public static extern uv_err_t uv_last_error();
170+
[DllImport ("uv")]
171+
public static extern string uv_strerror(uv_err_t err);
172+
}
173+
}

0 commit comments

Comments
 (0)