Introduction
As a more advanced and practical exercise for learning how to use the data streaming library conduit, I thought I would recreate the asynchronous chat server from Simon Marlow's book, Parallel and Concurrent Programming in Haskell. If you haven't read it yet, it's required reading for anyone who wants to build performant, real-world Haskell applications.
This tutorial assumes that you already know Haskell, and that you've already read some introductory conduit tutorials.
Now, let's get started.
Setting Up
The first thing we will do is set up a new cabal sandbox and install some dependencies:
mkdir conduit-chat
cd conduit-chat
cabal sandbox init
cabal install conduit conduit-combinators network-conduit
I ran into a bug with cabal 1.20, so you might have to run this in order to install conduit-combinators:
cabal install conduit-combinators --constraint 'vector-instances==3.3' --max-backjumps=-1
Simple Server
Let's start with the most basic TCP server.
import Conduit
import Data.Conduit.Network
import qualified Data.ByteString.Char8 as BS
main :: IO ()
main = runTCPServer (serverSettings 4000 "*") $ \appData ->
    appSource appData $$ awaitForever $ liftIO . BS.putStr
Build and run the server using:
cabal exec runhaskell server.hs
And connect to it using telnet:
telnet 127.0.0.1 4000
It's not a very interesting server: it just prints whatever a client sends to the server's standard output. But as you might have noticed with the original async chat server, there is a lot of boilerplate involved in working with async and network Handles. network-conduit removes most of that boilerplate.
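The sink half of this pipeline can be tried without opening a socket. Here is a minimal sketch, assuming conduit 1.3 or later (it uses the newer runConduit and .| spellings in place of $$). The names chunkSink and collectChunks are invented for this illustration.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Conduit
import qualified Data.ByteString.Char8 as BS
import Data.IORef (modifyIORef', newIORef, readIORef)

-- Same shape as the server's sink, with the per-chunk action as a parameter.
-- (chunkSink is a hypothetical helper, not part of any library.)
chunkSink :: MonadIO m => (BS.ByteString -> IO ()) -> ConduitT BS.ByteString o m ()
chunkSink act = awaitForever $ liftIO . act

-- Feed in-memory chunks through the sink and record what it received.
collectChunks :: [BS.ByteString] -> IO [BS.ByteString]
collectChunks chunks = do
    ref <- newIORef []
    runConduit $ yieldMany chunks .| chunkSink (\c -> modifyIORef' ref (c :))
    reverse <$> readIORef ref

main :: IO ()
main = do
    seen <- collectChunks ["hello ", "wor", "ld\n"]
    BS.putStr (BS.concat seen)  -- the chunks arrive in order, unmodified
```

Note that the chunks do not line up with lines of text. A TCP socket behaves the same way, which is why the later sections split the input on newlines explicitly.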
Logging In
The first thing we want to do is allow users to log in. The original login function is called readName. The only login requirement is a username that doesn't conflict with other users' names. We are going to have the same function, but we're going to use conduits.
We might want to do something like this with readName as a Conduit:
main :: IO ()
main = do
    server <- newServer
    runTCPServer (serverSettings 4000 "*") $ \appData ->
        appSource appData $$ readName server =$ appSink appData
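However, we need a way to get the Client after readName finishes. Ordinary fusion with =$ returns the result of the downstream component, and the type of appSink is Sink ByteString IO (), so there is nowhere for the Client to come out. We can use fuseUpstream for this: it fuses like =$ but returns the upstream component's result instead.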
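To see what fuseUpstream does in isolation, here is a minimal sketch, assuming conduit 1.3 or later (ConduitT, runConduit, and .|). countAndForward is a hypothetical stand-in for readName: it sends output downstream and also returns a value of its own.

```haskell
import Conduit

-- Forwards each number downstream as text and returns how many it saw.
-- (countAndForward is a hypothetical stand-in for readName.)
countAndForward :: Monad m => ConduitT Int String m Int
countAndForward = go 0
  where
    go n = await >>= maybe (return n) (\x -> yield (show x) >> go (n + 1))

-- Pure variant: the downstream sink returns (), yet the count survives.
countOnly :: [Int] -> Int
countOnly xs =
    runConduitPure $ yieldMany xs .| (countAndForward `fuseUpstream` sinkNull)

main :: IO ()
main = do
    -- mapM_C putStrLn plays the role of appSink: it consumes output, returns ().
    n <- runConduit $
        yieldMany [10, 20, 30] .| (countAndForward `fuseUpstream` mapM_C putStrLn)
    print n  -- the upstream result, not the sink's ()
```

With plain .| the pipeline's result would be the sink's (), and countAndForward would not even typecheck as the upstream component, because ordinary fusion requires the upstream to return ().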
Here is the result:
{-# LANGUAGE OverloadedStrings, RecordWildCards, LambdaCase #-}
import Conduit
import Data.Conduit
import Data.Conduit.Network
import qualified Data.ByteString.Char8 as BS
import Text.Printf (printf)
import Control.Concurrent.STM
import qualified Data.Map as Map
import Data.Word8 (_cr)
import Control.Monad
import Control.Concurrent.Async (concurrently)
import Control.Exception (finally)
type ClientName = BS.ByteString
data Client = Client
    { clientName :: ClientName
    , clientApp  :: AppData
    }

instance Show Client where
    show client =
        BS.unpack (clientName client) ++ "@"
            ++ show (appSockAddr $ clientApp client)

data Server = Server
    { clients :: TVar (Map.Map ClientName Client)
    }

data Message = Notice BS.ByteString
             | Tell ClientName BS.ByteString
             | Broadcast ClientName BS.ByteString
             | Command BS.ByteString
    deriving Show

newServer :: IO Server
newServer = do
    c <- newTVarIO Map.empty
    return Server { clients = c }

newClient :: ClientName -> AppData -> STM Client
newClient name app = do
    return Client { clientName = name
                  , clientApp  = app
                  }

-- Check for a name clash and register the client in one transaction.
checkAddClient :: Server -> ClientName -> AppData -> IO (Maybe Client)
checkAddClient server@Server{..} name app = atomically $ do
    clientmap <- readTVar clients
    if Map.member name clientmap then
        return Nothing
    else do
        client <- newClient name app
        writeTVar clients $ Map.insert name client clientmap
        return (Just client)

readName :: Server -> AppData -> ConduitM BS.ByteString BS.ByteString IO Client
readName server app = go
  where
    go = do
        yield "What is your name? " $$ appSink app
        -- read one line, capped at 80 bytes, with carriage returns removed
        name <- lineAsciiC $ takeCE 80 =$= filterCE (/= _cr) =$= foldC
        if BS.null name then
            go
        else do
            ok <- liftIO $ checkAddClient server name app
            case ok of
                Nothing -> do
                    respond "The name '%s' is in use, please choose another\n" name
                    go
                Just client -> do
                    respond "Welcome, %s!\n" name
                    return client
    respond msg name = yield $ BS.pack $ printf msg $ BS.unpack name

main :: IO ()
main = do
    server <- newServer
    runTCPServer (serverSettings 4000 "*") $ \app -> do
        client <-
            appSource app $$ readName server app `fuseUpstream` appSink app
        printf "%s has connected\n" $ BS.unpack $ clientName client
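The line-reading idiom inside readName can be checked on in-memory input. This is a minimal sketch, assuming conduit 1.3 or later. readLine and firstTwoLines are hypothetical names, and the literal 13 stands in for _cr so that the word8 package is not needed.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Conduit
import qualified Data.ByteString.Char8 as BS

-- Read one line of at most 80 bytes, dropping carriage returns (byte 13).
-- lineAsciiC limits the inner conduit to the bytes before the next newline.
readLine :: Monad m => ConduitT BS.ByteString o m BS.ByteString
readLine = lineAsciiC $ takeCE 80 .| filterCE (/= 13) .| foldC

-- Run readLine twice over the same stream (a hypothetical test harness).
firstTwoLines :: [BS.ByteString] -> (BS.ByteString, BS.ByteString)
firstTwoLines chunks =
    runConduitPure $ yieldMany chunks .| ((,) <$> readLine <*> readLine)

main :: IO ()
main = print (firstTwoLines ["al", "ice\r\nbo", "b\n"])
-- prints ("alice","bob")
```

The input is deliberately split in awkward places: the first line spans two chunks, and the second chunk also carries the start of the next line. The bytes after the newline are handed back as leftovers, so the second readLine still sees them.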
Let's Chat!
Now that we have a client connected and logged in, let's start chatting!
In the original chat server, a client has a TChan to receive messages. We can use conduits for this.
First, install stm-conduit:
cabal install stm-conduit
and we will use it like this:
import Data.Conduit.TMChan
data Client = Client
    { clientName :: ClientName
    , clientChan :: TMChan Message
    , clientApp  :: AppData
    }

newClient :: ClientName -> AppData -> STM Client
newClient name app = do
    chan <- newTMChan
    return Client { clientName = name
                  , clientApp  = app
                  , clientChan = chan
                  }

main :: IO ()
main = do
    server <- newServer
    runTCPServer (serverSettings 4000 "*") $ \app -> do
        (fromClient, client) <-
            appSource app $$+ readName server app `fuseUpstream` appSink app
        void $ concurrently
            (fromClient $$+- linesUnboundedAsciiC =$= mapC Command =$ sinkTMChan (clientChan client) True)
            (sourceTMChan (clientChan client) $$ handleMessage server client =$ appSink app)
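We must be sure to use a ResumableSource with $$+, or else there could be data loss. readName may already have pulled a chunk that contains more than the name line, and connecting with $$ would close the source and discard those buffered bytes. $$+ instead returns the remaining source alongside the result, and $$+- later resumes it.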
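The effect of $$+ and $$+- can be seen on in-memory input. This is a minimal sketch, assuming a conduit version that exports $$+, $$+-, and .| (1.3 is what I have in mind). splitLogin is a hypothetical helper: the first line plays the role of the name, and everything after it plays the role of chat input.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Conduit
import qualified Data.ByteString.Char8 as BS
import Data.Functor.Identity (runIdentity)

-- Read the first line as the "name", then resume the same source for the rest.
-- (splitLogin is a hypothetical helper for this illustration.)
splitLogin :: [BS.ByteString] -> (BS.ByteString, [BS.ByteString])
splitLogin chunks = runIdentity $ do
    -- $$+ returns the paused source together with the sink's result
    (rest, name) <- yieldMany chunks $$+ lineAsciiC foldC
    -- $$+- resumes it, including bytes left over from the first chunk
    msgs <- rest $$+- (linesUnboundedAsciiC .| sinkList)
    return (name, msgs)

main :: IO ()
main = print (splitLogin ["alice\nhello\nwor", "ld\n"])
-- prints ("alice",["hello","world"])
```

The first chunk contains the name and the beginning of the chat input. Because the source is resumed rather than reopened, "hello" is not lost.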
With this, all lines sent to the client's socket are turned into Commands and written to the client's TMChan. Concurrently, Messages sent to the client's TMChan are processed by the conduit handleMessage.
This is very similar to the original handleMessage but with two simple changes. First, instead of readTVar at the beginning of the message handling loop, we will use awaitForever. Second, instead of output writing to a socket Handle, we write to the client's Sink using yield. Thus output becomes:
output s = yield $ s `BS.append` "\n"
Here is the result:
sendMessage :: Client -> Message -> STM ()
sendMessage Client{..} msg = writeTMChan clientChan msg

sendToName :: Server -> ClientName -> Message -> STM Bool
sendToName server@Server{..} name msg = do
    clientmap <- readTVar clients
    case Map.lookup name clientmap of
        Nothing     -> return False
        Just client -> sendMessage client msg >> return True

(<++>) = BS.append

handleMessage :: Server -> Client -> Conduit Message IO BS.ByteString
handleMessage server client@Client{..} = awaitForever $ \case
    Notice msg         -> output $ "*** " <++> msg
    Tell name msg      -> output $ "*" <++> name <++> "*: " <++> msg
    Broadcast name msg -> output $ "<" <++> name <++> ">: " <++> msg
    Command msg        -> case BS.words msg of
        -- everything after the recipient is the message, so it may contain spaces
        "/tell" : who : what -> do
            ok <- liftIO $ atomically $
                sendToName server who $ Tell clientName (BS.unwords what)
            unless ok $ output $ who <++> " is not connected."
        -- ignore empty strings
        [""] -> return ()
        []   -> return ()
        -- broadcasts
        ws ->
            if BS.head (head ws) == '/' then
                output $ "Unrecognized command: " <++> msg
            else
                liftIO $ atomically $
                    broadcast server $ Broadcast clientName msg
  where
    output s = yield $ s <++> "\n"
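The formatting branches of handleMessage can be exercised without sockets or STM. This is a minimal sketch, assuming conduit 1.3 or later. render is a hypothetical, cut-down stand-in for handleMessage that keeps only the constructors that produce output directly.

```haskell
{-# LANGUAGE OverloadedStrings, LambdaCase #-}
import Conduit
import qualified Data.ByteString.Char8 as BS

-- A cut-down Message type (no Command constructor in this sketch).
data Message = Notice BS.ByteString
             | Tell BS.ByteString BS.ByteString
             | Broadcast BS.ByteString BS.ByteString

(<++>) :: BS.ByteString -> BS.ByteString -> BS.ByteString
(<++>) = BS.append

-- awaitForever replaces the explicit receive loop; yield replaces the
-- write to a socket Handle.
render :: Monad m => ConduitT Message BS.ByteString m ()
render = awaitForever $ \case
    Notice msg         -> output $ "*** " <++> msg
    Tell name msg      -> output $ "*" <++> name <++> "*: " <++> msg
    Broadcast name msg -> output $ "<" <++> name <++> ">: " <++> msg
  where
    output s = yield $ s <++> "\n"

-- Run the conduit purely and collect the rendered lines.
renderAll :: [Message] -> [BS.ByteString]
renderAll msgs = runConduitPure $ yieldMany msgs .| render .| sinkList

main :: IO ()
main = mapM_ BS.putStr (renderAll [Notice "bob has connected", Broadcast "bob" "hi"])
```

Because the conduit only yields ByteStrings, the same code can feed appSink in the server or sinkList in a test.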
Most other functions are just about the same as the original. Once you put it all together, you should have a working chat server.
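One of those other functions is broadcast, which handleMessage calls. Here is a minimal sketch of what it can look like, modeled on the shape of the original server's function. To keep it self-contained I have simplified the types: names and messages are Strings, and a plain TChan from stm stands in for the TMChan, so this is an illustration and not the exact code of this server.

```haskell
import Control.Concurrent.STM
import qualified Data.Map as Map

-- Simplified stand-ins for the tutorial's types (assumptions, see above).
type ClientName = String
data Client = Client { clientName :: ClientName, clientChan :: TChan String }
newtype Server = Server { clients :: TVar (Map.Map ClientName Client) }

sendMessage :: Client -> String -> STM ()
sendMessage client msg = writeTChan (clientChan client) msg

-- Deliver one message to every connected client in a single transaction.
broadcast :: Server -> String -> STM ()
broadcast server msg = do
    clientmap <- readTVar (clients server)
    mapM_ (\client -> sendMessage client msg) (Map.elems clientmap)

-- Register two clients, broadcast once, and read back what each received.
deliveries :: IO [String]
deliveries = do
    alice  <- Client "alice" <$> newTChanIO
    bob    <- Client "bob"   <$> newTChanIO
    server <- Server <$> newTVarIO (Map.fromList [("alice", alice), ("bob", bob)])
    atomically $ broadcast server "hello, everyone"
    atomically $ mapM (readTChan . clientChan) [alice, bob]

main :: IO ()
main = deliveries >>= print
```

Running the whole broadcast inside one STM transaction means a client cannot be added or removed halfway through a delivery.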
Quitting
One last feature we haven't implemented yet is quitting. There are multiple ways a user can quit. First is using the '/quit' command. Another might be to just drop the TCP connection.
There are several ways to implement this, but it seems the simplest method would be to raise an exception to exit out of the conduits and then notify the other users in the exception handler.
Let's do it!
First we will copy over removeClient from the original async chat server (with a minor change):
removeClient :: Server -> Client -> IO ()
removeClient server@Server{..} client@Client{..} = atomically $ do
    modifyTVar' clients $ Map.delete clientName
    broadcast server $ Notice (clientName <++> " has disconnected")
Next we will use it in main (the conduits have been moved to a separate function called runClient):
runClient :: ResumableSource IO BS.ByteString -> Server -> Client -> IO ()
runClient clientSource server client@Client{..} =
    void $ concurrently
        (clientSource $$+- linesUnboundedAsciiC =$ clientSink client)
        (sourceTMChan clientChan $$ handleMessage server client =$ appSink clientApp)

main :: IO ()
main = do
    server <- newServer
    runTCPServer (serverSettings 4000 "*") $ \app -> do
        -- fuseUpstream (not =$) so that readName's Client is returned
        (fromClient, client) <-
            appSource app $$+ readName server app `fuseUpstream` appSink app
        -- removeClient runs whether runClient returns or throws
        (runClient fromClient server client)
            `finally` (removeClient server client)
Lastly, we implement the '/quit' command:
handleMessage :: Server -> Client -> Conduit Message IO BS.ByteString
handleMessage server client@Client{..} = awaitForever $ \case
    Notice msg         -> output $ "*** " <++> msg
    Tell name msg      -> output $ "*" <++> name <++> "*: " <++> msg
    Broadcast name msg -> output $ "<" <++> name <++> ">: " <++> msg
    Command msg        -> case BS.words msg of
        -- everything after the recipient is the message, so it may contain spaces
        "/tell" : who : what -> do
            ok <- liftIO $ atomically $
                sendToName server who $ Tell clientName (BS.unwords what)
            unless ok $ output $ who <++> " is not connected."
        -- new: raise an exception; the finally handler in main removes the client
        ["/quit"] -> do
            error . BS.unpack $ clientName <++> " has quit"
        -- ignore empty strings
        [""] -> return ()
        []   -> return ()
        -- broadcasts
        ws ->
            if BS.head (head ws) == '/' then
                output $ "Unrecognized command: " <++> msg
            else
                liftIO $ atomically $
                    broadcast server $ Broadcast clientName msg
  where
    output s = yield $ s <++> "\n"
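The way the exception travels can be tried in isolation. This is a minimal sketch using only async, stm, and containers. session is a hypothetical simplification of runClient wrapped in finally, and a Set of names stands in for the server's client map.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (concurrently)
import Control.Concurrent.STM
import Control.Exception (ErrorCall, finally, try)
import Control.Monad (forever, void)
import qualified Data.Set as Set

-- One client session: the first thread "quits" by throwing, the second
-- stands in for the thread that is blocked waiting on the client's channel.
-- (session and the Set of names are hypothetical simplifications.)
session :: TVar (Set.Set String) -> String -> IO ()
session online name =
    void (concurrently
            (error (name ++ " has quit"))
            (forever (threadDelay 1000000)))
        `finally` atomically (modifyTVar' online (Set.delete name))

-- Run a session for alice and report how it ended and who is still online.
demo :: IO (String, [String])
demo = do
    online    <- newTVarIO (Set.fromList ["alice", "bob"])
    result    <- try (session online "alice") :: IO (Either ErrorCall ())
    remaining <- Set.toList <$> readTVarIO online
    return (either (const "exception") (const "clean exit") result, remaining)

main :: IO ()
main = demo >>= print
-- prints ("exception",["bob"])
```

concurrently cancels the sibling thread and rethrows the exception in the calling thread, so the cleanup in finally runs exactly once even though the other thread would otherwise loop forever.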
Conclusion
Here is the whole server put together.
There are many more features that could be added, like more commands (such as '/kick', '/list', and '/help') or support for rooms, but those are exercises for the reader.
If you are looking for more conduit practice, then I recommend implementing a better chat client than telnet.
I hope this has helped you understand conduits in a more applied way.
Happy Hacking!