Skip to content

Commit 06d969d

Browse files
author
Matthieu Riou
committed
Starting to move stuff around to have more treatment happening in pure code.
1 parent feb9263 commit 06d969d

File tree

4 files changed

+127
-79
lines changed

4 files changed

+127
-79
lines changed

src/Comm.hs

Lines changed: 27 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module Comm
22
( IPPeer(..), parseHeader, localServer, serverDispatch, tunnel,
3-
toCharArray, fromCharArray
3+
buildHeader, toCharArray, fromCharArray
44
) where
55

66
import Network.Socket
@@ -21,7 +21,7 @@ import Debug.Trace
2121

2222
import KTable
2323
import Globals
24-
import Kad (storeReceive, nodeLookupReceive, nodeLookupCallback, valueLookupCallback)
24+
import Kad -- (storeReceive, nodeLookupReceive, nodeLookupCallback, valueLookupCallback)
2525

2626
-- TODO shut out nodes that are too chatty
2727
-- Header: version - 1 byte
@@ -48,6 +48,13 @@ instance Peer IPPeer where
4848
sendLookupReply = sendLookupReplyIP
4949
sendStore = sendStoreIP
5050
sendValueReply = sendValueReplyIP
51+
sendToPeer = sendToPeerIP
52+
serPeer p = (serIP $ host p) ++ (serPort $ port p) ++ toCharArray (nodeId p) 20
53+
54+
deserIP = intercalate "." . map (show . ord)
55+
serIP = map (chr . read) . split '.'
56+
deserPort = (show ::Int -> String) . fromCharArray
57+
serPort = (flip toCharArray 2) . read
5158

5259
instance Node IPPeer where
5360
nodeId = ipNodeId
@@ -65,35 +72,35 @@ data Header = Header { msgVersion ::Int, msgOp ::KadOp, msgUid ::Word64, sender
6572
-- operation itself is tracked by a secondary id stored locally. Supports both
6673
-- node and value lookup using the last boolean
6774
sendLookupIP peers nid lookupId valL = forM peers (\p -> do
68-
msgId <- liftIO newUid
75+
msgId <- newUid
6976
me <- askLocalId
7077
let msg = buildHeader (if valL then ValueLookupOp else NodeLookupOp) msgId me ++ toCharArray nid 20
7178

7279
newWaitingReply p (if valL then ValueLookupReplyOp else NodeLookupReplyOp) msgId lookupId
73-
liftIO $ sendToPeer msg p ) >> return ()
80+
liftIO $ sendToPeerIP msg p ) >> return ()
7481

7582
-- Sends the reply to a node lookup query, sending k nodes and reproducing the
7683
-- received message id.
7784
sendLookupReplyIP peer nodes msgId valL = do
7885
me <- askLocalId
7986
let msg = buildHeader (if valL then ValueLookupReplyOp else NodeLookupReplyOp) msgId me ++ serPeers nodes
80-
liftIO $ sendToPeer msg peer
87+
liftIO $ sendToPeerIP msg peer
8188

8289
sendValueReplyIP peer val msgId = do
8390
me <- askLocalId
8491
let msg = buildHeader ValueLookupReplyOp msgId me ++
8592
if length val `mod` 26 == 0 then val ++ " " else val
86-
liftIO $ sendToPeer msg peer
93+
liftIO $ sendToPeerIP msg peer
8794

8895
sendStoreIP peers key value storeId = forM peers (\p -> do
89-
msgId <- liftIO newUid
96+
msgId <- newUid
9097
me <- askLocalId
9198
let msg = buildHeader StoreOp msgId me ++ toCharArray key 20 ++ value
9299

93100
newWaitingReply p StoreReplyOp msgId storeId
94-
liftIO $ sendToPeer msg p ) >> return ()
101+
liftIO $ sendToPeerIP msg p ) >> return ()
95102

96-
sendToPeer msg peer = do
103+
sendToPeerIP msg peer = do
97104
phandle <- openPeerHandle (host peer) (port peer)
98105
sendstr phandle msg
99106
closePeerHandle phandle
@@ -176,6 +183,16 @@ parseHeader msg = runState parseHeader' msg
176183
put r
177184
return $ fn v
178185

