Java Guide Series
Architecture | Knowledge Base | Networking | Containers | Threads
The main entry point into the MADARA transport layer is the QoSTransportSettings class. The QoSTransportSettings class contains dozens of network-specific settings that govern basic configuration, quality-of-service, and filtering.
Creating a networked transport in MADARA generally requires editing QoS_Transport_Settings.type
(which specifies the network transport type) and a list of hosts (for UDP, Multicast, Broadcast) or domains (map to topics in DDS transports).
UDP requires a hosts vector with the ip:port of the sender first (where we want to bind and how we want to be known to others in the originator field of messages) and all peers in the rest of the vector. Multicast requires a valid multicast ip and port in the first entry of the hosts vector. Similarly, broadcast requires a valid broadcast ip (for the subnet of your machine) and port in the first entry of the hosts vector.
Example of UDP transport constructor example
import ai.madara.knowledge.KnowledgeBase; import ai.madara.transport.QoSTransportSettings; import ai.madara.transport.TransportType; //Create transport settings for a multicast transport QoSTransportSettings settings = new QoSTransportSettings(); settings.setHosts(new String[]{"localhost:40000", "localhost:40001}); settings.setType(TransportType.UDP_TRANSPORT); //create a knowledge base with the multicast transport settings KnowledgeBase knowledge = new KnowledgeBase("", settings); //send all agents listening on this multicast address the knowledge //that all agents are currently ready knowledge.evaluate("all_agents_ready = 1");
Example of UDP registry transport constructor example
UDP Registry is a special UDP transport that dynamically recognizes new and leaving hosts after deployment. To use the UDP Registry transport, you need to have a madara_registry running (located at $MADARA_ROOT/bin/madara_registry). This UDP registry should work over 3G, 4G, and from behind NATs as long as the NAT maintains the UDP host:port for the madara_registry binding.
If the madara_registry goes offline, the agents that have talked with the madara_registry before will continue to use the last hosts they knew of. Whenever a madara_registry comes back online (e.g., when you restart it), everything should work without any issues.
import ai.madara.knowledge.KnowledgeBase; import ai.madara.transport.QoSTransportSettings; import ai.madara.transport.TransportType; import ai.madara.filters.EndpointClear; //create transport settings for a multicast transport QoSTransportSettings settings = new QoSTransportSettings(); //assume a registry server on the local host at port 40000 settings.setHosts(new String[]{"localhost:40000"}); settings.setType(TransportType.REGISTRY_CLIENT); //create endpoint clear filter, which will tidy up registry updates EndpointClear filter = new EndpointClear(); filter.addReceiveFilterTo(settings); //create a knowledge base with the multicast transport settings KnowledgeBase knowledge = new KnowledgeBase("", settings); //send all agents listening on this multicast address the knowledge //that all agents are currently ready knowledge.evaluate("all_agents_ready = 1");
Example of multicast transport constructor (Multicast IP range)
import ai.madara.knowledge.KnowledgeBase; import ai.madara.transport.QoSTransportSettings; import ai.madara.transport.TransportType; //Create transport settings for a multicast transport QoSTransportSettings settings = new QoSTransportSettings (); settings.setHosts(new String[]{"239.255.0.1:4150"}); settings.setType(TransportType.MULTICAST_TRANSPORT); //create a knowledge base with the multicast transport settings KnowledgeBase knowledge = new KnowledgeBase("agent1", settings); //send all agents listening on this multicast address the knowledge //that all agents are currently ready knowledge.evaluate("all_agents_ready = 1");
Example of broadcast transport constructor
import ai.madara.knowledge.KnowledgeBase; import ai.madara.transport.QoSTransportSettings; import ai.madara.transport.TransportType; //Create transport settings for a multicast transport QoSTransportSettings settings = new QoSTransportSettings (); settings.setHosts(new String[]{"192.168.1.255:15000"}); settings.setType(TransportType.BROADCAST_TRANSPORT); //create a knowledge base with the multicast transport settings KnowledgeBase knowledge = new KnowledgeBase("", settings); //send all agents listening on this multicast address the knowledge //that all agents are currently ready knowledge.evaluate("all_agents_ready = 1"); }
When using real robots in a wide area, it may not be possible to reach all neighbors in a single message send. To aid developers trying to solve problems where multi-hops are required for message routing, MADARA supports rebroadcast TTLs set on the sender side. MADARA also supports opting out of rebroadcasts. In fact, all transports opt out by default.
Example routing between agent1 and agent3 that is not possible without rebroadcasts
Example of sending a packet with a rebroadcast ttl of 3
import ai.madara.knowledge.KnowledgeBase; import ai.madara.transport.QoSTransportSettings; import ai.madara.transport.TransportType; //Create transport settings for a multicast transport QoSTransportSettings settings = new QoSTransportSettings (); settings.setHosts(new String[]{"239.255.0.1:4150"}); settings.setType(TransportType.MULTICAST_TRANSPORT); //create a knowledge base with the multicast transport settings KnowledgeBase knowledge = new KnowledgeBase("", settings); // set rebroadcast ttl to 3. Madara::Transport::QoS_Transport_Settings settings; settings.setRebroadcastTtl(3); KnowledgeBase knowledge = new KnowledgeBase("", settings);
Example of enabling participation in rebroadcasts
import ai.madara.knowledge.KnowledgeBase; import ai.madara.transport.QoSTransportSettings; import ai.madara.transport.TransportType; //Create transport settings for a multicast transport QoSTransportSettings settings = new QoSTransportSettings (); settings.setHosts(new String[]{"239.255.0.1:4150"}); settings.setType(TransportType.MULTICAST_TRANSPORT); settings.setRebroadcastTtl(2); settings.enableParticipantTtl(1); KnowledgeBase knowledge = new KnowledgeBase("", settings);
The MADARA architecture monitors two different types of bandwidth usage: sending and receiving bandwidth. Setting a limit for either of these will serve as a hard limit that does not differentiate between priorities of information. Once a packet is sent, the bandwidth counters are updated, and if the bytes per second rate is higher than the limit you set, all packets will be dropped until the rate dips below the limit. These limiters do not currently look at the size of the packet that is being sent out, so if you are at 99,500 B/s and your limit is 100,000 B/s, it will try to send any next packet (even if it 1MB), update the bandwidth counter, and then not send another packet until you reach 99,500 B/s sent over the past 10s.
For a more flexible bandwidth option that you can configure, see Bandwidth Filters.
Deadline enforcement is concerned with enforcing latency deadlines between reasoning entitites on the network. Deadline enforcement requires some type of time synchronization protocol between agents in the network, preferably accurate to within a second, for it to be useful to the MADARA entities.
Enforcing send bandwidth limits
This type of enforcement is useful if you want to make sure no agent is using more than a certain limit (e.g., 100KB/s) individually.
Example of enforcing a send bandwidth limit
import ai.madara.knowledge.KnowledgeBase; import ai.madara.transport.QoSTransportSettings; import ai.madara.transport.TransportType; //Create transport settings for a multicast transport QoSTransportSettings settings = new QoSTransportSettings (); settings.setHosts(new String[]{"239.255.0.1:4150"}); settings.setType(TransportType.MULTICAST_TRANSPORT); settings.setSendBandwidthLimit(100000); KnowledgeBase knowledge = new KnowledgeBase("", settings);
Enforcing send bandwidth limits based on received bandwidth
This type of enforcement is useful if you want to make sure the agent is not violating a collective bandwidth limit (e.g., 2MB/s) and is based on the amount of data received per second over the past 10s.
Example of enforcing a total bandwidth limit
import ai.madara.knowledge.KnowledgeBase; import ai.madara.transport.QoSTransportSettings; import ai.madara.transport.TransportType; //Create transport settings for a multicast transport QoSTransportSettings settings = new QoSTransportSettings (); settings.setHosts(new String[]{"239.255.0.1:4150"}); settings.setType(TransportType.MULTICAST_TRANSPORT); settings.setTotalBandwidthLimit(100000); KnowledgeBase knowledge = new KnowledgeBase("", settings);
Note that even though this is keying off received bandwidth, it affects messages being sent (i.e., enforcement is applied when the agent attempts to send or rebroadcast knowledge to the network).
Deadline enforcement aims to discard received packets that are too old to be useful to agent state reasoning. Handling old packets can significantly impede performance in an agent-saturated network, so clearing your queues quickly can aid the agent network.
Example of enforcing a transport latency deadline
import ai.madara.knowledge.KnowledgeBase; import ai.madara.transport.QoSTransportSettings; import ai.madara.transport.TransportType; //Create transport settings for a multicast transport QoSTransportSettings settings = new QoSTransportSettings (); settings.setHosts(new String[]{"239.255.0.1:4150"}); settings.setType(TransportType.MULTICAST_TRANSPORT); settings.setDeadline(10); KnowledgeBase knowledge = new KnowledgeBase("", settings);
User callbacks can also be inserted into the transport layer to modify payloads. The MADARA transport system allows users to insert callbacks into three key operations: receive, send, and rebroadcast.
Filter callbacks are given a number of arguments that are relevant to the filtering operation. As of version 1.1.9, these arguments include the following:
[0]
: The knowledge record that the filter is acting upon[1]
: The name of the knowledge record, if applicable ("" if unnamed, but this should never happen)[2]
: The type of operation calling the filter (integer valued). Valid types are: IDLE_OPERATION
(should never see), SENDING_OPERATION
(transport is trying to send the record), RECEIVING_OPERATION
(transport has received the record and is ready to apply the update), REBROADCASTING_OPERATION
(transport is trying to rebroadcast the record -- only happens if rebroadcast is enabled in Transport Settings)[3]
: Bandwidth used while sending through this transport, measured in bytes per second.[4]
: Bandwidth used while receiving from this transport, measured in bytes per second.[5]
: Message timestamp (when the message was originally sent, in seconds)[6]
: Current timestamp (the result of time (NULL))[7]
: Knowledge Domain (partition of the knowledge updates)[8]
: Originator (identifier of sender, usually host:port)The filter can add data to the payload by pushing a variable name (string) followed by a value, which can be a double, string, integer, byte array, or array of integers or doubles, just as you would do with a set operation. This can be useful if other reasoners in the network are expecting additional meta data for the update (which they are free to strip out or ignore in a receive filter, if they don't need the information).
The filter can also access any variable in the Knowledge_Base through the Variables facade. With the arguments and variables interfaces, developers can respond to transport events in highly dynamic and extensible ways.
To create a bandwidth filter, we simply need to create a filter that looks at argument index 3 (how much bandwidth we have used sending) or argument index 4 (how much bandwidth is being used--i.e., what we have received in bytes per second), depending on which type of bandwidth enforcement we want to do.
In the following example, we drop any packet members if we are over 100,000 bytes per second sending within the past 10s.
Example of enforcing a send bandwidth limit via filtering
import ai.madara.knowledge.KnowledgeRecord; import ai.madara.knowledge.Variables; import ai.madara.knowledge.KnowledgeList; import ai.madara.transport.filters.RecordFilter; import ai.madara.knowledge.KnowledgeType; public class SendBandwidthLimit implements RecordFilter { // Do not allow more than 100k bytes per second public KnowledgeRecord filter(KnowledgeList args, Variables variables) { KnowledgeRecord result = new KnowledgeRecord(); // only modify the return value with arg[0] if we are under 100KB/s if (args.get(3).toIntegerValue() < 100000) { result = args[0]; } return result; } public static void main (String...args) throws InterruptedException, Exception { QoSTransportSettings settings = new QoSTransportSettings(); settings.setHosts(new String[]{"239.255.0.1:4150"}); settings.setType(TransportType.MULTICAST_TRANSPORT); // add the above filter for all file types, applied before sending settings.add_send_filter (KnowledgeType.ALL_TYPES, new SendBandwidthLimit()); settings.add_rebroadcast_filter (KnowledgeType.ALL_TYPES, new SendBandwidthLimit()); // create a knowledge base with the multicast transport settings KnowledgeBase knowledge = new KnowledgeBase("", settings); // do normal reasoning, read files, or whatever other logic is needed ... } }
Filters can inspect argument indices 5 and 6 for information on the sent and received time for each packet. The difference between these two arguments is called the packet latency, and this latency value can inform the filter of deadline violations.
Example of enforcing a network latency deadline via filtering
import ai.madara.knowledge.KnowledgeRecord; import ai.madara.knowledge.Variables; import ai.madara.knowledge.KnowledgeList; import ai.madara.transport.filters.RecordFilter; public class DeadlineEnforcement implements RecordFilter { // Check the deadline public KnowledgeRecord filter(KnowledgeList args, Variables variables) { KnowledgeRecord result = new KnowledgeRecord(); // args[5] is sent time, args[6] is current time // keep any packet with a latency of less than 5 seconds if (args[6].toIntegerValue() - args[5].toIntegerValue() < 5) { result = args[0]; } return result; } public static void main (String...args) throws InterruptedException, Exception { QoSTransportSettings settings = new QoSTransportSettings(); settings.setHosts(new String[]{"239.255.0.1:4150"}); settings.setType(TransportType.MULTICAST_TRANSPORT); // add the above filter for all file types, applied before sending settings.add_send_filter (KnowledgeType.ALL_TYPES, new DeadlineEnforcement()); settings.add_rebroadcast_filter (KnowledgeType.ALL_TYPES, new DeadlineEnforcement()); // create a knowledge base with the multicast transport settings KnowledgeBase knowledge = new KnowledgeBase("", settings); // do normal reasoning, read files, or whatever other logic is needed ... } }
Filters that drop packets can be very useful, but developers also have unlimited options for inflating or reducing packets and updates within the packets. Packet shaping is any operation that mutates a packet element into a different form. This can be operations like converting packets into XML, resizing an image payload, or even encrypting part of a packet with a private key.
In the following example, we simply encapsulate any string payload with the xml elements <item>
and </item>
.
Example of shaping a payload before it gets sent out
import ai.madara.knowledge.KnowledgeRecord; import ai.madara.knowledge.Variables; import ai.madara.knowledge.KnowledgeList; import ai.madara.transport.filters.RecordFilter; import java.util.StringBuffer; public class AddItemTag implements RecordFilter { // Add an item tag around the record public KnowledgeRecord filter(KnowledgeList args, Variables variables) { KnowledgeRecord result = new KnowledgeRecord(); // encase it in an <item> tag StringBuffer buffer = new StringBuffer(); buffer.add("<item>"); buffer.add(args[0].toString(); buffer.add("</item>"); result.set(buffer.toString()); return result; } public static void main (String...args) throws InterruptedException, Exception { QoSTransportSettings settings = new QoSTransportSettings(); settings.setHosts(new String[]{"239.255.0.1:4150"}); settings.setType(TransportType.MULTICAST_TRANSPORT); // add the above filter for all file types, applied before sending settings.add_send_filter (KnowledgeType.ALL_TYPES, new AddItemTag()); settings.add_rebroadcast_filter (KnowledgeType.ALL_TYPES, new AddItemTag()); // create a knowledge base with the multicast transport settings KnowledgeBase knowledge = new KnowledgeBase("", settings); // do normal reasoning, read files, or whatever other logic is needed ... } }
Aggregate filters differ from individual filters in that aggregate filters take the entire map of records, are generally more efficient, and are more flexible.
We will highlight the utility and power of aggregate filters with a simple example which adds the id of the current process as metadata to every message sent. It then strips the id from the aggregate before applying the information to the knowledge base.
import ai.madara.knowledge.KnowledgeRecord; import ai.madara.knowledge.Variables; import ai.madara.knowledge.KnowledgeList; import ai.madara.knowledge.EvalSettings; import ai.madara.transport.filters.RecordFilter; import java.util.StringBuffer; public class AddOrEraseId implements RecordFilter { // Add an id if it doesn't exist. Strip id if it does. public KnowledgeRecord filter(Packet packet, TransportContext context, Variables variables) { KnowledgeRecord result = new KnowledgeRecord(); if(!packet.exists("id")) { packet.add("id",variables.get(".id")); } else { packet.erase("id"); } return result; } public static void main (String...args) throws InterruptedException, Exception { QoSTransportSettings settings = new QoSTransportSettings(); settings.setHosts(new String[]{"239.255.0.1:4150"}); settings.setType(TransportType.MULTICAST_TRANSPORT); // add the above filter for all file types, applied before sending settings.addSendFilter (new AddOrEraseId()); settings.addReceiveFilter (new AddOrEraseId()); // create a knowledge base with the multicast transport settings KnowledgeBase knowledge = new KnowledgeBase("", settings); EvalSettings queueUntilLater = new EvalSettings(); queueUntilLater.setDelaySendingModifieds(true); // set id so we have access to it in the aggregate outgoing filter knowledge.set (".id", 1); // build a packet from this id with some information knowledge.set ("occupation", "Banker", queueUntilLater); knowledge.set ("age", 43, queueUntilLater); // a final piece of data with default settings will activate the // aggregate filter knowledge.set ("money", 553200.50); } }
MADARA transports can be configured to trust or ban lists of peers. The banned list can be useful for mitigating known faulty sensors that are flooding the network as well as enforcing basic security. The accept list is more useful for security policies and is significantly more stringent. Once a node is added to the trusted list, anything not on the trusted list will be automatically denied any update abilities to the local context.
Example of adding peers to trusted list
import ai.madara.transport.QoSTransportSettings; // only trust the agents "agent2" and "agent3" QoSTransportSettings settings = new QoSTransportSettings(); settings.addTrustedPeer("agent2"); settings.addTrustedPeer("agent3"); // others could add "agent1" to their trusted list KnowledgeBase knowledge = new KnowledgeBase("", settings);
The above code has agent1 set his unique identifier to "agent1". Let's show how we could make a peer that doesn't trust knowledge from the above entity.
Example of adding peers to banned list
import ai.madara.transport.QoSTransportSettings; // Do not trust "agent1" QoSTransportSettings settings = new QoSTransportSettings(); settings.addBannedPeer("agent1"); // others could add "agent1" to their trusted list KnowledgeBase knowledge = new KnowledgeBase("", settings);
Before deploying MADARA applications into real-world, real-time situations (especially wireless situations), developers should probably test that their applications work despite packet loss. MADARA provides first class support for dropping packets at deterministic and random intervals.
The deterministic policy is enforced via a stride scheduler within the Packet Scheduler.
Example of setting a deterministic non-bursty packet drop policy
import ai.madara.transport.QoSTransportSettings; import ai.madara.transport.DropType; QoSTransportSettings settings = new QoSTransportSettings(); /** * Set a drop rate of 20% in a deterministic manner (1 drop, then 4 successful sends) **/ settings.updateDropRate(.2, DropType.PACKET_DROP_DETERMINISTIC, 1); KnowledgeBase knowledge = new KnowledgeBase("", settings);
Burst usage
When updating a drop rate, you can specify a drop burst type. Bursts are sequences of drops in a row that may prove useful when trying to mimic real world drop patterns. Burst rates can cause unusual patterns within the deterministic policy settings that may cause a higher rate than intended.
Example of setting a deterministic bursty packet drop policy
import ai.madara.transport.QoSTransportSettings; import ai.madara.transport.DropType; QoSTransportSettings settings = new QoSTransportSettings(); /** * Set a drop rate of 20% in a deterministic manner with 2 successive drops (burst rate = 2) **/ settings.updateDropRate(.2, DropType.PACKET_DROP_DETERMINISTIC, 2); KnowledgeBase knowledge = new KnowledgeBase("", settings);
The probablistic policy enforces a uniform distribution of drops at a target rate.
Normal usage
The following example shows how to set a 20% drop rate in a probablistic manner without scheduled bursts.
Example of setting a deterministic non-bursty packet drop policy
import ai.madara.transport.QoSTransportSettings; import ai.madara.transport.DropType; QoSTransportSettings settings = new QoSTransportSettings(); // Set a drop rate of 20% in a probablistic manner settings.updateDropRate(.2, DropType.PACKET_DROP_PROBABLISTIC, 1); KnowledgeBase knowledge = new KnowledgeBase("", settings);
Burst usage
When updating a drop rate, you can specify a drop burst type. Bursts are sequences of drops in a row that may prove useful when trying to mimic real world drop patterns.
Example of setting a deterministic bursty packet drop policy
import ai.madara.transport.QoSTransportSettings; import ai.madara.transport.DropType; QoSTransportSettings settings = new QoSTransportSettings(); /** * Set a drop rate of 20% in a probablistic manner with 2 successive drops (burst rate = 2) **/ settings.updateDropRate(.2, DropType.PACKET_DROP_PROBABLISTIC, 2); KnowledgeBase knowledge = new KnowledgeBase("", settings);
The Java module is fully documented with standard java documentation. Please use the API at JavaDocs or tell your editor of choice to load source files from the Git repository.
Java Guide Series
Architecture | Knowledge Base | Networking | Containers | Threads
Wiki: Home
Wiki: JavaInteractingWithTheKnowledgeBase
Wiki: JavaKnowledgeContainers
Wiki: JavaMadaraArchitecture
Wiki: JavaWorkingWithThreads