Skip to content

Commit 2c72d8d

Browse files
author
Matthieu Riou
committed
Protocol level of node lookup initiation.
1 parent a63ca4d commit 2c72d8d

File tree

4 files changed

+272
-4
lines changed

4 files changed

+272
-4
lines changed

src/comm.hs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
module Comm
2+
( KadOp(..), sendLookup, newUid, parseHeader,
3+
) where
4+
5+
import Network.Socket
6+
import Network.BSD
7+
8+
import Data.Word
9+
import Data.List (genericDrop,unfoldr)
10+
import Data.Char(chr,ord)
11+
import Control.Monad(forM, ap, liftM)
12+
import Control.Monad.State
13+
14+
import KTable
15+
import Globals
16+
17+
-- Header: version - 1 byte
18+
-- optype - 1 byte
19+
-- uid - 8 bytes
20+
--
21+
-- Node Lookup: node id - 20 bytes
22+
23+
data PeerHandle = PeerHandle { pSocket :: Socket, pAddress :: SockAddr }
24+
25+
-- Client functions to send operations to other nodes.
26+
--
27+
28+
sendLookup peers nid lookupId = forM peers (\p -> do
29+
msgId <- liftIO newUid
30+
msg <- buildLookupMsg nid msgId
31+
newWaitingReply p NodeLookupOp msgId lookupId
32+
liftIO $ sendToPeer msg p )
33+
34+
where buildLookupMsg nid msgId = liftM (++ toCharArray nid 160) $ buildHeader NodeLookupOp msgId
35+
36+
sendToPeer msg peer = do
37+
phandle <- openPeerHandle (host peer) (port peer)
38+
sendstr phandle msg
39+
closePeerHandle phandle
40+
41+
where sendstr _ [] = return ()
42+
sendstr phandle omsg = do
43+
sent <- sendTo (pSocket phandle) omsg (pAddress phandle)
44+
sendstr phandle (genericDrop sent omsg)
45+
46+
-- Server functions to handle calls coming from other nodes
47+
--
48+
49+
serverDispatch addr msg = do
50+
let (hdr, rest) = parseHeader msg
51+
-- ignoring what we can't handle
52+
if msgVersion hdr /= 1
53+
then return ()
54+
else do wait <- waitingReply (msgUid hdr) (msgOp hdr) (toPeer addr)
55+
case wait of
56+
Nothing -> return ()
57+
opId -> dispatchOp (msgOp hdr) opId rest
58+
59+
where dispatchOp NodeLookupOp opId msg = do
60+
rl <- runningLookup opId
61+
case rl of
62+
Nothing -> return ()
63+
Just rl -> return ()
64+
dispatchOp _ _ _ = return ()
65+
66+
data Header = Header { msgVersion ::Int, msgOp ::KadOp, msgUid ::Word64 }
67+
deriving Show
68+
69+
-- TODO handle things like empty messages, adding error handling
70+
parseHeader msg = runState parseHeader' msg
71+
where parseHeader' = do
72+
ver <- parseVersion
73+
op <- parseOpType
74+
uid <- parseUid
75+
return $ Header ver op uid
76+
77+
parseVersion = consuming 1 (ord . head)
78+
parseOpType = consuming 1 (\x ->
79+
case head x of
80+
'\SOH' -> PingOp
81+
'\STX' -> NodeLookupOp
82+
_ -> UnknownOp )
83+
parseUid = consuming 8 fromCharArray
84+
85+
consuming n fn = do
86+
str <- get
87+
let (v, r) = splitAt n str
88+
put r
89+
return $ fn v
90+
91+
localServer port handlerFn = withSocketsDo $ do
92+
addrinfos <- getAddrInfo (Just (defaultHints {addrFlags = [AI_PASSIVE]})) Nothing (Just port)
93+
let serveraddr = head addrinfos
94+
sock <- socket (addrFamily serveraddr) Datagram defaultProtocol
95+
bindSocket sock (addrAddress serveraddr)
96+
procMessages sock
97+
98+
-- Loops forever processing incoming data
99+
where procMessages sock = do
100+
(msg, _, addr) <- recvFrom sock 1024
101+
handlerFn addr msg
102+
procMessages sock
103+
104+
-- Utility functions to serialize messages
105+
--
106+
107+
buildHeader optype msgId = do
108+
return $ buildHeader' msgId optype
109+
110+
where buildHeader':: (Integral a) => a -> KadOp -> String
111+
buildHeader' uid optype = (chr 1) : optypeStr : toCharArray uid 64
112+
optypeStr = case optype of
113+
PingOp -> chr 1
114+
NodeLookupOp -> chr 2
115+
116+
toCharArray:: (Integral a) => a -> Int -> [Char]
117+
toCharArray num depth = map chr (toBytes num depth)
118+
119+
-- Converts a string to its numeric value by considering that each character is a
120+
-- byte in a n byte number
121+
fromCharArray :: (Num t) => [Char] -> t
122+
fromCharArray str = fst $ foldl (\(acc, exp) ch ->
123+
(acc + fromIntegral (ord ch) * 2^exp, exp+1)) (0,0) (reverse str)
124+
125+
-- Decomposes a number into its successive byte values or, said differently, converts
126+
-- a number to base 2^8.
127+
toBytes :: (Integral t, Integral a) => a -> t -> [Int]
128+
toBytes num depth = unfoldr byteMod (num,depth)
129+
where byteMod (num,exp) = let (d,m) = divMod num (2^exp)
130+
in if exp < 0 then Nothing else Just (fromIntegral d ::Int, (m,exp-8))
131+
132+
toPeer addr =
133+
case addr of
134+
SockAddrInet port host -> Peer (show host) (show port) (-1)
135+
SockAddrInet6 port _ host _ -> Peer (show host) (show port) (-1)
136+
SockAddrUnix str -> error $ "Dont know how to translate " ++ str
137+
138+
openPeerHandle hostname port = do
139+
addrinfos <- getAddrInfo Nothing (Just hostname) (Just port)
140+
let serveraddr = head addrinfos
141+
sock <- socket (addrFamily serveraddr) Datagram defaultProtocol
142+
return $ PeerHandle sock (addrAddress serveraddr)
143+
144+
closePeerHandle phandle = sClose (pSocket phandle)
145+
146+
-- main = do
147+
-- params <- getArgs
148+
-- if head params == "client"
149+
-- then do
150+
-- ph <- openPeerHandle "localhost" "10000"
151+
-- sendPing ph
152+
-- closePeerHandle ph
153+
-- else localServer "10000" (\addr msg -> putStrLn $ msg ++ " / " ++ (show addr))