186+
-- Deserializes peers from message strings using the opposite transformation from ser
187+
deserPeers = map deserPeer . splitEvery 26
188+
deserPeer s =
189+
let (h,rest) = splitAt 4 s
190+
(p,nid) = splitAt 2 rest
191+
in IPPeer (deserIP h) (deserPort p) (fromCharArray nid)
192+
193+
split delim = unfoldr (\b -> fmap (const . (second $ drop 1) . break (==delim) $ b) . listToMaybe $ b)
194+
splitEvery n = takeWhile (not . null) . unfoldr (Just . splitAt n)
195+
179196
tunnel :: (SockAddr -> String -> ServerState p ()) -> ((SockAddr -> String -> IO ()) -> IO ()) -> ServerState p ()
180197
tunnel f k = do
181198
gs <- ask
@@ -196,61 +213,11 @@ localServer port handlerFn = do
196213
handlerFn addr msg
197214
procMessages sock
198215

199-
200-
-- Utility functions to serialize messages
201-
--
202-
203-
buildHeader optype msgId sender =
204-
(chr 1) : optypeStr : toCharArray (toInteger msgId) 8 ++ serPeer sender
205-
where optypeStr = case optype of
206-
PingOp -> chr 1
207-
PingReplyOp -> chr 2
208-
NodeLookupOp -> chr 3
209-
NodeLookupReplyOp -> chr 4
210-
StoreOp -> chr 5
211-
StoreReplyOp -> chr 6
212-
ValueLookupOp -> chr 7
213-
ValueLookupReplyOp -> chr 8
214-
215-
-- Convert an integer to a string of n bytes
216-
toCharArray:: Integer -> Int -> [Char]
217-
toCharArray num depth = map (chr . fromInteger) (toBytes num $ toInteger depth)
218-
219-
-- Converts a string to its numeric value by considering that each character is a
220-
-- byte in a n byte number
221-
fromCharArray :: (Bits a) => [Char] -> a
222-
fromCharArray str = foldl (\acc ch -> shift acc 8 + fromIntegral (ord ch)) 0 str
223-
224-
-- Serializes peers
225-
serPeers = concat . map serPeer
226-
serPeer p = (serIP $ host p) ++ (serPort $ port p) ++ toCharArray (nodeId p) 20
227-
228-
-- Deserializes peers from message strings using the opposite transformation from ser
229-
deserPeers = map deserPeer . splitEvery 26
230-
deserPeer s =
231-
let (h,rest) = splitAt 4 s
232-
(p,nid) = splitAt 2 rest
233-
in IPPeer (deserIP h) (deserPort p) (fromCharArray nid)
234-
235-
deserIP = intercalate "." . map (show . ord)
236-
serIP = map (chr . read) . split '.'
237-
deserPort = (show ::Int -> String) . fromCharArray
238-
serPort = (flip toCharArray 2) . read
239-
240-
split delim = unfoldr (\b -> fmap (const . (second $ drop 1) . break (==delim) $ b) . listToMaybe $ b)
241-
splitEvery n = takeWhile (not . null) . unfoldr (Just . splitAt n)
242-
243-
-- Decomposes a number into its successive byte values or, said differently, converts
244-
-- a number to base 2^8.
245-
toBytes :: Integer -> Integer -> [Integer]
246-
toBytes num depth = unfoldr byteMod (num,depth-1)
247-
where byteMod (num,exp) = let (d,m) = divMod num (2^(exp*8))
248-
in if exp < 0 then Nothing else Just (d, (m,exp-1))
249-
250216
-- Build a new peer from packet address and header peer info
251217
toPeer addr pr = let (h,p) = span (/=':') (show addr)
252218
in IPPeer h (port pr) (nodeId pr)
253219

220+
254221
-- Tried below but host is a Word 32...
255222
-- case addr of
256223
-- SockAddrInet port host -> Peer (show host) (show port) (-1)

src/Globals.hs

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,19 @@
33

