Skip to content

Commit 15bb278

Browse files
committed
Start using UVStream in Tcp Socket
1 parent f539c1c commit 15bb278

File tree

3 files changed

+138
-97
lines changed

3 files changed

+138
-97
lines changed

src/Libuv/Libuv.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
<Reference Include="System" />
4141
</ItemGroup>
4242
<ItemGroup>
43+
<Compile Include="UVStream.cs" />
4344
<Compile Include="ChildProcess.cs" />
4445
<Compile Include="PipeSocket.cs" />
4546
<Compile Include="PipeServer.cs" />

src/Libuv/TcpSocket.cs

Lines changed: 13 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -4,58 +4,16 @@
44

55
namespace Libuv {
66
public class TcpSocket {
7-
static uv_buf_t alloc_cb(IntPtr tcp, IntPtr size)
8-
{
9-
uv_buf_t buf;
10-
buf.data = Marshal.AllocHGlobal((int)size);
11-
#if __MonoCS__
12-
buf.len = size;
13-
#else
14-
buf.len = (ulong)size;
15-
#endif
16-
return buf;
17-
}
18-
static void unmanaged_read_cb(IntPtr stream, IntPtr nread, uv_buf_t buf)
19-
{
20-
int size = (int)nread;
21-
if (size < 0) {
22-
if ((int)buf.data != 0)
23-
Marshal.FreeHGlobal(buf.data);
24-
IntPtr shutdown = Marshal.AllocHGlobal(Sizes.ShutdownTSize);
25-
uv_shutdown(shutdown, stream, after_shutdown);
26-
return;
27-
}
28-
if (size == 0) {
29-
Marshal.FreeHGlobal(buf.data);
30-
return;
31-
}
32-
byte[] data = new byte[size];
33-
Marshal.Copy(buf.data, data, 0, size);
34-
var handle = (uv_handle_t)Marshal.PtrToStructure(stream, typeof(uv_handle_t));
35-
var instance = GCHandle.FromIntPtr(handle.data);
36-
var watcher_instance = (TcpSocket)instance.Target;
37-
watcher_instance.HandleRead(data, size);
38-
Marshal.FreeHGlobal(buf.data);
39-
}
7+
public UVStream Stream { get; private set; }
408
static void unmanaged_connect_cb(IntPtr connection, int status)
419
{
42-
if (status != 0) {
43-
throw new Exception(uv_strerror(uv_last_error()));
44-
}
10+
Util.CheckError(status);
4511
var tmp = (uv_connect_t)Marshal.PtrToStructure(connection, typeof(uv_connect_t));
4612
var handle = (uv_handle_t)Marshal.PtrToStructure(tmp.handle, typeof(uv_handle_t));
4713
var instance = GCHandle.FromIntPtr(handle.data);
48-
var watcher_instance = (TcpSocket)instance.Target;
49-
watcher_instance.HandleConnect();
50-
uv_read_start(tmp.handle, alloc_cb, unmanaged_read_cb);
51-
}
52-
static void after_shutdown(IntPtr shutdown, int status)
53-
{
54-
// It'd be very difficult to get handle out of req
55-
// So we'll store it in data & cast to uv_req_t
56-
uv_shutdown_t tmp = (uv_shutdown_t)Marshal.PtrToStructure(shutdown, typeof(uv_shutdown_t));
57-
uv_close(tmp.handle, on_close);
58-
Marshal.FreeHGlobal(shutdown);
14+
var socket_instance = (TcpSocket)instance.Target;
15+
socket_instance.HandleConnect();
16+
this.Stream = new Stream(socket_instance._handle);
5917
}
6018
static void on_close(IntPtr socket)
6119
{
@@ -66,63 +24,38 @@ static void on_close(IntPtr socket)
6624
watcher_instance.me.Free();
6725
Marshal.FreeHGlobal(socket);
6826
}
69-
static void after_write(IntPtr write_req, int status)
70-
{
71-
var req = (uv_req_t)Marshal.PtrToStructure(write_req, typeof(uv_req_t));
72-
//var handle = GCHandle.FromIntPtr(req.data);
73-
//handle.Free();
74-
Marshal.FreeHGlobal(write_req);
75-
}
7627
private IntPtr _handle;
7728
public event Action<byte[]> OnData;
7829
public event Action OnConnect;
7930
private GCHandle me;
8031
private IntPtr connection;
8132
public TcpSocket()
8233
{
83-
this._handle = Marshal.AllocHGlobal(Sizes.TcpTSize);
84-
uv_tcp_init(this._handle);
34+
this._handle = Marshal.AllocHGlobal(Sizes.TcpT);
35+
Util.CheckError(uv_tcp_init(this._handle));
8536
var handle = (uv_handle_t)Marshal.PtrToStructure(this._handle, typeof(uv_handle_t));
8637
this.me = GCHandle.Alloc(this);
8738
handle.data = GCHandle.ToIntPtr(this.me);
8839
Marshal.StructureToPtr(handle, this._handle, true);
89-
this.connection = Marshal.AllocHGlobal(Sizes.ConnectTSize);
40+
this.connection = Marshal.AllocHGlobal(Sizes.ConnectT);
9041
//can't attach anything to connect_t, it would get nulled
9142
}
9243
public void Connect(IPEndPoint endpoint, Action OnConnect)
9344
{
9445
var info = uv_ip4_addr(endpoint.Address.ToString(), endpoint.Port);
95-
uv_tcp_connect(this.connection, this._handle, info, unmanaged_connect_cb);
46+
Util.CheckError(uv_tcp_connect(this.connection, this._handle, info, unmanaged_connect_cb));
9647
this.OnConnect += OnConnect;
9748
}
9849
public TcpSocket(IntPtr ServerHandle)
9950
{
100-
this._handle = Marshal.AllocHGlobal(Sizes.TcpTSize);
101-
uv_tcp_init(this._handle);
102-
uv_accept(ServerHandle, this._handle);
51+
this._handle = Marshal.AllocHGlobal(Sizes.TcpT);
52+
Util.CheckError(uv_tcp_init(this._handle));
53+
Util.CheckError(uv_accept(ServerHandle, this._handle));
10354
var handle = (uv_handle_t)Marshal.PtrToStructure(this._handle, typeof(uv_handle_t));
10455
this.me = GCHandle.Alloc(this);
10556
handle.data = GCHandle.ToIntPtr(this.me);
10657
Marshal.StructureToPtr(handle, this._handle, true);
107-
uv_read_start(this._handle, alloc_cb, unmanaged_read_cb);
108-
}
109-
public void Write(byte[] data, int length)
110-
{
111-
IntPtr write_request = Marshal.AllocHGlobal(Sizes.WriteTSize);
112-
var dataptrhandle = GCHandle.Alloc(data, GCHandleType.Pinned);
113-
// This is not being freed, which needs to be fixed
114-
IntPtr dat = dataptrhandle.AddrOfPinnedObject();
115-
uv_buf_t[] buf = new uv_buf_t[1];
116-
buf[0].data = dat;
117-
#if __MonoCS__
118-
buf[0].len = (IntPtr)length;
119-
#else
120-
buf[0].len = (ulong)length;
121-
#endif
122-
var req = (uv_req_t)Marshal.PtrToStructure(write_request, typeof(uv_req_t));
123-
req.data = dat;
124-
Marshal.StructureToPtr(req, write_request, true);
125-
uv_write(write_request, this._handle, buf, 1, after_write);
58+
this.Stream = new Stream(this._handle);
12659
}
12760
private void HandleConnect()
12861
{
@@ -131,13 +64,6 @@ private void HandleConnect()
13164
OnConnect();
13265
}
13366
}
134-
private void HandleRead(byte[] data, int nread)
135-
{
136-
if (OnData != null)
137-
{
138-
OnData(data);
139-
}
140-
}
14167
public void Close()
14268
{
14369
uv_close(this._handle, on_close);
@@ -147,20 +73,10 @@ public void Close()
14773
[DllImport ("uv")]
14874
internal static extern int uv_accept(IntPtr socket, IntPtr stream);
14975
[DllImport ("uv")]
150-
internal static extern int uv_read_start(IntPtr stream, uv_alloc_cb alloc_cb, uv_read_cb read);
151-
[DllImport ("uv")]
152-
internal static extern int uv_write(IntPtr req, IntPtr handle, uv_buf_t[] bufs, int bufcnt, uv_write_cb cb);
153-
[DllImport ("uv")]
154-
internal static extern int uv_shutdown(IntPtr shutdown, IntPtr handle, uv_shutdown_cb cb);
155-
[DllImport ("uv")]
15676
internal static extern int uv_close(IntPtr handle, uv_close_cb cb);
15777
[DllImport ("uv")]
15878
internal static extern int uv_tcp_connect(IntPtr connect, IntPtr tcp_handle, sockaddr_in address, uv_connect_cb cb);
15979
[DllImport ("uv")]
16080
internal static extern sockaddr_in uv_ip4_addr(string ip, int port);
161-
[DllImport ("uv")]
162-
public static extern uv_err_t uv_last_error();
163-
[DllImport ("uv")]
164-
public static extern string uv_strerror(uv_err_t err);
16581
}
16682
}