src/globals.hs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
2+
{-# LANGUAGE NoMonomorphismRestriction #-}
3+
4+
module Globals (
5+
RunningOps(..), ServerState,KadOp(..),
6+
runServer, askRoutingT, askRunningOpsT, askKTree, readKTree,
7+
newRunningLookup, runningLookup, newWaitingReply, waitingReply, newUid,
8+
modifyTVar
9+
) where
10+
11+
import Data.Word
12+
import Control.Monad.Reader
13+
import Control.Concurrent.STM
14+
import System.Random
15+
16+
import qualified Data.Map as M
17+
18+
import KTable
19+
20+
type RoutingTable = M.Map Word64 WaitingReply
21+
22+
data KadOp = UnknownOp | NodeLookupOp | PingOp
23+
deriving (Show, Eq)
24+
25+
data WaitingReply = WaitingReply { waitFromPeer:: Peer, waitOp:: KadOp, waitOpUid:: Word64 }
26+
27+
type RunningOpsTable = M.Map Word64 RunningOps
28+
29+
data RunningOps = RunningLookup { remaining:: [Peer], pending:: [Peer], queried:: [Peer], closest:: [Peer] }
30+
31+
type GlobalData = (TVar RoutingTable, TVar RunningOpsTable, TVar KTree)
32+
33+
newtype ServerState a = ServerState {
34+
runSS:: ReaderT GlobalData IO a
35+
} deriving (Monad, MonadIO, MonadReader GlobalData)
36+
37+
runServer rt rot kt st = runReaderT (runSS st) (rt, rot, kt)
38+
39+
askRoutingT = do { (rt, rot, kt) <- ask; return rt }
40+
askRunningOpsT = do { (rt, rot, kt) <- ask; return rot }
41+
askKTree = do { (rt, rot, kt) <- ask; return kt }
42+
43+
readRoutingT = do { rt <- askRoutingT; liftIO . atomically $ readTVar rt }
44+
readRunningOpsT = do { rot <- askRunningOpsT; liftIO . atomically $ readTVar rot }
45+
readKTree = do { kt <- askKTree; liftIO . atomically $ readTVar kt }
46+
47+
modifyTVar :: TVar a -> (a -> a) -> STM ()
48+
modifyTVar tv f = readTVar tv >>= writeTVar tv . f
49+
50+
-- Inserts a new running lookup in the table of current operations
51+
--
52+
newRunningLookup rs ps lookupId = do
53+
rot <- askRunningOpsT
54+
let nrl = RunningLookup rs ps [] []
55+
liftIO . atomically $ modifyTVar rot (M.insert lookupId nrl)
56+
57+
runningLookup lookupId = do
58+
rot <- readRunningOpsT
59+
return $ M.lookup lookupId rot
60+
61+
newWaitingReply:: Peer -> KadOp -> Word64 -> Word64 -> ServerState ()
62+
newWaitingReply peer op msgId opId = do
63+
rt <- askRoutingT
64+
liftIO . atomically $ modifyTVar rt (M.insert msgId $ WaitingReply peer op opId)
65+
66+
waitingReply msgId op peer = do
67+
rt <- readRoutingT
68+
case M.lookup msgId rt of
69+
Nothing -> return Nothing
70+
Just wr -> return $ if waitFromPeer wr == peer && waitOp wr == op then Just (waitOpUid wr) else Nothing
71+
72+
newUid:: IO Word64
73+
newUid = liftM fromInteger $ randomRIO (0, 2^64 - 1)
74+

src/kad.hs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
2+
module Kad
3+
( nodeLookup
4+
) where
5+
6+
import qualified Data.Map as M
7+
import Data.Word
8+
import Control.Monad(liftM, liftM2, liftM3)
9+
import Control.Concurrent.STM
10+
import Control.Monad.Trans(liftIO)
11+
12+
import KTable
13+
import Globals
14+
import Comm
15+
16+
-- TODO shut out nodes that are too chatty
17+
18+
alpha = 3
19+
20+
nodeLookup:: Integer -> ServerState ()
21+
nodeLookup nid = do
22+
uid <- liftIO newUid
23+
ktree <- readKTree
24+
let kc = kclosest ktree nid
25+
let (rs, ps) = pickAlphaNodes kc
26+
newRunningLookup rs ps uid
27+
sendLookup ps nid uid
28+
return ()
29+
30+
-- Pick the "best" alpha nodes out of the k selected for lookup. For now we only
31+
-- select the 3 last ones but it would be the right place to plug in Vivaldi
32+
-- coordinates.
33+
--
34+
pickAlphaNodes kc =
35+
let idx = length kc - alpha
36+
in case idx of
37+
idx | idx > 0 -> splitAt idx kc
38+
| idx <= 0 -> ([], kc)
39+

src/routing.hs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ data Peer = Peer { host:: String, port:: String, nodeId:: Integer }
2525
deriving Show
2626

2727
instance Eq Peer where
28-
p1 == p2 = nodeId p1 == nodeId p2
28+
p1 == p2 = if nodeId p1 /= -1 && nodeId p2 /= -1
29+
then nodeId p1 == nodeId p2
30+
else host p1 == host p2 && port p1 == port p2
2931

3032
newtype KBucket = KBucket (S.Seq Peer)
3133
deriving (Show, Eq)
@@ -46,7 +48,7 @@ bucketToList (KBucket seq) = F.toList seq
4648
-- ex: 101110 `xor` 100101 = 001011
4749
nxor a b = (a .|. b) `xor` (a .&. b)
4850

49-
-- Inserts the provided peer in the routing table
51+
-- Inserts the provided peer in the node tree
5052
--
5153
kinsert pivot kt peer = update_closest_bucket kt nid insertOrSplit
5254
where
@@ -56,7 +58,7 @@ kinsert pivot kt peer = update_closest_bucket kt nid insertOrSplit
5658
else if bucketLength kb < kdepth
5759
then KLeaf $ bucketInsert kb peer
5860
else if pos == 0 || (nid `nxor` pivot > 2 ^ (pos+1))
59-
then KLeaf kb
61+
then KLeaf kb -- TODO before dropping it, check that it wouldn't be a good replacement for one of the existing bucket node
6062
else traverseKTree KNode KNode pos (splitBucket kb pos) nid insertOrSplit
6163

6264
splitBucket (KBucket seq) pos = pairToNode $ F.foldl separateVals (S.empty,S.empty) seq
@@ -66,7 +68,7 @@ kinsert pivot kt peer = update_closest_bucket kt nid insertOrSplit
6668

6769
nid = nodeId peer
6870

69-
-- Finds at least k nodes closest to the provided id in the routing table.
71+
-- Finds at least k nodes closest to the provided id in the node tree
7072
--
7173
kclosest kt nid = with_closest_bucket kt nid (returnOrRewind [] nid)
7274
where

0 commit comments

Comments
 (0)