44
module Globals (
55
Peer(..),
6-
RunningOps(..), ServerState, KadOp(..), GlobalData(..), HandlerFn(..),
6+
RunningOps(..), ServerState, KadOp(..), GlobalData(..), HandlerFn(..), WaitingReply(..),
77
runServer, askRoutingT, askRunningOpsT, askKTree, readKTree, askLocalId,
8+
readRoutingT, readRunningOpsT,
89
newRunningLookup, runningLookup, runningLookupDone, readStore,
9-
newWaitingReply, waitingReply, newUid, insertInKTree,
10+
newWaitingReply, waitingReply, nrandoms, newUid, insertInKTree,
1011
insertInStore, lookupInStore, deleteInStore,
1112
modifyTVar
1213
) where
1314

1415
import Data.Word
1516
import Control.Monad.Reader
1617
import Control.Concurrent.STM
17-
import System.Random
18+
import Control.Applicative((<$>))
1819

1920
import qualified Data.Map as M
2021
import Debug.Trace
@@ -26,6 +27,8 @@ class Peer p where
2627
sendLookupReply :: p -> [p] -> Word64 -> Bool -> ServerState p ()
2728
sendStore :: [p] -> Integer -> String -> Word64 -> ServerState p ()
2829
sendValueReply :: p -> String -> Word64 -> ServerState p ()
30+
sendToPeer :: String -> p -> IO ()
31+
serPeer :: p -> String
2932

3033
type RoutingTable p = M.Map Word64 (WaitingReply p)
3134

@@ -50,21 +53,22 @@ data GlobalData p = GlobalData {
5053
runningOpsTable :: TVar (RunningOpsTable p),
5154
globalKTree :: TVar (KTree p),
5255
localPeer :: p,
53-
localStore :: TVar (M.Map Integer String)
56+
localStore :: TVar (M.Map Integer String),
57+
randoms :: TVar [Integer]
5458
}
5559

5660
newtype ServerState p a = ServerState {
5761
runSS:: ReaderT (GlobalData p) IO a
58-
} deriving (Monad, MonadIO, MonadReader (GlobalData p))
62+
} deriving (Monad, MonadIO, MonadReader (GlobalData p), Functor)
5963

6064
-- runServer rt rot kt localId st = runReaderT (runSS st) (rt, rot, kt, localId)
6165
runServer gd st = runReaderT (runSS st) gd
6266

63-
askRoutingT = liftM routingTable ask
64-
askRunningOpsT = liftM runningOpsTable ask
65-
askKTree = liftM globalKTree ask
66-
askLocalId = liftM localPeer ask
67-
askLocalStore = liftM localStore ask
67+
askRoutingT = routingTable <$> ask
68+
askRunningOpsT = runningOpsTable <$> ask
69+
askKTree = globalKTree <$> ask
70+
askLocalId = localPeer <$> ask
71+
askLocalStore = localStore <$> ask
6872

6973
readRoutingT = do { rt <- askRoutingT; liftIO . atomically $ readTVar rt }
7074
readRunningOpsT = do { rot <- askRunningOpsT; liftIO . atomically $ readTVar rot }
@@ -74,13 +78,21 @@ readStore = do { s <- askLocalStore; liftIO . atomically $ readTVar s }
7478
modifyTVar :: (a -> a) -> TVar a -> STM ()
7579
modifyTVar f tv = readTVar tv >>= writeTVar tv . f
7680

81+
nrandoms n = do
82+
rst <- randoms <$> ask
83+
liftIO . atomically $ do
84+
rs <- readTVar rst
85+
let nrs = take n rs
86+
writeTVar rst (drop n rs)
87+
return $ map fromInteger nrs
88+
7789
-- Helper to execute atomically a ST action on a value asked in ServerState
7890
atomicOnAsk :: (MonadIO m) => m a -> (a -> STM b) -> m b
7991
atomicOnAsk asked action = asked >>= liftIO . atomically . action
8092

8193
insertInStore k v = atomicOnAsk askLocalStore (modifyTVar $ M.insert k v)
8294

83-
lookupInStore k = liftM (M.lookup k) $ atomicOnAsk askLocalStore readTVar
95+
lookupInStore k = M.lookup k <$> atomicOnAsk askLocalStore readTVar
8496

