Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: Alemiz112/Network
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: master
Choose a base ref
...
head repository: Alemiz112/Network
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: improvements-and-metrics
Choose a head ref

Commits on Jan 9, 2021

  1. Copy the full SHA
    a3f72a4 View commit details
  2. Copy the full SHA
    eda0c52 View commit details
  3. Close session with client

    Alemiz112 committed Jan 9, 2021
    Copy the full SHA
    30cb7d1 View commit details

Commits on Jan 12, 2021

  1. Copy the full SHA
    779741a View commit details
  2. Copy the full SHA
    384a8c6 View commit details

Commits on Jan 17, 2021

  1. Copy the full SHA
    1b30447 View commit details

Commits on Feb 3, 2021

  1. Merge pull request CloudburstMC#20 from Alemiz112/next-mirror

    Several improvements
    SupremeMortal authored Feb 3, 2021
    Copy the full SHA
    45ddfe2 View commit details

Commits on Feb 6, 2021

  1. Copy the full SHA
    38c1e87 View commit details
  2. Copy the full SHA
    7b68a77 View commit details
  3. Copy the full SHA
    0498bf3 View commit details

Commits on Feb 7, 2021

  1. Copy the full SHA
    7c2aeca View commit details

Commits on Feb 17, 2021

  1. RakNet server HAProxy protocol support (CloudburstMC#18)

    * Add PROXY protocol support
    
    PROXY header decoding utilities are based on netty-codec-haproxy with
    few features (such as TLV support) removed.
    
    * Expose both connection and proxied remote address
    
    * Add utility method for getting source InetSocketAddress
    
    * Prevent storing remote address when session already exists; Also accept header when there's no session
    
    * Improve PROXY header detection and caching
    
    * Remove resource leak detector
    
    HAProxyMessage does not store TLVs, thus it does not need manual resources releasing.
    
    Co-authored-by: SupremeMortal <6178101+SupremeMortal@users.noreply.github.com>
    mikroskeem and SupremeMortal authored Feb 17, 2021
    Copy the full SHA
    a94d2dd View commit details

Commits on Mar 19, 2021

  1. Copy the full SHA
    7f239ca View commit details

Commits on Apr 1, 2021

  1. Copy the full SHA
    a2f7e1f View commit details

Commits on Apr 2, 2021

  1. Metrics

    Alemiz112 committed Apr 2, 2021
    Copy the full SHA
    9a2901a View commit details
  2. Copy the full SHA
    569e02a View commit details
  3. Stuff

    Alemiz112 committed Apr 2, 2021
    Copy the full SHA
    f594e6c View commit details
  4. More stuff

    Alemiz112 committed Apr 2, 2021
    Copy the full SHA
    66ab979 View commit details
  5. Copy the full SHA
    6935e5e View commit details

Commits on Apr 12, 2021

  1. Copy the full SHA
    b66ed38 View commit details
  2. Cleanup and improvements

    Alemiz112 committed Apr 12, 2021
    Copy the full SHA
    f347215 View commit details

Commits on Apr 13, 2021

  1. Copy the full SHA
    87633d7 View commit details

Commits on Apr 20, 2021

  1. Stuff

    Alemiz112 committed Apr 20, 2021
    Copy the full SHA
    23a26a6 View commit details
  2. Bump version

    Alemiz112 committed Apr 20, 2021
    Copy the full SHA
    17b7527 View commit details

Commits on Apr 30, 2021

  1. Copy the full SHA
    7da2727 View commit details
Showing with 2,255 additions and 481 deletions.
  1. +70 −21 Jenkinsfile
  2. +3 −3 README.md
  3. +1 −1 common/pom.xml
  4. +4 −0 common/src/main/java/com/nukkitx/network/SessionConnection.java
  5. +1 −14 pom.xml
  6. +1 −1 query/pom.xml
  7. +7 −1 raknet/pom.xml
  8. +31 −0 raknet/src/main/java/com/nukkitx/network/raknet/RakMetrics.java
  9. +60 −30 raknet/src/main/java/com/nukkitx/network/raknet/RakNet.java
  10. +94 −101 raknet/src/main/java/com/nukkitx/network/raknet/RakNetClient.java
  11. +4 −0 raknet/src/main/java/com/nukkitx/network/raknet/RakNetConstants.java
  12. +116 −164 raknet/src/main/java/com/nukkitx/network/raknet/RakNetServer.java
  13. +16 −1 raknet/src/main/java/com/nukkitx/network/raknet/RakNetServerListener.java
  14. +187 −142 raknet/src/main/java/com/nukkitx/network/raknet/RakNetSession.java
  15. +2 −1 raknet/src/main/java/com/nukkitx/network/raknet/RakNetUtils.java
  16. +67 −0 raknet/src/main/java/com/nukkitx/network/raknet/pipeline/ClientMessageHandler.java
  17. +71 −0 raknet/src/main/java/com/nukkitx/network/raknet/pipeline/ProxyServerHandler.java
  18. +25 −0 raknet/src/main/java/com/nukkitx/network/raknet/pipeline/RakExceptionHandler.java
  19. +31 −0 raknet/src/main/java/com/nukkitx/network/raknet/pipeline/RakOutboundHandler.java
  20. +85 −0 raknet/src/main/java/com/nukkitx/network/raknet/pipeline/ServerDatagramHandler.java
  21. +36 −0 raknet/src/main/java/com/nukkitx/network/raknet/pipeline/ServerMessageHandler.java
  22. +70 −0 raknet/src/main/java/com/nukkitx/network/raknet/proxy/HAProxyCommand.java
  23. +475 −0 raknet/src/main/java/com/nukkitx/network/raknet/proxy/HAProxyMessage.java
  24. +52 −0 raknet/src/main/java/com/nukkitx/network/raknet/proxy/HAProxyProtocolException.java
  25. +69 −0 raknet/src/main/java/com/nukkitx/network/raknet/proxy/HAProxyProtocolVersion.java
  26. +237 −0 raknet/src/main/java/com/nukkitx/network/raknet/proxy/HAProxyProxiedProtocol.java
  27. +108 −0 raknet/src/main/java/com/nukkitx/network/raknet/proxy/ProxyProtocolConstants.java
  28. +331 −0 raknet/src/main/java/com/nukkitx/network/raknet/proxy/ProxyProtocolDecoder.java
  29. +1 −1 rcon/pom.xml
91 changes: 70 additions & 21 deletions Jenkinsfile
Original file line number Diff line number Diff line change
@@ -7,37 +7,86 @@ pipeline {
options {
buildDiscarder(logRotator(artifactNumToKeepStr: '1'))
}

stages {
stage ('Build') {
when { not { anyOf {
branch 'master'
branch 'develop'
}}}

steps {
sh 'mvn clean package'
}
}

stage ('Javadoc') {
stage ('Deploy') {
when {
branch "master"
}
steps {
sh 'mvn javadoc:aggregate -DskipTests'
}
post {
success {
step([
$class: 'JavadocArchiver',
javadocDir: 'target/site/apidocs',
keepAll: true
])
anyOf {
branch 'master'
branch 'develop'
}
}
}

stage ('Deploy') {
when {
branch "master"
}
steps {
sh 'mvn javadoc:jar source:jar deploy -DskipTests'
stages {
stage('Setup') {
steps {
rtMavenDeployer(
id: "maven-deployer",
serverId: "opencollab-artifactory",
releaseRepo: "maven-releases",
snapshotRepo: "maven-snapshots"
)
rtMavenResolver(
id: "maven-resolver",
serverId: "opencollab-artifactory",
releaseRepo: "release",
snapshotRepo: "snapshot"
)
}
}

stage('Release') {
when {
branch 'master'
}

steps {
rtMavenRun(
pom: 'pom.xml',
goals: 'javadoc:jar source:jar install',
deployerId: "maven-deployer",
resolverId: "maven-resolver"
)
sh 'mvn javadoc:aggregate -DskipTests'
}
post {
success {
step([$class: 'JavadocArchiver', javadocDir: 'target/site/apidocs', keepAll: false])
}
}
}

stage('Snapshot') {
when {
branch 'develop'
}
steps {
rtMavenRun(
pom: 'pom.xml',
goals: 'javadoc:jar source:jar install',
deployerId: "maven-deployer",
resolverId: "maven-resolver"
)
}
}

stage('Publish') {
steps {
rtPublishBuildInfo(
serverId: "opencollab-artifactory"
)
}
}
}
}
}
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -24,19 +24,19 @@ Network components used within NukkitX.
<dependency>
<groupId>com.nukkitx.network</groupId>
<artifactId>query</artifactId>
<version>1.6.25</version>
<version>1.6.27-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.nukkitx.network</groupId>
<artifactId>raknet</artifactId>
<version>1.6.25</version>
<version>1.6.27-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.nukkitx.network</groupId>
<artifactId>rcon</artifactId>
<version>1.6.25</version>
<version>1.6.27-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
2 changes: 1 addition & 1 deletion common/pom.xml
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
<parent>
<artifactId>network-parent</artifactId>
<groupId>com.nukkitx.network</groupId>
<version>1.6.25</version>
<version>1.6.27-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Original file line number Diff line number Diff line change
@@ -8,6 +8,10 @@ public interface SessionConnection<T> {

InetSocketAddress getAddress();

default InetSocketAddress getRealAddress() {
return getAddress();
}

void close();

void close(DisconnectReason reason);
15 changes: 1 addition & 14 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@
<groupId>com.nukkitx.network</groupId>
<artifactId>network-parent</artifactId>
<packaging>pom</packaging>
<version>1.6.25</version>
<version>1.6.27-SNAPSHOT</version>
<inceptionYear>2018</inceptionYear>
<url>https://github.com/NukkitX/Network</url>

@@ -51,19 +51,6 @@
<url>https://github.com/NukkitX/Network</url>
</scm>

<distributionManagement>
<repository>
<id>releases</id>
<name>opencollab-releases</name>
<url>https://repo.opencollab.dev/maven-releases</url>
</repository>
<snapshotRepository>
<id>snapshots</id>
<name>opencollab-snapshots</name>
<url>https://repo.opencollab.dev/maven-snapshots</url>
</snapshotRepository>
</distributionManagement>

<repositories>
<repository>
<id>opencollab-repo-release</id>
2 changes: 1 addition & 1 deletion query/pom.xml
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
<parent>
<artifactId>network-parent</artifactId>
<groupId>com.nukkitx.network</groupId>
<version>1.6.25</version>
<version>1.6.27-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

8 changes: 7 additions & 1 deletion raknet/pom.xml
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
<parent>
<artifactId>network-parent</artifactId>
<groupId>com.nukkitx.network</groupId>
<version>1.6.25</version>
<version>1.6.27-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

@@ -24,5 +24,11 @@
<version>${netty.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>net.jodah</groupId>
<artifactId>expiringmap</artifactId>
<version>0.5.9</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
31 changes: 31 additions & 0 deletions raknet/src/main/java/com/nukkitx/network/raknet/RakMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.nukkitx.network.raknet;

public interface RakMetrics {

default void bytesIn(int count) {
}

default void bytesOut(int count) {
}

default void rakDatagramsIn(int count) {
}

default void rakDatagramsOut(int count) {
}

default void rakStaleDatagrams(int count) {
}

default void ackIn(int count) {
}

default void ackOut(int count) {
}

default void nackIn(int count) {
}

default void nackOut(int count) {
}
}
90 changes: 60 additions & 30 deletions raknet/src/main/java/com/nukkitx/network/raknet/RakNet.java
Original file line number Diff line number Diff line change
@@ -3,65 +3,67 @@
import com.nukkitx.network.util.Bootstraps;
import com.nukkitx.network.util.Preconditions;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.DatagramPacket;

import javax.annotation.Nonnegative;
import javax.annotation.ParametersAreNonnullByDefault;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

@ParametersAreNonnullByDefault
public abstract class RakNet implements AutoCloseable {
final long guid = ThreadLocalRandom.current().nextLong();
final Bootstrap bootstrap;
final InetSocketAddress bindAddress;
protected final long guid = ThreadLocalRandom.current().nextLong();
protected int protocolVersion = RakNetConstants.RAKNET_PROTOCOL_VERSION;

private final AtomicBoolean running = new AtomicBoolean(false);
protected final AtomicBoolean closed = new AtomicBoolean(false);
protected final Bootstrap bootstrap;
private ScheduledFuture<?> tickFuture;
int protocolVersion = RakNetConstants.RAKNET_PROTOCOL_VERSION;
private volatile boolean closed;

RakNet(InetSocketAddress bindAddress, EventLoopGroup eventLoopGroup) {
this.bindAddress = bindAddress;
protected final Map<String, Consumer<Throwable>> exceptionHandlers = new HashMap<>();
protected RakMetrics metrics;

RakNet(EventLoopGroup eventLoopGroup) {
this.bootstrap = new Bootstrap().option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);

Bootstraps.setupBootstrap(this.bootstrap, true);
this.bootstrap.group(eventLoopGroup);
}

static void send(ChannelHandlerContext ctx, InetSocketAddress recipient, ByteBuf buffer) {
ctx.writeAndFlush(new DatagramPacket(buffer, recipient), ctx.voidPromise());
Bootstraps.setupBootstrap(this.bootstrap, true);
}

public CompletableFuture<Void> bind() {
Preconditions.checkState(this.running.compareAndSet(false, true), "RakNet has already been started");

CompletableFuture<Void> future = bindInternal();
CompletableFuture<Void> future = this.bindInternal();

future.whenComplete((aVoid, throwable) -> {
if (throwable != null) {
// Failed to start. Set running to false
this.running.compareAndSet(true, false);
} else {
this.closed = false;
this.tickFuture = this.getEventLoopGroup().next().scheduleAtFixedRate(this::onTick,
0, 10, TimeUnit.MILLISECONDS);
return;
}

this.closed.set(false);
this.tickFuture = this.nextEventLoop().scheduleAtFixedRate(this::onTick, 0, 10, TimeUnit.MILLISECONDS);
});
return future;
}

public void close() {
this.closed = true;
this.close(false);
}

public void close(boolean force) {
this.closed.set(true);
if (this.tickFuture != null) {
this.tickFuture.cancel(false);
}
@@ -76,31 +78,59 @@ public boolean isRunning() {
}

public boolean isClosed() {
return closed;
return this.closed.get();
}

public Bootstrap getBootstrap() {
return bootstrap;
return this.bootstrap;
}

@Nonnegative
public int getProtocolVersion() {
return protocolVersion;
return this.protocolVersion;
}

public void setProtocolVersion(@Nonnegative int protocolVersion) {
this.protocolVersion = protocolVersion;
}

public InetSocketAddress getBindAddress() {
return bindAddress;
}
public abstract InetSocketAddress getBindAddress();

public long getGuid() {
return guid;
return this.guid;
}

protected EventLoopGroup getEventLoopGroup() {
return this.bootstrap.config().group();
}

protected EventLoop nextEventLoop() {
return this.getEventLoopGroup().next();
}

public void setMetrics(RakMetrics metrics) {
this.metrics = metrics;
}

public RakMetrics getMetrics() {
return this.metrics;
}

public void addExceptionHandler(String handlerId, Consumer<Throwable> handler) {
Objects.requireNonNull(handlerId, "handlerId is empty");
Objects.requireNonNull(handler, "clientExceptionHandler (handler is null)");
this.exceptionHandlers.put(handlerId, handler);
}

public void removeExceptionHandler(String handlerId) {
this.exceptionHandlers.remove(handlerId);
}

public void clearExceptionHandlers() {
this.exceptionHandlers.clear();
}

public Collection<Consumer<Throwable>> getExceptionHandlers() {
return this.exceptionHandlers.values();
}
}
195 changes: 94 additions & 101 deletions raknet/src/main/java/com/nukkitx/network/raknet/RakNetClient.java
Original file line number Diff line number Diff line change
@@ -1,53 +1,70 @@
package com.nukkitx.network.raknet;

import com.nukkitx.network.raknet.pipeline.ClientMessageHandler;
import com.nukkitx.network.raknet.pipeline.RakExceptionHandler;
import com.nukkitx.network.raknet.pipeline.RakOutboundHandler;
import com.nukkitx.network.util.EventLoops;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import lombok.RequiredArgsConstructor;

import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import static com.nukkitx.network.raknet.RakNetConstants.*;

@ParametersAreNonnullByDefault
public class RakNetClient extends RakNet {
private static final InternalLogger log = InternalLoggerFactory.getInstance(RakNetClient.class);
private final ClientDatagramHandler handler = new ClientDatagramHandler();
private final Queue<PongEntry> inboundPongs = PlatformDependent.newMpscQueue();
private final Map<InetSocketAddress, PingEntry> pings = new HashMap<>();
private final Map<String, Consumer<Throwable>> exceptionHandlers = new HashMap<>();
private final Map<InetSocketAddress, PingEntry> pings = new ConcurrentHashMap<>();

RakNetClientSession session;
protected InetSocketAddress bindAddress;
protected RakNetClientSession session;
private Channel channel;
private EventLoop tickingEventLoop;

public RakNetClient() {
this(null, EventLoops.commonGroup());
}

public RakNetClient(InetSocketAddress bindAddress) {
this(bindAddress, EventLoops.commonGroup());
}

public RakNetClient(InetSocketAddress bindAddress, EventLoopGroup eventLoopGroup) {
super(bindAddress, eventLoopGroup);
exceptionHandlers.put("DEFAULT", (t) -> log.error("An exception occurred in RakNet (Client)", t));
public RakNetClient(@Nullable InetSocketAddress bindAddress, EventLoopGroup eventLoopGroup) {
super(eventLoopGroup);
this.bindAddress = bindAddress;
this.exceptionHandlers.put("DEFAULT", (t) -> log.error("An exception occurred in RakNet Client, address="+bindAddress, t));
}

@Override
protected CompletableFuture<Void> bindInternal() {
ChannelFuture channelFuture = this.bootstrap.handler(this.handler).bind(this.bindAddress);
this.bootstrap.handler(new ClientChannelInitializer());
ChannelFuture channelFuture = this.bindAddress == null? this.bootstrap.bind() : this.bootstrap.bind(this.bindAddress);

CompletableFuture<Void> future = new CompletableFuture<>();
channelFuture.addListener(future1 -> {
if (future1.cause() != null) {
future.completeExceptionally(future1.cause());
channelFuture.addListener((ChannelFuture promise) -> {
if (promise.cause() != null) {
future.completeExceptionally(promise.cause());
return;
}

SocketAddress address = promise.channel().localAddress();
if (!(address instanceof InetSocketAddress)) {
future.completeExceptionally(new IllegalArgumentException("Excepted InetSocketAddress but got "+address.getClass().getSimpleName()));
return;
}
this.bindAddress = (InetSocketAddress) address;
future.complete(null);
});
return future;
@@ -57,7 +74,7 @@ public RakNetClientSession connect(InetSocketAddress address) {
if (!this.isRunning()) {
throw new IllegalStateException("RakNet has not been started");
}
if (session != null) {
if (this.session != null) {
throw new IllegalStateException("Session has already been created");
}

@@ -78,29 +95,16 @@ public CompletableFuture<RakNetPong> ping(InetSocketAddress address, long timeou
return this.pings.get(address).future;
}

long curTime = System.currentTimeMillis();
CompletableFuture<RakNetPong> pongFuture = new CompletableFuture<>();

PingEntry entry = new PingEntry(pongFuture, System.currentTimeMillis() + unit.toMillis(timeout));
PingEntry entry = new PingEntry(pongFuture, curTime + unit.toMillis(timeout));
entry.sendTime = curTime;
this.pings.put(address, entry);
this.sendUnconnectedPing(address);

return pongFuture;
}

public void addExceptionHandler(String handlerId, Consumer<Throwable> handler) {
Objects.requireNonNull(handlerId, "handlerId is empty (client)");
Objects.requireNonNull(handler, "clientExceptionHandler (handler is null)");
this.exceptionHandlers.put(handlerId, handler);
}

public void clearExceptionHandlers() {
this.exceptionHandlers.clear();
}

public void removeExceptionHandler(String handlerId) {
this.exceptionHandlers.remove(handlerId);
}

@Override
protected void onTick() {
final long curTime = System.currentTimeMillis();
@@ -109,50 +113,52 @@ protected void onTick() {
session.eventLoop.execute(() -> session.onTick(curTime));
}

PongEntry pong;
while ((pong = this.inboundPongs.poll()) != null) {
PingEntry ping = this.pings.remove(pong.address);
if (ping == null) {
continue;
}

ping.future.complete(new RakNetPong(pong.pingTime, curTime, pong.guid, pong.userData));
}

Iterator<PingEntry> iterator = this.pings.values().iterator();
Iterator<Map.Entry<InetSocketAddress, PingEntry>> iterator = this.pings.entrySet().iterator();
while (iterator.hasNext()) {
PingEntry entry = iterator.next();
if (curTime >= entry.timeout) {
entry.future.completeExceptionally(new TimeoutException());
Map.Entry<InetSocketAddress, PingEntry> entry = iterator.next();
PingEntry ping = entry.getValue();
if (curTime >= ping.timeout) {
ping.future.completeExceptionally(new TimeoutException());
iterator.remove();
} else if ((curTime - ping.sendTime) >= RAKNET_PING_INTERVAL) {
ping.sendTime = curTime;
this.sendUnconnectedPing(entry.getKey());
}
}
}

@Override
public void close() {
super.close();

if (channel != null) {
channel.close().syncUninterruptibly();
public void onUnconnectedPong(PongEntry entry) {
EventLoop eventLoop = this.nextEventLoop();
if (eventLoop.inEventLoop()) {
this.onUnconnectedPong0(entry);
} else {
eventLoop.execute(() -> this.onUnconnectedPong0(entry));
}
}

private void onUnconnectedPong(DatagramPacket packet) {
ByteBuf content = packet.content();
long pingTime = content.readLong();
long guid = content.readLong();
if (!RakNetUtils.verifyUnconnectedMagic(content)) {
private void onUnconnectedPong0(PongEntry pong) {
PingEntry ping = this.pings.remove(pong.address);
if (ping != null) {
ping.future.complete(new RakNetPong(pong.pingTime, System.currentTimeMillis(), pong.guid, pong.userData));
return;
}

byte[] userData = null;
if (content.isReadable()) {
userData = new byte[content.readUnsignedShort()];
content.readBytes(userData);
if (log.isDebugEnabled()) {
log.debug("Received unexcepted pong from " + pong.address);
}
}

@Override
public void close(boolean force) {
super.close(force);
if (this.session != null && !this.session.isClosed()) {
this.session.close();
}

this.inboundPongs.offer(new PongEntry(packet.sender(), pingTime, guid, userData));
if (this.channel != null) {
ChannelFuture future = this.channel.close();
if (force) future.syncUninterruptibly();
}
}

private void sendUnconnectedPing(InetSocketAddress recipient) {
@@ -165,60 +171,47 @@ private void sendUnconnectedPing(InetSocketAddress recipient) {
this.channel.writeAndFlush(new DatagramPacket(buffer, recipient));
}

@Override
public InetSocketAddress getBindAddress() {
return this.bindAddress;
}

public RakNetClientSession getSession() {
return this.session;
}

@Override
protected EventLoop nextEventLoop() {
if (this.tickingEventLoop == null) {
this.tickingEventLoop = super.nextEventLoop();
}
return this.tickingEventLoop;
}

@RequiredArgsConstructor
private static class PingEntry {
public static class PingEntry {
private final CompletableFuture<RakNetPong> future;
private final long timeout;
private long sendTime;
}

@RequiredArgsConstructor
private static class PongEntry {
public static class PongEntry {
private final InetSocketAddress address;
private final long pingTime;
private final long guid;
private final byte[] userData;
}

private class ClientDatagramHandler extends ChannelInboundHandlerAdapter {
private class ClientChannelInitializer extends ChannelInitializer<Channel> {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (!(msg instanceof DatagramPacket)) {
return;
}

DatagramPacket packet = (DatagramPacket) msg;
try {
ByteBuf content = packet.content();
int packetId = content.readUnsignedByte();

if (packetId == ID_UNCONNECTED_PONG) {
RakNetClient.this.onUnconnectedPong(packet);
} else if (session != null && session.address.equals(packet.sender())) {
content.readerIndex(0);
if (session.eventLoop.inEventLoop()) {
session.onDatagram(content);
} else {
session.eventLoop.execute(() -> session.onDatagram(content));
}
}
} finally {
packet.release();
}
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) {
if (ctx.channel().isRegistered()) {
RakNetClient.this.channel = ctx.channel();
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
for (Consumer<Throwable> handler : RakNetClient.this.exceptionHandlers.values()) {
handler.accept(cause);
}
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(RakOutboundHandler.NAME, new RakOutboundHandler(RakNetClient.this));
pipeline.addLast(ClientMessageHandler.NAME, new ClientMessageHandler(RakNetClient.this));
pipeline.addLast(RakExceptionHandler.NAME, new RakExceptionHandler(RakNetClient.this));
RakNetClient.this.channel = channel;
}
}
}
Original file line number Diff line number Diff line change
@@ -34,6 +34,10 @@ public class RakNetConstants {
* Time after {@link RakNetSession} is refreshed due to no activity.
*/
public static final int SESSION_STALE_MS = 5000;
/**
* Time in millis after unconnected ping is resent till pong is not received.
*/
public static final int RAKNET_PING_INTERVAL = 1000;

/*
Flags
280 changes: 116 additions & 164 deletions raknet/src/main/java/com/nukkitx/network/raknet/RakNetServer.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -10,11 +10,26 @@
@ParametersAreNonnullByDefault
public interface RakNetServerListener {

/**
* Called when a new connection is attempted
*
* @param address address of the connection
* @param realAddress address of the real connection - different generally only when server has PROXY protocol enabled
* @return whether the user should be accepted or not
*/
default boolean onConnectionRequest(InetSocketAddress address, InetSocketAddress realAddress) {
return onConnectionRequest(address);
}

/**
* @param address address of user requesting connection
* @return whether the user should be accepted or not
* @deprecated
*/
boolean onConnectionRequest(InetSocketAddress address);
@Deprecated
default boolean onConnectionRequest(InetSocketAddress address) {
throw new UnsupportedOperationException("RakNetServerListener#onConnectionRequest is not implemented");
}

/**
* Called when an unconnected client pings the server to retrieve it's status and MOTD.
329 changes: 187 additions & 142 deletions raknet/src/main/java/com/nukkitx/network/raknet/RakNetSession.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@
@UtilityClass
public class RakNetUtils {

public static void writeIntRanges(ByteBuf buffer, Queue<IntRange> ackQueue, int mtu) {
public static int writeIntRanges(ByteBuf buffer, Queue<IntRange> ackQueue, int mtu) {
int lengthIndex = buffer.writerIndex();
buffer.writeZero(2);
mtu -= 2;
@@ -50,6 +50,7 @@ public static void writeIntRanges(ByteBuf buffer, Queue<IntRange> ackQueue, int
buffer.writerIndex(lengthIndex);
buffer.writeShort(count);
buffer.writerIndex(finalIndex);
return count;
}

public static boolean verifyUnconnectedMagic(ByteBuf buffer) {
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.nukkitx.network.raknet.pipeline;

import com.nukkitx.network.raknet.RakNetClient;
import com.nukkitx.network.raknet.RakNetClientSession;
import com.nukkitx.network.raknet.RakNetUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;

import static com.nukkitx.network.raknet.RakNetConstants.ID_UNCONNECTED_PONG;

public class ClientMessageHandler extends SimpleChannelInboundHandler<DatagramPacket> {
public static final String NAME = "rak-client-message-handler";

private final RakNetClient client;

public ClientMessageHandler(RakNetClient client) {
this.client = client;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
ByteBuf buffer = packet.content();
if (!buffer.isReadable()) {
return;
}

if (this.client.getMetrics() != null) {
this.client.getMetrics().bytesIn(buffer.readableBytes());
}

int packetId = buffer.readUnsignedByte();
if (packetId == ID_UNCONNECTED_PONG) {
this.onUnconnectedPong(packet);
return;
}

final RakNetClientSession session = this.client.getSession();
if (session == null || !session.getAddress().equals(packet.sender())) {
return;
}

ByteBuf buf = buffer.readerIndex(0).retain();
if (session.getEventLoop().inEventLoop()) {
session.onDatagram(buf);
} else {
session.getEventLoop().execute(() -> session.onDatagram(buf));
}
}

private void onUnconnectedPong(DatagramPacket packet) {
ByteBuf content = packet.content();
long pingTime = content.readLong();
long guid = content.readLong();
if (!RakNetUtils.verifyUnconnectedMagic(content)) {
return;
}

byte[] userData = null;
if (content.isReadable()) {
userData = new byte[content.readUnsignedShort()];
content.readBytes(userData);
}
this.client.onUnconnectedPong(new RakNetClient.PongEntry(packet.sender(), pingTime, guid, userData));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.nukkitx.network.raknet.pipeline;

import com.nukkitx.network.raknet.RakNetServer;
import com.nukkitx.network.raknet.RakNetSession;
import com.nukkitx.network.raknet.proxy.HAProxyMessage;
import com.nukkitx.network.raknet.proxy.HAProxyProtocolException;
import com.nukkitx.network.raknet.proxy.ProxyProtocolDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.net.InetAddress;
import java.net.InetSocketAddress;

@ChannelHandler.Sharable
public class ProxyServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
private static final InternalLogger log = InternalLoggerFactory.getInstance(ProxyServerHandler.class);
public static final String NAME = "rak-proxy-server-handler";

private final RakNetServer server;

public ProxyServerHandler(RakNetServer server) {
this.server = server;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
ByteBuf content = packet.content();
RakNetSession session = this.server.getSession(packet.sender());
int detectedVersion = session != null ? ProxyProtocolDecoder.findVersion(content) : -1;
InetSocketAddress presentAddress = this.server.getProxiedAddress(packet.sender());

if (presentAddress == null && detectedVersion == -1) {
// We haven't received a header from given address before and we couldn't detect a
// PROXY header, ignore.
return;
}

if (presentAddress == null) {
final HAProxyMessage decoded;
try {
if ((decoded = ProxyProtocolDecoder.decode(content, detectedVersion)) == null) {
// PROXY header was not present in the packet, ignore.
return;
}
} catch (HAProxyProtocolException e) {
log.debug("{} sent malformed PROXY header", packet.sender(), e);
return;
}

presentAddress = decoded.sourceInetSocketAddress();
log.debug("Got PROXY header: (from {}) {}", packet.sender(), presentAddress);
if (log.isDebugEnabled()) {
log.debug("PROXY Headers map size: {}", this.server.getProxiedAddressSize());
}
this.server.addProxiedAddress(packet.sender(), presentAddress);
return;
}

log.trace("Reusing PROXY header: (from {}) {}", packet.sender(), presentAddress);

InetAddress address = presentAddress.getAddress();
if (address == null || !this.server.isBlocked(address)) {
ctx.fireChannelRead(packet.retain());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.nukkitx.network.raknet.pipeline;

import com.nukkitx.network.raknet.RakNet;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;

import java.util.function.Consumer;

@ChannelHandler.Sharable
public class RakExceptionHandler extends ChannelDuplexHandler {
public static final String NAME = "rak-exception-handler";
private final RakNet rakNet;

public RakExceptionHandler(RakNet rakNet) {
this.rakNet = rakNet;
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
for (Consumer<Throwable> handler : this.rakNet.getExceptionHandlers()) {
handler.accept(cause);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.nukkitx.network.raknet.pipeline;

import com.nukkitx.network.raknet.RakNet;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.DatagramPacket;

public class RakOutboundHandler extends ChannelOutboundHandlerAdapter {
public static final String NAME = "rak-outbound-handler";
private final RakNet rakNet;

public RakOutboundHandler(RakNet rakNet) {
this.rakNet = rakNet;
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (!(msg instanceof DatagramPacket)) {
super.write(ctx, msg, promise);
return;
}

ByteBuf buffer = ((DatagramPacket) msg).content();
if (this.rakNet.getMetrics() != null) {
this.rakNet.getMetrics().bytesOut(buffer.readableBytes());
}
super.write(ctx, msg, promise);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.nukkitx.network.raknet.pipeline;

import com.nukkitx.network.raknet.RakNetServer;
import com.nukkitx.network.raknet.RakNetServerSession;
import com.nukkitx.network.raknet.RakNetUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;

import static com.nukkitx.network.raknet.RakNetConstants.*;

@ChannelHandler.Sharable
public class ServerDatagramHandler extends SimpleChannelInboundHandler<DatagramPacket> {
public static final String NAME = "rak-server-datagram-handler";
private final RakNetServer server;

public ServerDatagramHandler(RakNetServer server) {
this.server = server;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
ByteBuf buffer = packet.content();
short packetId = buffer.readByte();

// These packets don't require a session
switch (packetId) {
case ID_UNCONNECTED_PING:
this.onUnconnectedPing(ctx, packet);
return;
case ID_OPEN_CONNECTION_REQUEST_1:
this.server.onOpenConnectionRequest1(ctx, packet);
return;
}

buffer.readerIndex(0);

RakNetServerSession session = this.server.getSession(packet.sender());
if (session != null) {
if (session.getEventLoop().inEventLoop()) {
session.onDatagram(buffer.retain());
} else {
ByteBuf buf = buffer.retain();
session.getEventLoop().execute(() -> session.onDatagram(buf));
}
}

if (this.server.getListener() != null) {
this.server.getListener().onUnhandledDatagram(ctx, packet);
}
}

private void onUnconnectedPing(ChannelHandlerContext ctx, DatagramPacket packet) {
if (!packet.content().isReadable(24)) {
return;
}

long pingTime = packet.content().readLong();
if (!RakNetUtils.verifyUnconnectedMagic(packet.content())) {
return;
}

byte[] userData = null;
if (this.server.getListener() != null) {
userData = this.server.getListener().onQuery(packet.sender());
}

if (userData == null) {
userData = new byte[0];
}

int packetLength = 35 + userData.length;

ByteBuf buffer = ctx.alloc().ioBuffer(packetLength, packetLength);
buffer.writeByte(ID_UNCONNECTED_PONG);
buffer.writeLong(pingTime);
buffer.writeLong(this.server.getGuid());
RakNetUtils.writeUnconnectedMagic(buffer);
buffer.writeShort(userData.length);
buffer.writeBytes(userData);
ctx.writeAndFlush(new DatagramPacket(buffer, packet.sender()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.nukkitx.network.raknet.pipeline;

import com.nukkitx.network.raknet.RakNetServer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;

@ChannelHandler.Sharable
public class ServerMessageHandler extends SimpleChannelInboundHandler<DatagramPacket> {
public static final String NAME = "rak-server-message-handler";
private final RakNetServer server;

public ServerMessageHandler(RakNetServer server) {
this.server = server;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
if (this.server.isBlocked(packet.sender().getAddress())) {
// Drop incoming traffic from blocked address
return;
}

ByteBuf buffer = packet.content();
if (!buffer.isReadable()) {
return;
}

if (this.server.getMetrics() != null) {
this.server.getMetrics().bytesIn(buffer.readableBytes());
}
ctx.fireChannelRead(packet.retain());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.nukkitx.network.raknet.proxy;

/**
* The command of an HAProxy proxy protocol header
*/
public enum HAProxyCommand implements ProxyProtocolConstants {
/**
* The LOCAL command represents a connection that was established on purpose by the proxy
* without being relayed.
*/
LOCAL(COMMAND_LOCAL_BYTE),
/**
* The PROXY command represents a connection that was established on behalf of another node,
* and reflects the original connection endpoints.
*/
PROXY(COMMAND_PROXY_BYTE);

/**
* The command is specified in the lowest 4 bits of the protocol version and command byte
*/
private static final byte COMMAND_MASK = 0x0f;

private final byte byteValue;

/**
* Creates a new instance
*/
HAProxyCommand(byte byteValue) {
this.byteValue = byteValue;
}

/**
* Returns the {@link HAProxyCommand} represented by the lowest 4 bits of the specified byte.
*
* @param verCmdByte protocol version and command byte
*/
public static HAProxyCommand valueOf(byte verCmdByte) {
int cmd = verCmdByte & COMMAND_MASK;
switch ((byte) cmd) {
case COMMAND_PROXY_BYTE:
return PROXY;
case COMMAND_LOCAL_BYTE:
return LOCAL;
default:
throw new IllegalArgumentException("unknown command: " + cmd);
}
}

/**
* Returns the byte value of this command.
*/
public byte byteValue() {
return byteValue;
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.nukkitx.network.raknet.proxy;

import io.netty.handler.codec.DecoderException;

/**
* A {@link DecoderException} which is thrown when an invalid HAProxy proxy protocol header is encountered
*/
public class HAProxyProtocolException extends DecoderException {

private static final long serialVersionUID = 713710864325167351L;

/**
* Creates a new instance
*/
public HAProxyProtocolException() { }

/**
* Creates a new instance
*/
public HAProxyProtocolException(String message, Throwable cause) {
super(message, cause);
}

/**
* Creates a new instance
*/
public HAProxyProtocolException(String message) {
super(message);
}

/**
* Creates a new instance
*/
public HAProxyProtocolException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.nukkitx.network.raknet.proxy;

/**
* The HAProxy proxy protocol specification version.
*/
public enum HAProxyProtocolVersion implements ProxyProtocolConstants {
/**
* The ONE proxy protocol version represents a version 1 (human-readable) header.
*/
V1(VERSION_ONE_BYTE),
/**
* The TWO proxy protocol version represents a version 2 (binary) header.
*/
V2(VERSION_TWO_BYTE);

/**
* The highest 4 bits of the protocol version and command byte contain the version
*/
private static final byte VERSION_MASK = (byte) 0xf0;

private final byte byteValue;

/**
* Creates a new instance
*/
HAProxyProtocolVersion(byte byteValue) {
this.byteValue = byteValue;
}

/**
* Returns the {@link HAProxyProtocolVersion} represented by the highest 4 bits of the specified byte.
*
* @param verCmdByte protocol version and command byte
*/
public static HAProxyProtocolVersion valueOf(byte verCmdByte) {
int version = verCmdByte & VERSION_MASK;
switch ((byte) version) {
case VERSION_TWO_BYTE:
return V2;
case VERSION_ONE_BYTE:
return V1;
default:
throw new IllegalArgumentException("unknown version: " + version);
}
}

/**
* Returns the byte value of this version.
*/
public byte byteValue() {
return byteValue;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.nukkitx.network.raknet.proxy;

/**
* A protocol proxied by HAProxy which is represented by its transport protocol and address family.
*/
public enum HAProxyProxiedProtocol implements ProxyProtocolConstants {
/**
* The UNKNOWN represents a connection which was forwarded for an unknown protocol and an unknown address family.
*/
UNKNOWN(TPAF_UNKNOWN_BYTE, AddressFamily.AF_UNSPEC, TransportProtocol.UNSPEC),
/**
* The TCP4 represents a connection which was forwarded for an IPv4 client over TCP.
*/
TCP4(TPAF_TCP4_BYTE, AddressFamily.AF_IPv4, TransportProtocol.STREAM),
/**
* The TCP6 represents a connection which was forwarded for an IPv6 client over TCP.
*/
TCP6(TPAF_TCP6_BYTE, AddressFamily.AF_IPv6, TransportProtocol.STREAM),
/**
* The UDP4 represents a connection which was forwarded for an IPv4 client over UDP.
*/
UDP4(TPAF_UDP4_BYTE, AddressFamily.AF_IPv4, TransportProtocol.DGRAM),
/**
* The UDP6 represents a connection which was forwarded for an IPv6 client over UDP.
*/
UDP6(TPAF_UDP6_BYTE, AddressFamily.AF_IPv6, TransportProtocol.DGRAM),
/**
* The UNIX_STREAM represents a connection which was forwarded for a UNIX stream socket.
*/
UNIX_STREAM(TPAF_UNIX_STREAM_BYTE, AddressFamily.AF_UNIX, TransportProtocol.STREAM),
/**
* The UNIX_DGRAM represents a connection which was forwarded for a UNIX datagram socket.
*/
UNIX_DGRAM(TPAF_UNIX_DGRAM_BYTE, AddressFamily.AF_UNIX, TransportProtocol.DGRAM);

private final byte byteValue;
private final AddressFamily addressFamily;
private final TransportProtocol transportProtocol;

/**
* Creates a new instance.
*/
HAProxyProxiedProtocol(
byte byteValue,
AddressFamily addressFamily,
TransportProtocol transportProtocol) {

this.byteValue = byteValue;
this.addressFamily = addressFamily;
this.transportProtocol = transportProtocol;
}

/**
* Returns the {@link HAProxyProxiedProtocol} represented by the specified byte.
*
* @param tpafByte transport protocol and address family byte
*/
public static HAProxyProxiedProtocol valueOf(byte tpafByte) {
switch (tpafByte) {
case TPAF_TCP4_BYTE:
return TCP4;
case TPAF_TCP6_BYTE:
return TCP6;
case TPAF_UNKNOWN_BYTE:
return UNKNOWN;
case TPAF_UDP4_BYTE:
return UDP4;
case TPAF_UDP6_BYTE:
return UDP6;
case TPAF_UNIX_STREAM_BYTE:
return UNIX_STREAM;
case TPAF_UNIX_DGRAM_BYTE:
return UNIX_DGRAM;
default:
throw new IllegalArgumentException(
"unknown transport protocol + address family: " + (tpafByte & 0xFF));
}
}

/**
* Returns the byte value of this protocol and address family.
*/
public byte byteValue() {
return byteValue;
}

/**
* Returns the {@link AddressFamily} of this protocol and address family.
*/
public AddressFamily addressFamily() {
return addressFamily;
}

/**
* Returns the {@link TransportProtocol} of this protocol and address family.
*/
public TransportProtocol transportProtocol() {
return transportProtocol;
}

/**
* The address family of an HAProxy proxy protocol header.
*/
public enum AddressFamily {
/**
* The UNSPECIFIED address family represents a connection which was forwarded for an unknown protocol.
*/
AF_UNSPEC(AF_UNSPEC_BYTE),
/**
* The IPV4 address family represents a connection which was forwarded for an IPV4 client.
*/
AF_IPv4(AF_IPV4_BYTE),
/**
* The IPV6 address family represents a connection which was forwarded for an IPV6 client.
*/
AF_IPv6(AF_IPV6_BYTE),
/**
* The UNIX address family represents a connection which was forwarded for a unix socket.
*/
AF_UNIX(AF_UNIX_BYTE);

/**
* The highest 4 bits of the transport protocol and address family byte contain the address family
*/
private static final byte FAMILY_MASK = (byte) 0xf0;

private final byte byteValue;

/**
* Creates a new instance
*/
AddressFamily(byte byteValue) {
this.byteValue = byteValue;
}

/**
* Returns the {@link AddressFamily} represented by the highest 4 bits of the specified byte.
*
* @param tpafByte transport protocol and address family byte
*/
public static AddressFamily valueOf(byte tpafByte) {
int addressFamily = tpafByte & FAMILY_MASK;
switch((byte) addressFamily) {
case AF_IPV4_BYTE:
return AF_IPv4;
case AF_IPV6_BYTE:
return AF_IPv6;
case AF_UNSPEC_BYTE:
return AF_UNSPEC;
case AF_UNIX_BYTE:
return AF_UNIX;
default:
throw new IllegalArgumentException("unknown address family: " + addressFamily);
}
}

/**
* Returns the byte value of this address family.
*/
public byte byteValue() {
return byteValue;
}
}

/**
* The transport protocol of an HAProxy proxy protocol header
*/
public enum TransportProtocol {
/**
* The UNSPEC transport protocol represents a connection which was forwarded for an unknown protocol.
*/
UNSPEC(TRANSPORT_UNSPEC_BYTE),
/**
* The STREAM transport protocol represents a connection which was forwarded for a TCP connection.
*/
STREAM(TRANSPORT_STREAM_BYTE),
/**
* The DGRAM transport protocol represents a connection which was forwarded for a UDP connection.
*/
DGRAM(TRANSPORT_DGRAM_BYTE);

/**
* The transport protocol is specified in the lowest 4 bits of the transport protocol and address family byte
*/
private static final byte TRANSPORT_MASK = 0x0f;

private final byte transportByte;

/**
* Creates a new instance.
*/
TransportProtocol(byte transportByte) {
this.transportByte = transportByte;
}

/**
* Returns the {@link TransportProtocol} represented by the lowest 4 bits of the specified byte.
*
* @param tpafByte transport protocol and address family byte
*/
public static TransportProtocol valueOf(byte tpafByte) {
int transportProtocol = tpafByte & TRANSPORT_MASK;
switch ((byte) transportProtocol) {
case TRANSPORT_STREAM_BYTE:
return STREAM;
case TRANSPORT_UNSPEC_BYTE:
return UNSPEC;
case TRANSPORT_DGRAM_BYTE:
return DGRAM;
default:
throw new IllegalArgumentException("unknown transport protocol: " + transportProtocol);
}
}

/**
* Returns the byte value of this transport protocol.
*/
public byte byteValue() {
return transportByte;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.nukkitx.network.raknet.proxy;

public interface ProxyProtocolConstants {

/**
* Command byte constants
*/
byte COMMAND_LOCAL_BYTE = 0x00;
byte COMMAND_PROXY_BYTE = 0x01;

/**
* Version byte constants
*/
byte VERSION_ONE_BYTE = 0x10;
byte VERSION_TWO_BYTE = 0x20;

/**
* Transport protocol byte constants
*/
byte TRANSPORT_UNSPEC_BYTE = 0x00;
byte TRANSPORT_STREAM_BYTE = 0x01;
byte TRANSPORT_DGRAM_BYTE = 0x02;

/**
* Address family byte constants
*/
byte AF_UNSPEC_BYTE = 0x00;
byte AF_IPV4_BYTE = 0x10;
byte AF_IPV6_BYTE = 0x20;
byte AF_UNIX_BYTE = 0x30;

/**
* Transport protocol and address family byte constants
*/
byte TPAF_UNKNOWN_BYTE = 0x00;
byte TPAF_TCP4_BYTE = 0x11;
byte TPAF_TCP6_BYTE = 0x21;
byte TPAF_UDP4_BYTE = 0x12;
byte TPAF_UDP6_BYTE = 0x22;
byte TPAF_UNIX_STREAM_BYTE = 0x31;
byte TPAF_UNIX_DGRAM_BYTE = 0x32;

/**
* V2 protocol binary header prefix
*/
byte[] BINARY_PREFIX = {
(byte) 0x0D,
(byte) 0x0A,
(byte) 0x0D,
(byte) 0x0A,
(byte) 0x00,
(byte) 0x0D,
(byte) 0x0A,
(byte) 0x51,
(byte) 0x55,
(byte) 0x49,
(byte) 0x54,
(byte) 0x0A
};

byte[] TEXT_PREFIX = {
(byte) 'P',
(byte) 'R',
(byte) 'O',
(byte) 'X',
(byte) 'Y',
};

/**
* Maximum possible length of a v1 proxy protocol header per spec
*/
int V1_MAX_LENGTH = 108;

/**
* Maximum possible length of a v2 proxy protocol header (fixed 16 bytes + max unsigned short)
*/
int V2_MAX_LENGTH = 16 + 65535;

/**
* Minimum possible length of a fully functioning v2 proxy protocol header (fixed 16 bytes + v2 address info space)
*/
int V2_MIN_LENGTH = 16 + 216;

/**
* Maximum possible length for v2 additional TLV data (max unsigned short - max v2 address info space)
*/
int V2_MAX_TLV = 65535 - 216;

/**
* Binary header prefix length
*/
int BINARY_PREFIX_LENGTH = BINARY_PREFIX.length;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,331 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.nukkitx.network.raknet.proxy;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.ProtocolDetectionResult;
import io.netty.util.CharsetUtil;

/**
* Decodes an HAProxy proxy protocol header
*
* @see <a href="https://haproxy.1wt.eu/download/1.5/doc/proxy-protocol.txt">Proxy Protocol Specification</a>
* @see <a href="https://github.com/netty/netty/blob/4.1/codec-haproxy/src/main/java/io/netty/handler/codec/haproxy/HAProxyMessageDecoder.java">Netty implementation</a>
*/
public final class ProxyProtocolDecoder implements ProxyProtocolConstants {
/**
* {@link ProtocolDetectionResult} for {@link HAProxyProtocolVersion#V1}.
*/
private static final ProtocolDetectionResult<HAProxyProtocolVersion> DETECTION_RESULT_V1 =
ProtocolDetectionResult.detected(HAProxyProtocolVersion.V1);

/**
* {@link ProtocolDetectionResult} for {@link HAProxyProtocolVersion#V2}.
*/
private static final ProtocolDetectionResult<HAProxyProtocolVersion> DETECTION_RESULT_V2 =
ProtocolDetectionResult.detected(HAProxyProtocolVersion.V2);

/**
* Used to extract a header frame out of the {@link ByteBuf} and return it.
*/
private HeaderExtractor headerExtractor;

/**
* {@code true} if we're discarding input because we're already over maxLength
*/
private boolean discarding;

/**
* Number of discarded bytes
*/
private int discardedBytes;

/**
* {@code true} if we're finished decoding the proxy protocol header
*/
private boolean finished;

/**
* Protocol specification version
*/
private int decodingVersion = -1;

/**
* The latest v2 spec (2014/05/18) allows for additional data to be sent in the proxy protocol header beyond the
* address information block so now we need a configurable max header size
*/
private final int v2MaxHeaderSize = V2_MAX_LENGTH; // TODO: need to calculate max length if TLVs are desired.

private ProxyProtocolDecoder(int version) {
this.decodingVersion = version;
}

public static HAProxyMessage decode(ByteBuf packet, int version) {
if (version == -1) {
return null;
}
ProxyProtocolDecoder decoder = new ProxyProtocolDecoder(version);
return decoder.decodeHeader(packet);
}

private HAProxyMessage decodeHeader(ByteBuf in) {
final ByteBuf decoded = decodingVersion == 1 ? decodeLine(in) : decodeStruct(in);
if (decoded == null) {
return null;
}

finished = true;
try {
if (decodingVersion == 1) {
return HAProxyMessage.decodeHeader(decoded.toString(CharsetUtil.US_ASCII));
} else {
return HAProxyMessage.decodeHeader(decoded);
}
} catch (HAProxyProtocolException e) {
throw fail(null, e);
}
}

public static int findVersion(final ByteBuf buffer) {
final int n = buffer.readableBytes();
// per spec, the version number is found in the 13th byte
if (n < 13) {
return -1;
}

int idx = buffer.readerIndex();
return match(BINARY_PREFIX, buffer, idx) ? buffer.getByte(idx + BINARY_PREFIX_LENGTH) : 1;
}

/**
* Create a frame out of the {@link ByteBuf} and return it.
*
* @param buffer the {@link ByteBuf} from which to read data
* @return frame the {@link ByteBuf} which represent the frame or {@code null} if no frame could
* be created
*/
private ByteBuf decodeStruct(ByteBuf buffer) {
if (headerExtractor == null) {
headerExtractor = new StructHeaderExtractor(v2MaxHeaderSize);
}
return headerExtractor.extract(buffer);
}

/**
* Create a frame out of the {@link ByteBuf} and return it.
*
* @param buffer the {@link ByteBuf} from which to read data
* @return frame the {@link ByteBuf} which represent the frame or {@code null} if no frame could
* be created
*/
private ByteBuf decodeLine(ByteBuf buffer) {
if (headerExtractor == null) {
headerExtractor = new LineHeaderExtractor(V1_MAX_LENGTH);
}
return headerExtractor.extract(buffer);
}

private void failOverLimit(String length) {
int maxLength = decodingVersion == 1 ? V1_MAX_LENGTH : v2MaxHeaderSize;
throw fail("header length (" + length + ") exceeds the allowed maximum (" + maxLength + ')', null);
}

private HAProxyProtocolException fail(String errMsg, Exception e) {
finished = true;
HAProxyProtocolException ppex;
if (errMsg != null && e != null) {
ppex = new HAProxyProtocolException(errMsg, e);
} else if (errMsg != null) {
ppex = new HAProxyProtocolException(errMsg);
} else if (e != null) {
ppex = new HAProxyProtocolException(e);
} else {
ppex = new HAProxyProtocolException();
}
return ppex;
}

/**
* Returns the {@link ProtocolDetectionResult} for the given {@link ByteBuf}.
*/
public static ProtocolDetectionResult<HAProxyProtocolVersion> detectProtocol(ByteBuf buffer) {
if (buffer.readableBytes() < 12) {
return ProtocolDetectionResult.needsMoreData();
}

int idx = buffer.readerIndex();

if (match(BINARY_PREFIX, buffer, idx)) {
return DETECTION_RESULT_V2;
}
if (match(TEXT_PREFIX, buffer, idx)) {
return DETECTION_RESULT_V1;
}
return ProtocolDetectionResult.invalid();
}

private static boolean match(byte[] prefix, ByteBuf buffer, int idx) {
for (int i = 0; i < prefix.length; i++) {
final byte b = buffer.getByte(idx + i);
if (b != prefix[i]) {
return false;
}
}
return true;
}

/**
* HeaderExtractor create a header frame out of the {@link ByteBuf}.
*/
private abstract class HeaderExtractor {
/** Header max size */
private final int maxHeaderSize;

protected HeaderExtractor(int maxHeaderSize) {
this.maxHeaderSize = maxHeaderSize;
}

/**
* Create a frame out of the {@link ByteBuf} and return it.
*
* @param buffer the {@link ByteBuf} from which to read data
* @return frame the {@link ByteBuf} which represent the frame or {@code null} if no frame could
* be created
* @throws Exception if exceed maxLength
*/
public ByteBuf extract(ByteBuf buffer) {
final int eoh = findEndOfHeader(buffer);
if (!discarding) {
if (eoh >= 0) {
final int length = eoh - buffer.readerIndex();
if (length > maxHeaderSize) {
buffer.readerIndex(eoh + delimiterLength(buffer, eoh));
failOverLimit(String.valueOf(length));
return null;
}
ByteBuf frame = buffer.readSlice(length);
buffer.skipBytes(delimiterLength(buffer, eoh));
return frame;
} else {
final int length = buffer.readableBytes();
if (length > maxHeaderSize) {
discardedBytes = length;
buffer.skipBytes(length);
discarding = true;
failOverLimit("over " + discardedBytes);
}
return null;
}
} else {
if (eoh >= 0) {
final int length = discardedBytes + eoh - buffer.readerIndex();
buffer.readerIndex(eoh + delimiterLength(buffer, eoh));
discardedBytes = 0;
discarding = false;
failOverLimit("over " + length);
} else {
discardedBytes += buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
}
return null;
}
}

/**
* Find the end of the header from the given {@link ByteBuf},the end may be a CRLF, or the length given by the
* header.
*
* @param buffer the buffer to be searched
* @return {@code -1} if can not find the end, otherwise return the buffer index of end
*/
protected abstract int findEndOfHeader(ByteBuf buffer);

/**
* Get the length of the header delimiter.
*
* @param buffer the buffer where delimiter is located
* @param eoh index of delimiter
* @return length of the delimiter
*/
protected abstract int delimiterLength(ByteBuf buffer, int eoh);
}

private final class LineHeaderExtractor extends HeaderExtractor {

LineHeaderExtractor(int maxHeaderSize) {
super(maxHeaderSize);
}

/**
* Returns the index in the buffer of the end of line found.
* Returns -1 if no end of line was found in the buffer.
*/
@Override
protected int findEndOfHeader(ByteBuf buffer) {
final int n = buffer.writerIndex();
for (int i = buffer.readerIndex(); i < n; i++) {
final byte b = buffer.getByte(i);
if (b == '\r' && i < n - 1 && buffer.getByte(i + 1) == '\n') {
return i; // \r\n
}
}
return -1; // Not found.
}

@Override
protected int delimiterLength(ByteBuf buffer, int eoh) {
return buffer.getByte(eoh) == '\r' ? 2 : 1;
}
}

private final class StructHeaderExtractor extends HeaderExtractor {

StructHeaderExtractor(int maxHeaderSize) {
super(maxHeaderSize);
}

/**
* Returns the index in the buffer of the end of header if found.
* Returns -1 if no end of header was found in the buffer.
*/
@Override
protected int findEndOfHeader(ByteBuf buffer) {
final int n = buffer.readableBytes();

// per spec, the 15th and 16th bytes contain the address length in bytes
if (n < 16) {
return -1;
}

int offset = buffer.readerIndex() + 14;

// the total header length will be a fixed 16 byte sequence + the dynamic address information block
int totalHeaderBytes = 16 + buffer.getUnsignedShort(offset);

// ensure we actually have the full header available
if (n >= totalHeaderBytes) {
return totalHeaderBytes;
} else {
return -1;
}
}

@Override
protected int delimiterLength(ByteBuf buffer, int eoh) {
return 0;
}
}
}
2 changes: 1 addition & 1 deletion rcon/pom.xml
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
<parent>
<artifactId>network-parent</artifactId>
<groupId>com.nukkitx.network</groupId>
<version>1.6.25</version>
<version>1.6.27-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>