src/Libuv/UVStream.cs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
using System;
2+
using System.Runtime.InteropServices;
3+
4+
namespace Libuv {
5+
public class UVStream {
6+
private IntPtr handle;
7+
private GCHandle gchandle;
8+
public event Action<byte[]> OnRead;
9+
public event Action OnClose;
10+
// Pass an already init-ed uv_handle_t subclass to stream from
11+
public UVStream(IntPtr Handle) {
12+
this.gchandle = GCHandle.Alloc(this);
13+
this.handle = Handle;
14+
//attach handle of ourself to the c struct so we can access instance callbacks in static callbacks
15+
//static callbacks are used as to not force the clr to thunk a new one for every callback on every object
16+
var handle = (uv_handle_t)Marshal.PtrToStructure(this.handle, typeof(uv_handle_t));
17+
this.gchandle = GCHandle.Alloc(this);
18+
handle.data = GCHandle.ToIntPtr(this.gchandle);
19+
Marshal.StructureToPtr(handle, this.handle, true);
20+
}
21+
// Instance Methods
22+
public void ReadStart()
23+
{
24+
Util.CheckError(uv_read_start(this.handle, static_alloc, static_read));
25+
}
26+
public void ReadStop()
27+
{
28+
Util.CheckError(uv_read_stop(this.handle));
29+
}
30+
public void Write(byte[] data, int length)
31+
{
32+
IntPtr write_request = Marshal.AllocHGlobal(Sizes.WriteT);
33+
var dataptrhandle = GCHandle.Alloc(data, GCHandleType.Pinned);
34+
// This is not being freed, which needs to be fixed
35+
IntPtr dat = dataptrhandle.AddrOfPinnedObject();
36+
uv_buf_t[] buf = new uv_buf_t[1];
37+
buf[0].data = dat;
38+
#if __MonoCS__
39+
buf[0].len = (IntPtr)length;
40+
#else
41+
buf[0].len = (ulong)length;
42+
#endif
43+
var req = (uv_req_t)Marshal.PtrToStructure(write_request, typeof(uv_req_t));
44+
req.data = dat;
45+
Marshal.StructureToPtr(req, write_request, true);
46+
Util.CheckError(uv_write(write_request, this.handle, buf, 1, after_write));
47+
}
48+
private void HandleRead(byte[] data)
49+
{
50+
if (OnRead != null)
51+
{
52+
OnRead(data);
53+
}
54+
}
55+
private void HandleClose()
56+
{
57+
if (OnClose != null)
58+
{
59+
OnClose();
60+
}
61+
}
62+
// Static Callbacks
63+
static uv_buf_t static_alloc(IntPtr tcp, IntPtr size)
64+
{
65+
uv_buf_t buf;
66+
buf.data = Marshal.AllocHGlobal((int)size);
67+
#if __MonoCS__
68+
buf.len = size;
69+
#else
70+
buf.len = (ulong)size;
71+
#endif
72+
return buf;
73+
}
74+
static void static_read(IntPtr stream, IntPtr nread, uv_buf_t buf)
75+
{
76+
int size = (int)nread;
77+
if (size < 0) {
78+
if ((int)buf.data != 0)
79+
Marshal.FreeHGlobal(buf.data);
80+
IntPtr shutdown = Marshal.AllocHGlobal(Sizes.ShutdownT);
81+
Util.CheckError(uv_shutdown(shutdown, stream, after_shutdown));
82+
return;
83+
}
84+
if (size == 0) {
85+
Marshal.FreeHGlobal(buf.data);
86+
return;
87+
}
88+
byte[] data = new byte[size];
89+
Marshal.Copy(buf.data, data, 0, size);
90+
var handle = (uv_handle_t)Marshal.PtrToStructure(stream, typeof(uv_handle_t));
91+
var instance_gchandle = GCHandle.FromIntPtr(handle.data);
92+
var stream_instance = (UVStream)instance_gchandle.Target;
93+
stream_instance.HandleRead(data);
94+
Marshal.FreeHGlobal(buf.data);
95+
}
96+
static void after_shutdown(IntPtr shutdown, int status)
97+
{
98+
Util.CheckError(status);
99+
var tmp = (uv_shutdown_t)Marshal.PtrToStructure(shutdown, typeof(uv_shutdown_t));
100+
var handle = (uv_handle_t)Marshal.PtrToStructure(tmp.handle, typeof(uv_handle_t));
101+
var instance = GCHandle.FromIntPtr(handle.data);
102+
var watcher_instance = (UVStream)instance.Target;
103+
watcher_instance.HandleClose();
104+
Marshal.FreeHGlobal(shutdown);
105+
}
106+
static void after_write(IntPtr write_req, int status)
107+
{
108+
Util.CheckError(status);
109+
var req = (uv_req_t)Marshal.PtrToStructure(write_req, typeof(uv_req_t));
110+
//var handle = GCHandle.FromIntPtr(req.data);
111+
//handle.Free();
112+
Marshal.FreeHGlobal(write_req);
113+
}
114+
// Externs
115+
[DllImport ("uv")]
116+
internal static extern int uv_read_start(IntPtr stream, uv_alloc_cb alloc_cb, uv_read_cb read);
117+
[DllImport ("uv")]
118+
internal static extern int uv_read_stop(IntPtr stream);
119+
[DllImport ("uv")]
120+
internal static extern int uv_write(IntPtr req, IntPtr handle, uv_buf_t[] bufs, int bufcnt, uv_write_cb cb);
121+
[DllImport ("uv")]
122+
internal static extern int uv_shutdown(IntPtr req, IntPtr handle, uv_shutdown_cb cb);
123+
}
124+
}

0 commit comments

Comments
 (0)