8597
deleteInStore k = atomicOnAsk askLocalStore (\lst -> do
8698
ls <- readTVar lst
@@ -94,7 +106,7 @@ newRunningLookup nid rs ps qs lookupId handlerFn =
94106
let nrl = RunningLookup nid rs ps qs handlerFn
95107
in atomicOnAsk askRunningOpsT $ modifyTVar (M.insert lookupId nrl)
96108

97-
runningLookup lookupId = liftM (M.lookup lookupId) readRunningOpsT
109+
runningLookup lookupId = M.lookup lookupId <$> readRunningOpsT
98110

99111
runningLookupDone:: Word64 -> ServerState p (Maybe (RunningOps p))
100112
runningLookupDone lookupId = atomicOnAsk askRunningOpsT (\rot -> do
@@ -123,6 +135,6 @@ insertInKTree peer = do
123135
lid <- askLocalId
124136
atomicOnAsk askKTree $ modifyTVar (flip (kinsert $ nodeId lid) $ peer)
125137

126-
newUid:: IO Word64
127-
newUid = liftM fromInteger $ randomRIO (0, 2^64 - 1)
138+
newUid:: (ServerState p) Word64
139+
newUid = fromInteger <$> head <$> nrandoms 1
128140

src/Kad.hs

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
module Kad
44
( startNode, nodeLookup, valueLookup, store,
5-
nodeLookupReceive, storeReceive, nodeLookupCallback, valueLookupCallback
5+
nodeLookupReceive, storeReceive, nodeLookupCallback, valueLookupCallback,
6+
fromCharArray, toCharArray, buildHeader, serPeers
67
) where
78

89
import Prelude hiding (error)
@@ -11,10 +12,13 @@ import Data.Word
1112
import Data.Maybe
1213
import Data.List(sortBy, (\\), delete, nub)
1314
import Data.Bits
15+
import Data.Char(chr,ord)
16+
import Data.List (genericDrop,unfoldr,intercalate)
1417

1518
import Control.Monad(liftM, liftM2, liftM3, forM)
1619
import Control.Concurrent.STM
1720
import Control.Monad.Trans(liftIO)
21+
import Control.Arrow(second)
1822

1923
import System.Random
2024
import System.Log.Logger
@@ -49,7 +53,7 @@ valueLookup = genericLookup True
4953

5054
genericLookup valL nid handlerFn = do
5155
debug $ "Starting a node lookup for node " ++ show nid
52-
uid <- liftIO newUid
56+
uid <- newUid
5357
ktree <- readKTree
5458

5559
-- k closest elements in the k tree
@@ -64,9 +68,36 @@ genericLookup valL nid handlerFn = do
6468
-- sending a lookup on the alpha nodes picked
6569
sendLookup ps nid uid valL
6670

71+
genericLookup2 valL nid handlerFn = do
72+
debug $ "Starting a node lookup for node " ++ show nid
73+
uids <- nrandoms (alpha + 1)
74+
let (lookupId, msgIds) = splitAt 1 uids
75+
ktree <- readKTree
76+
me <- askLocalId
77+
routeT <- readRoutingT
78+
opsT <- readRunningOpsT
79+
80+
let (msgs, nopsT, nrouteT) = prepareLookups valL nid handlerFn opsT routeT me ktree (head lookupId) msgIds
81+
-- TODO update maps
82+
liftIO . mapM (uncurry sendToPeer) $ msgs
83+
84+
prepareLookups valL nid handlerFn opsT routeT me ktree lookupId msgIds = (mps, newOpsT, newRouteT)
85+
where (mps, newRouteT) = foldr (\(mid,p) acc ->
86+
comb (prepareLookup valL nid p routeT me lookupId mid) acc) ([], routeT) (zip msgIds ps)
87+
comb (mp, rt) (arr, ort) = (mp:arr, rt)
88+
kc = kclosest ktree nid
89+
ps = pickAlphaNodes kc
90+
newOpsT = M.insert lookupId (RunningLookup nid kc ps [] handlerFn) opsT
91+
92+
prepareLookup valL nid peer routeT me lookupId msgId = ((msg, peer), newRouteT)
93+
where msg = buildHeader lookpOp msgId me ++ toCharArray nid 20
94+
newRouteT = M.insert msgId (WaitingReply peer replyOp lookupId) routeT
95+
lookpOp = if valL then ValueLookupOp else NodeLookupOp
96+
replyOp = if valL then ValueLookupReplyOp else NodeLookupReplyOp
97+
6798
-- Stores a key / value pair by doing a node lookup and sending a store command
6899
-- to the closest nodes found
69-
store key kdata = nodeLookup key $ PeersHandler (\peers -> (liftIO . putStrLn $ "Sending store to peers " ++ show peers) >> liftIO newUid >>= sendStore peers key kdata)
100+
store key kdata = nodeLookup key $ PeersHandler (\peers -> (liftIO . putStrLn $ "Sending store to peers " ++ show peers) >> newUid >>= sendStore peers key kdata)
70101

71102
-- Received a node lookup request, queries the KTable for the k closest and
72103
-- sends the information back.
@@ -138,7 +169,7 @@ storeReceive msgId key val peer = (liftIO . putStrLn $ "local store for " ++ sho
138169
-- in Vivaldi coordinates.
139170
pickAlphaNodes kc =
140171
let idx = length kc - alpha
141-
in snd $ splitAt idx kc
172+
in drop idx kc
142173

143174
closestNode pivot p1 p2 =
144175
let d1 = nodeId p1 `xor` pivot
@@ -151,3 +182,37 @@ closestNodes pivot = take kdepth . sortBy (closestNode pivot)
151182
debug s = liftM ((++ " " ++ s) . show . nodeId) askLocalId >>= liftIO . debugM "Kad"
152183
error s = liftM ((++ " " ++ s) . show . nodeId) askLocalId >>= liftIO . errorM "Kad"
153184

185+
-- Utility functions to serialize messages
186+
--
187+
188+
buildHeader optype msgId sender =
189+
(chr 1) : optypeStr : toCharArray (toInteger msgId) 8 ++ serPeer sender
190+
where optypeStr = case optype of
191+
PingOp -> chr 1
192+
PingReplyOp -> chr 2
193+
NodeLookupOp -> chr 3
194+
NodeLookupReplyOp -> chr 4
195+
StoreOp -> chr 5
196+
StoreReplyOp -> chr 6
197+
ValueLookupOp -> chr 7
198+
ValueLookupReplyOp -> chr 8
199+
200+
-- Convert an integer to a string of n bytes
201+
toCharArray:: Integer -> Int -> [Char]
202+
toCharArray num depth = map (chr . fromInteger) (toBytes num $ toInteger depth)
203+
204+
-- Converts a string to its numeric value by considering that each character is a
205+
-- byte in a n byte number
206+
fromCharArray :: (Bits a) => [Char] -> a
207+
fromCharArray str = foldl (\acc ch -> shift acc 8 + fromIntegral (ord ch)) 0 str
208+
209+
-- Serializes peers
210+
serPeers = concat . map serPeer
211+
212+
-- Decomposes a number into its successive byte values or, said differently, converts
213+
-- a number to base 2^8.
214+
toBytes :: Integer -> Integer -> [Integer]
215+
toBytes num depth = unfoldr byteMod (num,depth-1)
216+
where byteMod (num,exp) = let (d,m) = divMod num (2^(exp*8))
217+
in if exp < 0 then Nothing else Just (d, (m,exp-1))
218+

src/Main.hs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@ import Control.Concurrent(forkIO,threadDelay)
1010
import Control.Concurrent.STM
1111
import Control.Monad.Trans(liftIO)
1212
import Control.Monad(forM)
13+
import Control.Applicative((<$>))
1314

1415
import System.Log.Logger
16+
import System.Random
1517
import Debug.Trace
1618

1719
import KTable
@@ -26,8 +28,10 @@ newNode myPort otherPort = do
2628
trot <- newTVarIO M.empty
2729
tkt <- newTVarIO $ kbleaf S.empty
2830
ls <- newTVarIO M.empty
31+
rs <- randomRs (0, 2^64 - 1) <$> newStdGen
32+
rsv <- newTVarIO rs
2933
let myId = intSHA1 myPort
30-
let gd = GlobalData trt trot tkt (IPPeer "127.0.0.1" (show myPort) myId) ls
34+
let gd = GlobalData trt trot tkt (IPPeer "127.0.0.1" (show myPort) myId) ls rsv
3135
-- nodeId = sha1 port
3236
debug $ "Starting on port " ++ show myPort
3337
forkIO $ runServer gd (start myPort otherPort)

0 commit comments

Comments
 (0)