Introduction
I have a problem: how to present the few applicative and monadic combinators that I have just developed. I could present them as something for:
- multithreaded event handling without inversion of control
- parallelization of processes: async without the wait
- automatic thread control
- alternative and applicative composition of parallel IO actions
- indeterminism and asynchronicity effects
- high-level programming at the specification level
- creating and composing applications by means of a single expression
- going beyond the futures and promises of Scala and JavaScript, making them unnecessary
That is too much for a single article. Maybe I should split it into pieces and take more time to write something more extensive and less dense. But I'm lazy, and moreover they are only a few primitives, four or five, six with the two state combinators: async, waitEvents, spawn and react, plus getSData and setSData. No new operators. Splitting the article would hide the big picture and would not display the beautiful unity of the common solution.
- If you are interested in how the idea came about, read the next paragraph.
- If you are interested in the internals, read the section "Enter the monad".
- If you are interested in how the monad controls multiple threads, see "Implicit thread control".
- If you are interested in examples, read from "Example" on.
- If you are interested in Async, promises and futures, read "Beyond futures and promises".
- If you are interested in running the examples, read "Composition of Programs".
- If you are interested in de-inverting the control of callbacks, see "deinverting callbacks".
The problem: parallelization, concurrency and inversion of control
Suppose that I have a blocking computation that returns data when something happens. It may also be a long-running computation that blocks the thread for a while:
receive :: IO a
I can use it as such, but it blocks, so I cannot use it in a context where other events are firing. I must create a thread for each blocking IO call. All of these threads probably modify a central state; otherwise there would be no communication of data among them. Alternatively, someone may have created a framework for this particular problem, where these blocking calls are managed. It may be a GUI toolkit, a web application framework, a browser environment, a library for the management of an ATM, etc. In any case, what the programmer sees is a set of blocking synchronous calls and/or a set of callbacks or handlers that he has to program and configure in the framework.
Blocking IO creates the need to resort to manual thread management and concurrency. That means that the code is split into parallel and concurrent chunks which are hard to code and debug. In the second case I have non-blocking IO, since the thread management is done implicitly by the framework, but on the other hand I have to split the program logic into disconnected chunks. As a result, the program logic is very hard to grasp. This is known as callback hell, a consequence of the inversion of control.
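A minimal sketch of the first scenario may make its cost concrete. The blocking sources and the names watch and runWatchers are hypothetical: each blocking call gets its own manually forked thread, the only way for the threads to communicate is a shared MVar acting as the central state, and the coordinating code needs an explicit rendezvous before it can look at the result.

```haskell
import Control.Concurrent

-- Hypothetical central state shared by all the threads.
type CentralState = MVar [String]

-- Fork a thread that blocks on `receive`, records the result in the
-- central state and signals completion on `done`.
watch :: CentralState -> MVar () -> String -> IO String -> IO ThreadId
watch st done name receive = forkIO $ do
  x <- receive                                       -- blocks only this thread
  modifyMVar_ st (return . ((name ++ ": " ++ x) :))  -- update the shared state
  putMVar done ()                                    -- notify the coordinator

-- The coordinating code must know how many threads it started and wait
-- for each of them before it can read the combined state.
runWatchers :: [(String, IO String)] -> IO [String]
runWatchers sources = do
  st   <- newMVar []
  done <- newEmptyMVar
  mapM_ (\(name, recv) -> watch st done name recv) sources
  mapM_ (const (takeMVar done)) sources              -- explicit rendezvous
  readMVar st
```

Every new input source means another forked thread, another write to the shared state and another wait in the coordinator.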
The OOP -half- solution
The second scenario appears when threading is managed by a framework. Essentially it is the same case: in both cases we end up with disconnected chunks of code and a mutable state. The standard way to manage this messy central state has been to divide it into smaller states and encapsulate them together with the methods that modify and serve the state. This is the object-oriented programming solution. The first OOP languages were created for managing events (SIMULA) and mouse events in an interactive GUI (Smalltalk).
Object-oriented programming is an exceptionally bad idea which could only have originated in California. - Dijkstra
Object-oriented programming naturally fits this inversion of control, which pervades IT problems. Whenever there is more than one asynchronous input, there is multitasking, or inversion of control with state. The solution is a state machine. What OOP does is split this state machine into smaller state machines called objects, which interact among themselves. But that implies the need to "deconstruct" the specifications.
Deconstruct the specification recipe considered harmful
Usually the specification of something that must be done is naturally expressed as if this "something" were a process, from a third-person active perspective. People mentally connect the relevant elements directly, without concern for secondary problems.
And there may be many of these intermediate elements. In a recipe people say: "you must fry the eggs". You don't say "there is fire and there are eggs; you start the fire and the eggs will be fried by the fire". A complete description in terms of active and passive elements gives prominence to low-level elements that you are not interested in when writing a specification.
But in OOP developments, these elements must appear in the form of objects. Creating an OOP solution implies the deconstruction of the specification recipe into multiple third-person passive perspectives, one for each class or object, which alternately act as passive and active elements.
In OOP, you cannot create a function or method called fryTheEggs that stops the fire when the eggs are fried without blocking the execution of everything else. So you need to explicitly manage at least two threads that sooner or later will have to communicate asynchronously. Alternatively, you may define start-fire and stop-fire in the fire class, and a callback called egg-fried, and wire up the application with these elements.
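To illustrate the callback wiring just described, here is a hypothetical sketch. Fire, startFire, stopFire, cookEggs and wireUp are made-up names (the text's start-fire, stop-fire and egg-fried rendered as Haskell identifiers), and cookEggs stands in for some framework that invokes a registered callback. Note how the single step "fry the eggs" ends up split between the code that starts the fire and a callback registered elsewhere.

```haskell
import Control.Concurrent
import Data.IORef

-- Hypothetical "fire" object with start/stop methods.
newtype Fire = Fire (IORef Bool)

newFire :: IO Fire
newFire = Fire <$> newIORef False

startFire, stopFire :: Fire -> IO ()
startFire (Fire r) = writeIORef r True
stopFire  (Fire r) = writeIORef r False

isBurning :: Fire -> IO Bool
isBurning (Fire r) = readIORef r

-- Hypothetical egg timer: it calls the registered `eggFried` callback when
-- the eggs are done. Control is inverted: the timer decides when our code runs.
cookEggs :: Int -> IO () -> IO ()
cookEggs micros eggFried = do
  _ <- forkIO (threadDelay micros >> eggFried)
  return ()

-- "Fry the eggs" is not one expression: starting the fire happens here,
-- stopping it lives in a callback wired into the timer.
wireUp :: Fire -> MVar () -> IO ()
wireUp fire done = do
  startFire fire
  cookEggs 1000 (stopFire fire >> putMVar done ())
```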
There is no way to express fry-the-eggs as a single self-contained expression that you can reuse. As a consequence, OOP cannot produce composable programs. In OOP, programs are made of disconnected pieces: the software does not follow the natural flow that would have been extracted from the specifications, and low-level details emerge at the top level. The resulting code is hard to maintain.
The application or service created with this deconstruction is not composable. There is no way to insert your service within something bigger with a single invocation, even with the objects that you fully control, since an object is by definition a box with many connectors, so it cannot be assembled in a pipeline. This results in scarce reusability of the software and the need for profuse documentation. It is inelegant, buggy and hard to maintain, and it permits a huge number of arbitrary alternatives in the design space, which aggravates the mentioned problems. It may be thought that this is good for the IT business, because it justifies big IT department budgets, but it is not good in the medium to long term. There are better ways to do it.
In contrast, a true functional solution follows the user specification closely, because the very elements of the problem that the user manages should be first-class in the program. There is an algebra in which each individual top-level element of the user specification is a term in an equation. Therefore, reusability and composability are the natural consequence. That algebra is instantiated in an embedded domain-specific language (EDSL).
But there are reasons why functional programs are not composable. The main obstacle to composability in functional languages is asynchronous input. In the past there was a good effort to use continuations to deal with it, but lately this approach has been abandoned due to the irruption of OOP programmers into the functional arena.
What we need
Simplicity is prerequisite for reliability. - Dijkstra
The application must be programmed following the natural flow defined in the specification. The code must neither split the specification into explicit parallel tasks nor invert the control and deconstruct the specification into objects. The design space must be limited, so that everyone programs the same specification the same way and other people's code can be grasped immediately without the aid of external documentation. The application must transport user-defined state that can be inspected, updated, added and deleted, but this state must be instrumental. It should not be the center, because the center is the process described in the specification.
We need an EDSL for hard-working IT programmers who use Java, JavaScript, Scala, C#, PHP, Ruby or Python and don't know Haskell. They need to experience an immediate and huge advantage from using Haskell. Not a monad stack but a simple monad, no more complex to use than IO, that may liberate them from the Oppressive Object Paradigm, or OOP inversion of control, without forcing them to sacrifice time and effort to the gods of Category Theory. With applicative and alternative combinators and a few primitives for implicit parallelization, thread control and de-inversion of callbacks in the IO monad. Plus user-defined state management and early termination.
What we need is a software connector that works like a hardware serial bus. Hardware designers invented the serial bus for the same problem: their chips had many more pins than software objects have methods, so connecting them directly to each other was impossible. The serial bus receives signals injected at different points. What would a connector look like for different elements that inject events: GUI widgets, asynchronous responses, callbacks from frameworks and hardware interrupts?
Simplicity is a great virtue but it requires hard work to achieve it and education to appreciate it. And to make matters worse: complexity sells. - Dijkstra
Enter the monad
A monad with the asynchronicity effect can rescue the industry from the inversion-of-control trap, for which OOP was originally designed, while allowing implicit parallelization and thread control. An entire application can be coded in a single monadic expression with little or no plumbing. That allows the creation of composable applications and services of the A -> A -> A kind.
In "A monad for reactive programming" I defined a monad that de-inverts the control when there are different events. The Transient monad can listen for events at different points in the monadic expression. Current solutions have a single listening point for events, which has different names: at the OS level there are calls like select. GUI and client web frameworks have an event loop at the lower level, but at the top level they send events to different UI elements. That kind of interface inverts the control, since the programmer has to define callbacks. Reactive solutions bubble the events up to a single listening point again, and attach to it an event preprocessor together with a single expression that acts like a big event handler.
The solution of the above-mentioned article keeps the events in the UI elements that produced them, without inverting the control. Moreover, these event listeners do not block, so every event-watching point is active in the monad at the same time.
In that article, the events are injected by a simulated event loop in the state monad. This time I will show how to listen for IO computations without the help of a framework that brings events. These events may be hardware buttons, device driver inputs, requests from users, responses from databases, requests from other systems in the cloud, etc.
What we intend here is to formulate a general solution that permits coding close to the user requirements document, and that is expressive enough to code an entire application as a single monadic expression, even if it involves multiple inputs and parallel executions. This expression will spawn, communicate with and kill tasks automatically whenever necessary. We will see that we can improve readability and reduce complexity, which increases maintainability and enables the composition of entire services or applications.
Since I have to deal with dirty things like blocking, threads and IO, don't expect what follows to be a walk in the Platonic realm. I start with a monad like the Transient monad, which can be stopped with empty and continued with runCont cont, where cont is the continuation context obtained with getCont (explanation below):
data Transient m x= Transient {runTrans :: m (Maybe x)}

data EventF = forall a b . EventF
      {xcomp :: TransientIO a
      ,fcomp :: [a -> TransientIO b]
      -- , ... other fields ...
      }

type StateIO= StateT EventF IO

type TransientIO= Transient StateIO

instance Functor TransientIO where
    fmap f (Transient x)= Transient $ fmap (fmap f) x

instance Monad TransientIO where
    return x = Transient $ return $ Just x
    x >>= f  = Transient $ do
        cont <- setEventCont x f      -- store the closure x and push f onto the continuation list
        mk <- runTrans x
        resetEventCont cont           -- restore the previous continuation context
        case mk of
            Just k  -> runTrans $ f k
            Nothing -> return Nothing -- x stopped (empty): f is not executed

instance Applicative TransientIO where
    pure a = Transient . return $ Just a
    Transient f <*> Transient g= Transient $ do
        k <- f
        x <- g
        return $ k <*> x

instance Alternative TransientIO where
    empty= Transient $ return Nothing
    Transient f <|> Transient g= Transient $ do
        k <- f
        x <- g
        return $ k <|> x

getCont ::(MonadState EventF m) => m EventF
getCont = get

runCont :: EventF -> StateIO ()
runCont EventF{xcomp= x, fcomp= fs}= do runIt x (unsafeCoerce fs); return ()
  where
    runIt x fs= runTrans $ x >>= compose fs
    compose []= const empty
    compose (f: fs)= \x -> f x >>= compose fs
For a view of how this monad has evolved, look at the first article, "A monad for reactive programming part 1", where I present a simpler version of this monad that has some shortcomings. In the second part I solved these shortcomings. I think that this is the best way to understand it.
What this monad does is store the closure x and the continuations f in the state. getCont captures the execution state at that point, and runCont executes it.
In the sense in which "continuation" is used here, there may be more than one of them. For example, in this expression:
x0 >>= (\r -> (x r >>= f1) >>= f2) >>= f3
for the closure generated at the execution point x, the continuations are
f1 >=> f2 >=> f3
and the closure is the result of executing x0 >>= x.
What setEventCont and resetEventCont do is compose the list of continuations (one for each nested expression) into a 'flattened' representation, as a list in fcomp. Since the list does not "know" that the types of consecutive continuations match, I have to erase the types using unsafeCoerce.
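The flattening can be checked in a small setting. The sketch below assumes the Maybe monad and homogeneous stages of type Int -> Maybe Int, with x taken as a function so that x0 >>= x type-checks; the concrete stage functions are made up for illustration. Because all stages have the same type, no unsafeCoerce is needed. compose mirrors the one in runCont, except that it ends with return instead of const empty, so that the resumed result can be compared with the nested one.

```haskell
import Control.Monad ((>=>))

-- Homogeneous version of the flattened continuation list.
compose :: Monad m => [a -> m a] -> a -> m a
compose []       = return        -- runCont in the article ends with `const empty` instead
compose (f : fs) = \v -> f v >>= compose fs

-- Hypothetical stages of the example expression, instantiated in Maybe.
x0 :: Maybe Int
x0 = Just 1

x, f1, f2, f3 :: Int -> Maybe Int
x  = \r -> Just (r * 10)
f1 = Just . (+ 1)
f2 = Just . (* 2)
f3 = Just . subtract 3

-- The nested expression...
nested :: Maybe Int
nested = x0 >>= (\r -> (x r >>= f1) >>= f2) >>= f3

-- ...and the same computation resumed at x: the closure x0 >>= x
-- followed by the flattened continuation list [f1, f2, f3].
resumed :: Maybe Int
resumed = (x0 >>= x) >>= compose [f1, f2, f3]

-- The flattened list is just Kleisli composition.
viaKleisli :: Maybe Int
viaKleisli = (x0 >>= x) >>= (f1 >=> f2 >=> f3)
```

All three values evaluate to Just 19, and a stage that returns Nothing in the middle of the list stops the rest, just as empty does in Transient.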
Each level is recursive. That means that if I have:
do
    (a >> b) <|> (c >> d)
    e
when the expression is executed, the closure is (a >> b) <|> (c >> d) and the continuation is e. When a >> b is executed, setEventCont puts a as the closure and b >> e as the continuation. If a contains a statement that uses the continuation mechanism, for example async (see below), it will execute a >> b >> e.
But look at how the Alternative operator is defined:
f <|> g = Transient $ do
    k <- runTrans f
    x <- runTrans g
    return $ k <|> x
where f is a >> b and g is c >> d. Both operands are executed. When c is executed, d >> e is its continuation. So if a and c receive different events (see below), they will execute their respective continuations, which have e in common.
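The following standalone sketch shows only the stopping and the both-operands behaviour, without any continuation capture, so it is not the library's Transient. It keeps the data type and the <*> and <|> definitions shown above, generalized to any monad, and adds a hypothetical say helper that appends each step to a log.

```haskell
import Control.Applicative
import Data.IORef

-- A computation in m that may stop (Nothing). No continuation capture.
newtype Transient m x = Transient { runTrans :: m (Maybe x) }

instance Monad m => Functor (Transient m) where
  fmap f (Transient m) = Transient (fmap (fmap f) m)

-- As in the article, <*> runs both effects and then combines the Maybes.
instance Monad m => Applicative (Transient m) where
  pure = Transient . return . Just
  Transient f <*> Transient g = Transient $ do
    k <- f
    v <- g
    return (k <*> v)

instance Monad m => Monad (Transient m) where
  t >>= f = Transient $ do
    mk <- runTrans t
    case mk of
      Just k  -> runTrans (f k)
      Nothing -> return Nothing     -- stopped upstream: the continuation never runs

instance Monad m => Alternative (Transient m) where
  empty = Transient (return Nothing)
  Transient f <|> Transient g = Transient $ do
    k <- f                          -- the left operand always runs...
    v <- g                          -- ...and so does the right one
    return (k <|> v)

-- Hypothetical helper: an action that appends its name to a log.
say :: IORef [String] -> String -> Transient IO ()
say logRef s = Transient $ do
  modifyIORef logRef (++ [s])
  return (Just ())
```

Running a do block whose first statement is (say l "a" >> empty) <|> say l "c" and whose second is say l "e" logs a, c and e and returns Just (): the left branch stops after a, the right one still runs, and e follows. Here e runs once, with the combined Maybe result; in Transient, each branch that receives an event fires its own copy of the continuation.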
Parallelization
With these three primitives, getCont, runCont and empty, I will define an async primitive that runs a blocking IO action in a new thread and executes the continuation in that thread when something is received:
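buffer :: MVar Dynamic
buffer= unsafePerformIO $ newEmptyMVar
{-# NOINLINE buffer #-}

async :: Typeable a => IO a -> TransientIO a
async receive = do
    cont <- getCont                           -- capture the current closure and continuations
    r <- liftIO $ tryTakeMVar buffer
    case r of
        Nothing -> do                         -- nothing received yet
            liftIO . forkIO $ do
                r <- receive                  -- block in the new thread
                putMVar buffer $ toDyn r
                runStateT (runCont cont) cont -- re-run the closure and its continuations here
                return ()
            empty                             -- stop the current thread
        Just r -> return $ fromDyn r (error "async: unexpected type in the buffer")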
Essentially, async gets the continuation, then inspects the buffer. If there is Nothing, it spawns receive in a new thread, and the current thread finishes (empty). When something arrives, it is put in the buffer, and runCont continues, in the new thread, at the beginning of async. It does so because getCont captured the Transient continuation there. This time there will be something in the buffer, and async will return it, so the procedure continues after the event arrives, but in a new thread.
Note that receive only fills the buffer. When runCont executes the closure, it inspects the buffer again. This time there will be something, so the closure succeeds and the continuation fires.
getCont and runCont are similar to setjmp and longjmp in C. Moreover, the mechanism is not very different from how the IO scheduler works in GHC or in any operating system, but this time it runs at the application level rather than inside the GHC runtime.
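The same control flow can be sketched with an ordinary continuation monad. The sketch assumes ContT from the transformers package instead of Transient's getCont/runCont, so there is no buffer and no re-execution of the closure: the continuation k is simply everything after the call. async forks the blocking action and runs k in the new thread when it returns, while the calling thread returns at once, which plays the role of empty. runCPS is a hypothetical helper name.

```haskell
import Control.Concurrent
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Cont (ContT (..))

-- CPS analogue of async: k (the rest of the do block) runs in the forked
-- thread after `receive` returns; the calling thread is done immediately.
async :: IO a -> ContT () IO a
async receive = ContT $ \k -> do
  _ <- forkIO (receive >>= k)
  return ()

-- Run a CPS computation; it returns as soon as the current thread has
-- nothing left to do, while the rest continues in the forked threads.
runCPS :: ContT () IO () -> IO ()
runCPS m = runContT m return
```

For example, with runCPS, a block like do { n <- async (return 41); liftIO (print (n + 1)) } executes the print in the forked thread, not in the caller's.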
Wait for events
If we want to trigger the continuation repeatedly, whenever something is received by receive, it is a matter of adding a loop to the Nothing branch. Then the continuation will be called for every received event. Let's call this variant waitEvents:
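waitEvents :: Typeable a => IO a -> TransientIO a
waitEvents receive = do
    cont <- getCont
    r <- liftIO $ tryTakeMVar buffer
    case r of
        Nothing -> do
            liftIO . forkIO $ loop $ do
                r <- receive
                putMVar buffer $ toDyn r
                runStateT (runCont cont) cont  -- one execution of the continuation per event
                return ()
            empty
        Just r -> return $ fromDyn r (error "waitEvents: unexpected type in the buffer")
  where
    loop x= x >> loop x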
Example
This program will say hello to every name entered.
runTransient :: TransientIO x -> IO (Maybe x, EventF)
runTransient t= runStateT (runTrans t) eventf0

main= do
    runTransient $ do
        name <- waitEvents getLine
        liftIO $ putStrLn $ "hello "++ name
    stay
Note that there is no explicit loop. waitEvents installs getLine at the start of a process that executes the continuation (whatever comes after getLine) for each entry. The loop is internal to waitEvents.
Here runTransient executes a transient computation down to the IO monad.
stay is anything that keeps the console application from exiting. It is needed because the transient branch that waits for events is non-blocking, so the program would otherwise finish immediately. After async or waitEvents, the current thread dies and the rest of the monadic computation runs in a different thread.
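Under the same kind of assumption, a continuation monad (ContT from transformers) rather than Transient's buffer-based implementation, waitEvents differs from async only by the loop: the forked thread receives forever and runs the continuation once per event, and the next receive starts only after the continuation returns. In this sketch the hello example is adapted so that hypothetical input and output channels replace getLine and putStrLn, which makes it possible to drive it from code.

```haskell
import Control.Concurrent
import Control.Monad (forever)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Cont (ContT (..))

-- CPS analogue of waitEvents: the continuation k runs once per event,
-- inside the receiving thread.
waitEvents :: IO a -> ContT () IO a
waitEvents receive = ContT $ \k -> do
  _ <- forkIO (forever (receive >>= k))
  return ()

-- The "hello" program, reading names from a channel instead of the console.
hello :: Chan String -> Chan String -> IO ()
hello input output = runContT program return
  where
    program = do
      name <- waitEvents (readChan input)     -- stands in for getLine
      liftIO (writeChan output ("hello " ++ name))
```

As in the article, there is no loop in hello itself: writing "Ann" and then "Bob" to the input channel produces "hello Ann" and then "hello Bob".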
Implicit thread control
Since every event watcher in any part of the monadic computation is active and triggers the continuation of the monad at that point, the monadic expression is multithreaded and non-deterministic.
How do we control the threads? Since waitEvents and async execute continuations within the monadic expression, it is natural to think that once something happens in a statement, its previous continuations must be invalidated.
That means that whenever async or waitEvents receives something, the threads that are running below it must be killed. Then this statement, with the new buffered input, will execute its closure and rebuild the continuation again.
This is the natural thread management that I implemented. I do not detail the modifications necessary for waitEvents to permit this behaviour. It is a matter of keeping in the state the list of spawned threads, so that each waitEvents has the information about all the threads that are triggered after it. Additionally, this list also contains a buffer for each of these threads.
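Since the bookkeeping is not detailed in the text, the following is only a hypothetical sketch of the idea, not the library's data structure (which also keeps a buffer per thread): every spawned thread is registered under the node that spawned it, and killing below a node walks that tree. Node, forkChild and killChildren are illustrative names.

```haskell
import Control.Concurrent
import Control.Exception
import Control.Monad (forM_, forever)
import Data.IORef

-- One node per spawned thread, with the threads it spawned in turn.
data Node = Node { nodeThread :: ThreadId, nodeChildren :: IORef [Node] }

-- Fork `body` as a child of `parent`. The body receives its own child list,
-- so whatever it spawns is registered one level deeper. mask_ keeps the
-- fork and its registration together; the child itself runs unmasked.
forkChild :: IORef [Node] -> (IORef [Node] -> IO ()) -> IO Node
forkChild parent body = mask_ $ do
  kids <- newIORef []
  tid <- forkIOWithUnmask (\unmask -> unmask (body kids))
  let node = Node tid kids
  atomicModifyIORef' parent (\ns -> (node : ns, ()))
  return node

-- Kill every thread registered below `parent` and clear the list.
-- Each thread is killed before its own child list is read.
killChildren :: IORef [Node] -> IO ()
killChildren parent = do
  ns <- atomicModifyIORef' parent (\ns -> ([], ns))
  forM_ ns $ \(Node tid kids) -> do
    killThread tid
    killChildren kids
```

killThread returns only once the exception has been raised in the target thread, and a thread that is inside forkChild receives it only after the registration, so no child registered by a killed thread is missed when its list is walked. After killChildren, the watcher can respawn its continuation from scratch.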
In this example:
main= do
    runTransient $ do
        waitEvents watchReset <|> return ()
        name <- waitEvents getLine
        liftIO $ putStrLn $ "hello "++ name
    stay
The return () composed with the alternative operator <|> immediately bypasses the wait for the reset event, but as soon as the reset is pressed, all the event handlers spawned after it will be killed, and immediately spawned again.
This is a slightly different version:
main= do
    runTransient $ do
        r <- (waitEvents watchStop >> return True) <|> return False
        if r then liftIO $ putStrLn "STOP" else do
            name <- waitEvents getLine
            liftIO $ putStrLn $ "hello "++ name
    stay
In this case, when watchStop is activated, the program is stopped and not re-spawned, since now the branch of the monad that is executed is different: it prints the stop message and finishes.
Non blocking IO
Let's create a non-blocking keyboard input primitive called option. At the same time, this is a good example of inter-thread communication within the Transient monad:
option :: (Typeable a, Show a, Read a, Eq a) =>
          a -> [Char] -> TransientIO a
option ret message= do
    liftIO $ putStrLn $ message++"("++show ret++")"
    waitEvents getLine'
  where
    -- block (STM retry) until the shared line parses to this option's value, then consume it
    getLine'= atomically $ do
        mr <- readTVar getLineRef
        case mr of
            Nothing -> retry
            Just r ->
                case readsPrec 0 r of
                    (s,_):_ | s == ret -> do
                        writeTVar getLineRef Nothing
                        return ret
                    _ -> retry

getLineRef :: TVar (Maybe String)
getLineRef= unsafePerformIO $ newTVarIO Nothing
{-# NOINLINE getLineRef #-}

inputLoop :: IO Bool
inputLoop= do
    r <- getLine
    if r == "end" then return True else do
        atomically . writeTVar getLineRef $ Just r
        inputLoop
Applicative and Alternative combinators
option reads the standard input in non-blocking mode, so many options can be combined using applicative or alternative operators. option shows a message and waits for inputLoop to enter an input line. If the line matches the option, getLine' consumes it and returns the value. If it does not match, the STM transaction retries, so the option keeps waiting; and after each match, the loop in waitEvents re-executes getLine' for this option. In this way, the options continuously watch the input. Note that more than one option can be active simultaneously, each in a different thread.
inputLoop is started by async. It waits for input and exposes it, in a TVar, to all the running getLine' processes (one per option). If the user enters "end", inputLoop returns and async kills all the watching threads below.
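The matching logic of getLine' can be tried in isolation. This hypothetical sketch keeps the shared TVar but replaces waitEvents with one plain looping thread per option, and reports each match on a channel instead of running a continuation; watchFor and startWatchers are illustrative names. It is slightly stricter than the code above: it uses reads and requires the whole line to parse as the option's value.

```haskell
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad (forever)

-- One shared TVar holds the last unconsumed input line; a watcher blocks
-- (retry) until the line parses to its own value, then consumes it.
watchFor :: (Read a, Eq a) => TVar (Maybe String) -> a -> STM a
watchFor ref ret = do
  mr <- readTVar ref
  case mr of
    Just r | [(s, "")] <- reads r, s == ret -> do
      writeTVar ref Nothing        -- consume the line so no other watcher sees it
      return ret
    _ -> retry                     -- empty or not ours: wait until the TVar changes

-- One watcher thread per option, looping like waitEvents and reporting
-- each match on a channel (standing in for running the continuation).
startWatchers :: TVar (Maybe String) -> Chan Int -> [Int] -> IO [ThreadId]
startWatchers ref out opts =
  mapM (\o -> forkIO (forever (atomically (watchFor ref o) >>= writeChan out))) opts
```

retry means a non-matching watcher does not spin: STM blocks it until the TVar is written again.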
main= do
    runTransient choose
    stay

choose :: TransientIO()
choose= do
    r <- async inputLoop <|> return False
    case r of
        True  -> return ()
        False -> do
            r <- option (1 :: Int) "red" <|> option 2 "green" <|> option 3 "blue"
            liftIO $ print r
The above program repeatedly prints the option chosen. We see that option is composable using the alternative operator.
Now let's create another event generator that sends a number every second, while two options wait for keyboard input:
data Option= Option Int Int | Number Int deriving Show

choose= do
    r <- (Option <$> option 1 "red" <*> option 2 "green")
         <|> (Number <$> waitEvents waitnumber)
    liftIO $ putStrLn $ "result=" ++ show r
  where
    waitnumber= do
        threadDelay 1000000
        return 42
Applicative and alternative combinators can be used fully. The applicative expression waits until both events have been triggered, so that both have data in their respective buffers. waitnumber produces an event every second.
Each option runs a different waitEvents in a different thread, but each of them executes the same closure (xcomp), which is the whole applicative expression. The options wait on the TVar for new input and fill their respective buffers when the input validates. The thread that fills the last buffer succeeds and executes the continuation. The others fail, but stay ready for the next input, since option uses waitEvents, which has a loop.
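The buffer rendezvous can be sketched with plain threads and an MVar. This is a hypothetical two-operand version, not the library's code (parBoth is an illustrative name): each action runs in its own thread and fills its buffer; the thread that completes the pair runs the continuation, and the other one simply stops.

```haskell
import Control.Concurrent

-- Run two blocking actions in parallel; whichever fills the last buffer
-- runs the continuation k with both results.
parBoth :: IO a -> IO b -> ((a, b) -> IO ()) -> IO ()
parBoth ioa iob k = do
  bufs <- newMVar (Nothing, Nothing)
  let finish (Just a, Just b) = k (a, b)   -- both buffers full: continue
      finish _                = return ()  -- the other buffer is still empty: stop
  _ <- forkIO $ do
    a <- ioa
    st <- modifyMVar bufs (\(_, mb) -> let s = (Just a, mb) in return (s, s))
    finish st
  _ <- forkIO $ do
    b <- iob
    st <- modifyMVar bufs (\(ma, _) -> let s = (ma, Just b) in return (s, s))
    finish st
  return ()
```

Because modifyMVar serializes the two updates, exactly one of the threads observes both buffers full, so the continuation runs once, in the thread that finished last.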
Beyond futures and promises
Scala futures and the Haskell library async use placeholders that receive the result. These placeholders can be used instead of the result of the computation, but some waiting operation must be put somewhere, typically at the end of the chain of statements that operate on the future.
Scala futures use them in nice chains of multithreaded lists that can be transformed in map-reduce style.
In this sense they are similar to JavaScript promises, which chain code with then, although promises do not run multiple tasks in parallel as Scala futures do.
For some needs, Scala and JavaScript must use callbacks, since the constraints of their frameworks do not allow enough flexibility. Futures and promises force the programmer into a computation model that is different from the one of the native language. In the case of Scala it is mostly monoidal; in the case of JavaScript it is a restricted form of the bind operation.
This library puts the continuation code at the end of the receiving pipeline and parallelizes the execution, but the continuation is the plain code that comes after the receive call in the monadic expression, so there is no restriction on what can be done.
async can be used for any process that we want to parallelize using applicative notation. This program sums the words in the Haskell and Google home pages in parallel, using Network.HTTP:
sum= do
    (r,r') <- (,) <$> async (worker "http://www.haskell.org/")
                  <*> async (worker "http://www.google.com/")
    liftIO $ putStrLn $ "result=" ++ show (r + r')
  where
    getURL= simpleHTTP . getRequest
    worker :: String -> IO Int
    worker url=do
        r <- getURL url
        body <- getResponseBody r
        return . length . words $ body
That is a complete working example. Note that, unlike in the async library, there is no wait primitive and no explicit construct for parallelization. The applicative instance does the parallelization, and async also does the wait. All the processing is done in the worker, in its own thread. More on that below.
We can also do parallel IO processing in the style of Scala futures, using the Monoid instance of TransientIO. But this time, since we use continuations, futures are no longer necessary: the thread whose download finishes last is the one that continues the execution. A future, instead, would yield control to a third, main thread that coordinates the rest. In Transient there is no main thread. The faster thread, the one that finishes its download first, fails (as I explained above).
instance Semigroup a => Semigroup (TransientIO a) where
    x <> y = (<>) <$> x <*> y

instance Monoid a => Monoid (TransientIO a) where
    mempty= return mempty

sum= do
    rs <- foldl (<>) (return 0) $ map (async . worker)
              [ "http://www.haskell.org/", "http://www.google.com/"]
    liftIO $ putStrLn $ "result=" ++ show rs
Since worker returns an Int, to sum the results we need a Monoid instance for Int; with current versions of base, it also needs a Semigroup instance:
instance Semigroup Int where
    (<>)= (+)

instance Monoid Int where
    mempty= 0
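The same rendezvous generalizes to a list of actions combined with a Monoid, which is what folding async computations with <> amounts to. The sketch is hypothetical (parMconcat and totalOf are illustrative names, not library functions) and uses Sum from Data.Monoid rather than the orphan Monoid instance for Int. Results are combined in list order, so non-commutative monoids also work.

```haskell
import Control.Concurrent
import Control.Monad (forM_)
import Data.Monoid (Sum (..))

-- Every action runs in its own thread and drops its result in its own
-- slot; the thread that fills the last slot combines the slots in list
-- order and runs the continuation k. Earlier finishers simply stop.
parMconcat :: Monoid m => [IO m] -> (m -> IO ()) -> IO ()
parMconcat [] k = k mempty
parMconcat actions k = do
  remaining <- newMVar (length actions)
  slots <- mapM (const newEmptyMVar) actions
  forM_ (zip actions slots) $ \(act, slot) -> forkIO $ do
    x <- act
    putMVar slot x
    left <- modifyMVar remaining (\n -> return (n - 1, n - 1))
    if left == 0
      then mapM readMVar slots >>= k . mconcat   -- last finisher continues
      else return ()                              -- earlier finishers stop here

-- Summing word counts, as the Int monoid above does, but via Sum.
totalOf :: [IO Int] -> (Int -> IO ()) -> IO ()
totalOf actions k = parMconcat (map (fmap Sum) actions) (k . getSum)
```

For instance, totalOf [return 3, return 4, return 5] k calls k 12 exactly once, from whichever thread finishes last.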
Note that there is a full de-inversion of control, since the result returns to the monad. The futures and promises of Scala (or JavaScript) cannot return the execution flow to the calling procedure without some form of explicit asynchronous rendezvous with the main thread, which executes the main flow where it continues single-threaded, in imperative mode.
Since the parallel effects of the Transient monad continue in the do block, the processing of the results can continue in the monad. That permits more complex and yet clearer computations. It is not reduced to list-like processing, and the main flow of the computation does not need to be single-threaded.
I said that in an applicative expression with async statements, the thread that finishes last computes the result and continues the rest of the computation, while the other threads stop. But in an alternative expression with async, all the threads may succeed and return a result to the do block, so all of them execute the next statements in parallel. This is the non-deterministic effect of Transient, which is explored in another article.
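Assuming a CPS analogue of async built on ContT from transformers (not Transient's implementation), the alternative behaviour can be sketched as a combinator that starts both operands with the same continuation; orBoth is a hypothetical name. Every branch that yields a value runs the rest of the do block, each in its own thread.

```haskell
import Control.Concurrent
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Cont (ContT (..))

-- CPS analogue of async: the continuation runs in a new thread when the
-- blocking action returns.
async :: IO a -> ContT () IO a
async receive = ContT $ \k -> () <$ forkIO (receive >>= k)

-- Alternative in the spirit of Transient's <|>: both operands receive the
-- same continuation, so the rest of the block runs once per branch.
orBoth :: ContT () IO a -> ContT () IO a -> ContT () IO a
orBoth (ContT f) (ContT g) = ContT $ \k -> f k >> g k
```

With orBoth (async (return 1)) (async (return 2)) followed by a statement that writes n * 10 to a channel, the channel receives both 10 and 20, in either order.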
A Web Server
Here is a toy web server:
server= do
    sock <- liftIO $ listenOn $ PortNumber 80
    (h,_,_) <- spawn $ accept sock
    liftIO $ do
        hPutStr h msg
        hFlush h
        hClose h

msg = "HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\nPong!\r\n"
In the current code, the primitives async, waitEvents and spawn are defined in terms of parallel, which is a generalization of the async and waitEvents explained above:
data Loop= Once | Loop | Multithread

waitEvents :: IO b -> TransientIO b
waitEvents= parallel Loop

async :: IO b -> TransientIO b
async = parallel Once

spawn :: IO b -> TransientIO b
spawn= parallel Multithread

parallel :: Loop -> IO b -> TransientIO b
When parallel is called with Multithread, it spawns the continuation in a new thread for each event received, immediately, without waiting for the processing of the previous event to finish. waitEvents executes the continuation within the receiving thread, so receive is not called again until the previous event has been processed.
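The difference between the Loop and Multithread modes can be sketched in CPS, assuming ContT from transformers rather than the library's parallel: in waitEvents the continuation runs inside the receiving thread, while spawn forks a fresh thread for each event, so receiving continues immediately.

```haskell
import Control.Concurrent
import Control.Monad (forever)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Cont (ContT (..))

-- Loop: the next event is received only after the continuation finishes.
waitEvents :: IO a -> ContT () IO a
waitEvents receive = ContT $ \k -> () <$ forkIO (forever (receive >>= k))

-- Multithread: each event gets a fresh thread for the continuation.
spawn :: IO a -> ContT () IO a
spawn receive = ContT $ \k ->
  () <$ forkIO (forever (receive >>= \x -> () <$ forkIO (k x)))
```

If the continuation for the first event blocks until the second event has been handled, spawn still makes progress, whereas waitEvents would deadlock, because its second receive never starts. That is why the web server above uses spawn for accept.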
Composition of programs (Runnable example)
We can compose any of these programs together, since none of them blocks and the automatic thread control applies gracefully to all of the elements. This program combines the above programs and some others.
The combination in this case uses the alternative operator:
colors <|> app <|> sum1 <|> sum2 <|> server <|> menu
Since the fpcomplete environment uses ghci and shares threads among snippets of code, I can run only one example in this article, so the composability of Transient comes in handy to show them all together.
To verify the multitasking, enter app and then colors. app starts an iterative counter with an applicative expression, while colors asks you to choose among three options. Both run in parallel until you enter "main", which stops both, since main is above them in the monad.
{-# START_FILE main.hs #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Main where
import Base
import Control.Applicative
import Control.Concurrent
import Control.Exception
import Control.Monad.State
import Data.Monoid
import System.IO.Unsafe
import Network.HTTP
import Network
import System.IO
data Option= Option Int Int | Number Int deriving Show

instance Semigroup Int where
    (<>)= (+)

instance Monoid Int where
    mempty= 0

main= do
    runTransient $ do
        async inputLoop <|> return ()
        option "main" "to return to the main menu" <|> return ""
        liftIO $ putStrLn "MAIN MENU"
        colors <|> app <|> sum1 <|> sum2 <|> server <|> menu
    stay

colors :: TransientIO ()
colors= do
    option "colors" "choose between three colors"
    r <- color 1 "red" <|> color 2 "green" <|> color 3 "blue"
    liftIO $ print r
  where
    color :: Int -> String -> TransientIO String
    color n str= option (show n) str >> return str

app :: TransientIO ()
app= do
    option "app" "applicative expression that returns a counter in 2-tuples every second"
    r <- (,) <$> number <*> number
    liftIO $ putStrLn $ "result=" ++ show r
  where
    number= waitEvents $ do
        threadDelay 1000000
        n <- takeMVar counter
        putMVar counter (n+1)
        return n
    counter=unsafePerformIO $ newMVar (0 :: Int)

sum1 :: TransientIO ()
sum1= do
    option "sum1" "access to two web pages concurrently and sum the number of words using Applicative"
    (r,r') <- (,) <$> async (worker "http://www.haskell.org/")
                  <*> async (worker "http://www.google.com/")
    liftIO $ putStrLn $ "result=" ++ show (r + r')

getURL= simpleHTTP . getRequest

worker :: String -> IO Int
worker url=do
    r <- getURL url
    body <- getResponseBody r
    putStrLn $ "number of words in " ++ url ++" is: " ++ show(length (words body))
    return . length . words $ body

sum2 :: TransientIO ()
sum2= do
    option "sum2" "access to N web pages concurrently and sum the number of words using map-fold"
    rs <- foldl (<>) (return 0) $ map (async . worker)
              [ "http://www.haskell.org/"
              , "http://www.google.com/"]
    liftIO $ putStrLn $ "result=" ++ show rs

server :: TransientIO ()
server= do
    option "server" "A web server in the port 8080"
    liftIO $ print "Server started"
    sock <- liftIO $ listenOn $ PortNumber 8080
    (h,_,_) <- spawn $ accept sock
    liftIO $ do
          hPutStr h msg
          putStrLn "new request"
          hFlush h
          hClose h
      `catch` (\(e::SomeException) -> sClose sock)

msg = "HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\nPong!\r\n"

menu :: TransientIO ()
menu= do
    option "menu" "a submenu with two options"
    colors <|> sum2
{-# START_FILE Base.hs #-}
-----------------------------------------------------------------------------
--
-- Module : Base
-- Copyright :
-- License : GPL (Just (Version {versionBranch = [3], versionTags = []}))
--
-- Maintainer : [email protected]
-- Stability :
-- Portability :
--
-- |
--
-----------------------------------------------------------------------------
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
module Base (
module Control.Applicative,
TransientIO,
async,waitEvents, spawn,react,
runTransient,
inputLoop, option, stay,
getSData,setSData,delSData
) where
import Control.Applicative
import Control.Monad.State
import Data.Dynamic
import qualified Data.Map as M
import Data.Monoid
import Debug.Trace
import System.IO.Unsafe
import Unsafe.Coerce
import Control.Concurrent
import Control.Concurrent.STM
import Data.List
import Data.Maybe
import GHC.Conc
import System.Mem.StableName
(!>) = const . id -- flip trace
infixr 0 !>

data Transient m x= Transient {runTrans :: m (Maybe x)}
type SData= ()
type EventId= Int

data EventF = forall a b . EventF{xcomp :: (EventId,TransientIO a)
                                 ,fcomp :: [a -> TransientIO b]
                                 ,mfData :: M.Map TypeRep SData
                                 ,mfSequence :: Int
                                 ,nodeInfo :: Maybe (P RowElem)
                                 ,row :: P RowElem
                                 ,replay :: Bool
                                 }

type P= MVar
(=:) :: P a -> a -> IO()
(=:) n v= modifyMVar_ n $ const $ return v

type Buffer= Maybe ()
type NodeTuple= (EventId, ThreadId, Buffer)
type Children= Maybe (P RowElem)
data RowElem= Node NodeTuple | RowList Row Children

instance Show RowElem where
  show (Node (e,_,_))= show e
  show (RowList r ch)= show ( reverse r) ++ "->" ++ show ch

type Row = [P RowElem]

instance Eq NodeTuple where
  (i,_,_) == (i',_,_)= i == i'

instance Show x => Show (MVar x) where
  show x = show (unsafePerformIO $ readMVar x)

eventf0= EventF (-1,empty) [const $ empty] M.empty 0
                Nothing rootRef False

topNode= (-1 :: Int,unsafePerformIO $ myThreadId,False,Nothing)

rootRef :: MVar RowElem
rootRef= unsafePerformIO $ newMVar $ RowList [] Nothing

instance MonadState EventF TransientIO where
  get= Transient $ get >>= return . Just
  put x= Transient $ put x >> return (Just ())

type TransientIO= Transient StateIO
type StateIO= StateT EventF IO

runTransient :: TransientIO x -> IO (Maybe x, EventF)
runTransient t= runStateT (runTrans t) eventf0

newRow :: MonadIO m => m (P RowElem)
newRow= liftIO $ newMVar $ RowList [] Nothing
setEventCont :: TransientIO a -> (a -> TransientIO b) -> StateIO EventF
setEventCont x f = do
  st@(EventF _ fs d _ es ro r) <- get
  n <- if replay st then return $ mfSequence st
         else liftIO $ readMVar refSequence
  ro' <- newRow
  ro `eat` ro'
  put $ EventF (n,x) ( f: unsafeCoerce fs) d n es ro' r !> ("stored " ++ show n)
  return st

eat ro ro'= liftIO $
  modifyMVar_ ro $ \(RowList es t) -> return $ RowList (ro':es) t

resetEventCont (EventF x fs _ _ _ _ _)=do
  st@(EventF _ _ d n es ro r ) <- get
  put $ EventF x fs d n es ro r

getCont ::(MonadState EventF m) => m EventF
getCont = get

runCont :: EventF -> StateIO ()
runCont (EventF (i,x) fs _ _ _ _ _)= do runIt i x (unsafeCoerce fs); return ()
  where
    runIt i x fs= runTrans $ do
      st <- get
      put st{mfSequence=i}
      r <- x
      put st
      compose fs r

compose []= const empty
compose (f: fs)= \x -> f x >>= compose fs
instance Functor TransientIO where
  fmap f x= Transient $ fmap (fmap f) $ runTrans x

instance Applicative TransientIO where
  pure a = Transient . return $ Just a
  Transient f <*> Transient g= Transient $ do
    k <- f
    x <- g
    return $ k <*> x

instance Alternative TransientIO where
  empty= Transient $ return Nothing
  Transient f <|> Transient g= Transient $ do
    k <- f
    x <- g
    return $ k <|> x

-- | A synonym of empty that can be used in a monadic expression. It stops the
-- computation.
stop :: TransientIO a
stop= Control.Applicative.empty

instance Monoid a => Monoid (TransientIO a) where
  mappend x y = mappend <$> x <*> y
  mempty= return mempty
instance Monad TransientIO where
  return x = Transient $ return $ Just x
  x >>= f = Transient $ do
      cont <- setEventCont x f
      mk <- runTrans x
      resetEventCont cont
      case mk of
        Just k -> do addRow' !> "ADDROW" ; runTrans $ f k
        Nothing -> return Nothing
    where
      addRow'= do
        r <- gets row
        n <- addRow r
        modify $ \s -> s{row= n}

addRow r=
  liftIO $ do
    n <- newMVar $ RowList [] Nothing
    modifyMVar_ r $ \(RowList ns ch) -> do
      case ch of
        Just x -> error $ "children not empty: "++ show x
        Nothing -> return $ RowList ns $ Just n
    return n

instance MonadTrans (Transient ) where
  lift mx = Transient $ mx >>= return . Just

instance MonadIO TransientIO where
  liftIO = lift . liftIO -- let x= liftIO io in x `seq` lift x
-- | Get the session data of the desired type if there is any.
getSessionData :: (MonadState EventF m,Typeable a) => m (Maybe a)
getSessionData = resp where
  resp= gets mfData >>= \list ->
          case M.lookup ( typeOf $ typeResp resp ) list of
            Just x -> return . Just $ unsafeCoerce x
            Nothing -> return $ Nothing
  typeResp :: m (Maybe x) -> x
  typeResp= undefined

-- | getSessionData specialized for the Transient monad. If there is no data
-- of that type in the session, the result is Nothing and the monadic
-- computation does not continue.
getSData :: MonadState EventF m => Typeable a =>Transient m a
getSData= Transient getSessionData

-- | Store a value in the session data, indexed by its type.
setSessionData :: (MonadState EventF m, Typeable a) => a -> m ()
setSessionData x=
  modify $ \st -> st{mfData= M.insert (typeOf x ) (unsafeCoerce x) (mfData st)}

-- | a shorter name for setSessionData
setSData :: ( MonadState EventF m,Typeable a) => a -> m ()
setSData= setSessionData

delSessionData x=
  modify $ \st -> st{mfData= M.delete (typeOf x ) (mfData st)}

delSData :: ( MonadState EventF m,Typeable a) => a -> m ()
delSData= delSessionData
----
genNewId :: MonadIO m => MonadState EventF m => m Int
genNewId= do
  st <- get
  case replay st of
    True -> do
      let n= mfSequence st
      put $ st{mfSequence= n+1}
      return n
    False -> liftIO $
      modifyMVar refSequence $ \n -> return (n+1,n)

refSequence :: MVar Int
refSequence= unsafePerformIO $ newMVar 0

--- IO events
--buffers :: IORef [(EventId,Dynamic)]
--buffers= unsafePerformIO $ newIORef []

data Loop= Once | Loop | Multithread deriving Eq

waitEvents :: IO b -> TransientIO b
waitEvents= parallel Loop

spawn= parallel Multithread

async :: IO b -> TransientIO b
async = parallel Once
parallel :: Loop -> IO b -> TransientIO b
parallel hasloop receive = Transient $ do
    cont <- getCont
    id <- genNewId
    let currentRow= row cont
--  mnode= nodeInfo cont
    mnode <- liftIO $ lookTree id currentRow !> ("idToLook="++ show id++ " in: "++ show currentRow)
    case mnode of
      Nothing ->do
        return () !> "NOT FOUND"
        liftIO $ do
          ref <- newMVar $ Node (id,undefined,Nothing)
          modifyMVar_ (row cont) $ \(RowList ns t) -> return $ RowList (ref : ns) t
          forkIO $ do
            th <- myThreadId
            modifyMVar_ ref $ \(Node(id,_,n)) -> return $ Node (id,th,Nothing)
            loop hasloop receive $ \r -> do
              th <- myThreadId
              modifyMVar_ ref $ \(Node(i,_,_)) -> return
                   $ Node(i,th,Just $ unsafeCoerce r)
              case cont of
                EventF (i,x) f _ _ _ _ _-> do
                  mr <- runStateT (runTrans x)
                          cont{replay= True,mfSequence=i,nodeInfo=Just ref}
                          !> "runx" !> ("mfSequence="++ show i)
                  case mr of
                    (Nothing,_) ->return()
                    (Just r,cont') ->do
                      let row1= row cont'
                      delEvents row1 !> ("delEvents, activated "++ show row1)
                      id <- readMVar refSequence
                      n <- if hasloop== Multithread then return row1 else addRow row1
                      runStateT (runTrans $ ( compose $ unsafeCoerce f) r)
                        cont'{row=n,replay= False,mfSequence=id } !> ("SEQ=" ++ show(mfSequence cont'))
                      return ()
--                    delEvents children []
          modifyMVar_ (row cont) $ \(RowList ns ch) -> return $ RowList (ref : ns) ch
        return Nothing
      Just (node@(id',th', mrec)) -> do
        modify $ \cont -> cont{nodeInfo=Nothing}
        return $ if isJust mrec then Just $ unsafeCoerce $ fromJust mrec else Nothing
  where
    loop Once rec x = rec >>= x
    loop Loop rec f = do
        r <- rec
        f r
        loop Loop rec f
    loop Multithread rec f = do
        r <- rec
        forkIO $ f r
        loop Multithread rec f
lookTree :: EventId -> P RowElem -> IO (Maybe NodeTuple)
lookTree id ref= do
  RowList ns _<- readMVar ref
  lookList id ns

lookList id mn= case mn of
  [] -> return Nothing
  (p:nodes) -> do
    me <- readMVar p
    case me of
      Node(node@((id',_,_))) ->
        if id== id'
          then return $ Just node
          else lookList id nodes
      RowList row _ -> do
        mx <- lookList id nodes
        case mx of
          Nothing -> lookList id row
          Just x -> return $ Just x

delEvents :: P RowElem -> IO()
delEvents ref = do
  RowList mevs mch <- takeMVar ref
  maybeDel mch
  putMVar ref $ RowList mevs Nothing

maybeDel mch= case mch of
  Nothing -> return ()
  Just p -> do
    RowList es mch' <- readMVar p
    delList es !> ("toDelete="++ show es)
    maybeDel mch'

delList es= mapM_ del es where
  del p = readMVar p >>= del'
  del' (Node(node@(_,th,_)))= killThread th !> ("DELETING " ++ show node)
  del' (RowList l mch)= delList l >> maybeDel mch
type EventSetter eventdata response= (eventdata -> IO response) -> IO ()
type ToReturn response= IO response

-- | De-invert an event handling setter. The second parameter computes the response to return each time the event handler is called.
-- It is useful whenever there is a framework or OS service that needs to set interruption handlers, event handlers, request handlers,
-- callbacks etc.
--
-- For example, if we have this onResponse callback setter for an asynchronous query response that sends data to display:
--
-- > data Control= SendMore | NoMore
-- >
-- > onResponse :: (Response -> IO Control) -> IO()
--
-- We can iterate the responses and we can interrupt them this way:
--
-- > rcontrol <- liftIO newEmptyMVar
-- >
-- > resp <- react onResponse (takeMVar rcontrol)
-- > liftIO $ display resp
-- > r <- (option "more" "more" >> return SendMore) <|> (option "stop" "stop" >> return NoMore)
-- > liftIO $ putMVar rcontrol r
react
  :: Typeable eventdata
  => EventSetter eventdata response
  -> ToReturn response
  -> TransientIO eventdata
react setHandler iob= Transient $ do
  cont <- getCont
  mEvData <- getSessionData
  case mEvData of
    Nothing -> do
      liftIO $ setHandler $ \dat ->do
--      let cont'= cont{mfData = M.insert (typeOf dat)(unsafeCoerce dat) (mfData cont)}
        runStateT (setSData dat >> runCont cont) cont
        iob
      return Nothing
    Just dat -> delSessionData dat >> return (Just dat)
getLineRef= unsafePerformIO $ newTVarIO Nothing

-- for testing purposes
option1 x message= inputLoop `seq` (waitEvents $ do
    liftIO $ putStrLn $ message++"("++show x++")"
    atomically $ do
      mr <- readTVar getLineRef
      th <- unsafeIOToSTM myThreadId
      case mr of
        Nothing -> retry
        Just r ->
          case reads1 r !> ("received " ++ show r ++ show th) of
            (s,_):_ -> if s == x !> ("waiting" ++ show x)
                         then do
                           writeTVar getLineRef Nothing !>"match"
                           return s
                         else retry
            _ -> retry)
  where
    reads1 s=x where
      x= if typeOf(typeOfr x) == typeOf "" then unsafeCoerce[(s,"")] else readsPrec 0 s
    typeOfr :: [(a,String)] -> a
    typeOfr = undefined

option :: String -> String -> TransientIO String
option ret message= do
  liftIO $ putStrLn $"Enter "++show ret++"\tto: " ++ message
  waitEvents $ getLine' (==ret)
  liftIO $do putStrLn $ show ret ++ " chosen"
  return ret

getLine' :: (String-> Bool) -> IO String
getLine' cond= inputLoop `seq` do
    atomically $ do
      mr <- readTVar getLineRef
      th <- unsafeIOToSTM myThreadId
      case mr of
        Nothing -> retry
        Just r ->
          if cond r !> show (cond r)
            then do
              writeTVar getLineRef Nothing !>"match"
              return r
            else retry
  where
    reads1 s=x where
      x= if typeOf(typeOfr x) == typeOf "" then unsafeCoerce[(s,"")] else readsPrec 0 s
    typeOfr :: [(a,String)] -> a
    typeOfr = undefined

inputLoop :: IO ()
inputLoop= do
    putStrLn "Press end to exit"
    inputLoop'
  where
    inputLoop'= do
      r<- getLine !> "started inputLoop"
      if r=="end" then putMVar rexit () else do
        atomically . writeTVar getLineRef $ Just r
        inputLoop'

rexit= unsafePerformIO newEmptyMVar

stay= takeMVar rexit >> print "bye"
Session data
I added a type-indexed map to the state so that the user can store his own session data with these primitives:
setSData :: Typeable a => a -> TransientIO ()
getSData :: Typeable a => TransientIO a
Session data can be used instead of a state monad transformer for each new kind of user data.
My purpose is to create a monad for general IT purposes, for ordinary programmers with no knowledge of monad transformers.
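The idea of a type-indexed map can be shown outside Transient. The sketch below is a stand-in, not the library's code. Base.hs stores values with unsafeCoerce under SData = (). This version instead uses the type-safe Data.Dynamic wrapper from base, keyed by TypeRep. setS and getS are hypothetical names that mirror setSData and getSData.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import Data.Dynamic (Dynamic, fromDynamic, toDyn)
import qualified Data.Map as M
import Data.Proxy (Proxy (..))
import Data.Typeable (TypeRep, Typeable, typeOf, typeRep)

-- A type-indexed store: at most one value per type, keyed by its TypeRep.
type Store = M.Map TypeRep Dynamic

-- Insert (or replace) the value stored for the type of x.
setS :: Typeable a => a -> Store -> Store
setS x = M.insert (typeOf x) (toDyn x)

-- Look up the value stored for the requested type, if any.
getS :: forall a. Typeable a => Store -> Maybe a
getS st = M.lookup (typeRep (Proxy :: Proxy a)) st >>= fromDynamic
```

Each type has a single slot, so storing a second Int replaces the first one. Wrapping values in newtypes gives them independent slots, which is how different kinds of user data can coexist.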
Since empty stops the computation but does not carry any error condition, session data can be used for this purpose:
import Prelude hiding (fail)   -- this fail would otherwise clash with the Prelude's
data Status= Error String | NoError
fail :: String -> TransientIO a
fail msg= setSData (Error msg) >> empty
After the execution, I can inspect the status:
status <- getSData <|> return NoError
The alternative is necessary because, if Status has not been set, getSData would stop the computation. return NoError guarantees that it does not stop.
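Here is a runnable model of this error-status pattern. It simplifies in two ways. MaybeT from the transformers package plays the role of Transient's silent stop (empty), and a State slot holding Maybe Status stands in for the session data. The state lives below MaybeT, so a status written before empty survives the stop, as it does in Transient. failWith is a hypothetical rename of fail that avoids the Prelude clash.

```haskell
import Control.Applicative (empty, (<|>))
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Maybe (MaybeT (..))
import Control.Monad.Trans.State (State, evalState, get, put)

data Status = Error String | NoError deriving (Eq, Show)

-- Toy model: MaybeT supplies the silent stop (empty), and the State slot
-- below it stands in for the session data entry of type Status.
type T = MaybeT (State (Maybe Status))

setStatus :: Status -> T ()
setStatus = lift . put . Just

-- Like getSData: stops the computation when no Status has been stored.
getStatus :: T Status
getStatus = lift get >>= maybe empty return

-- Hypothetical rename of the article's fail, to avoid the Prelude's fail.
failWith :: String -> T a
failWith msg = setStatus (Error msg) >> empty

-- Run a step that may stop, then inspect the status afterwards.
runAndInspect :: T () -> Status
runAndInspect step =
  case evalState (runMaybeT program) Nothing of
    Just s -> s
    Nothing -> NoError -- unreachable: the last alternative never stops
  where
    program = (step <|> return ()) >> (getStatus <|> return NoError)
```

runAndInspect (failWith "disk full") yields Error "disk full", and a step that succeeds without setting anything yields NoError.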
De-inverting callbacks
So far so good. But what happens when, besides raw blocking IO, there is a framework that deals with some particular events, so that it starts the threads itself and expects you just to set the callbacks?
Suppose that we have this event handling setter:
setHandlerForWhatever :: (a -> IO ()) -> IO ()
We need a de-inverted call, whateverHappened, at some point of the computation (maybe at the beginning), so that the callback continues the monadic execution:
do
  somethingToDo
  r <- whateverHappened
  doSomethingWith r
  ....
To define the de-inverted call whateverHappened we use the same trick as in async, but this time there is no forkIO and no thread control, since the framework does it for you:
whateverHappened= do
  cont <- getCont
  mEvData <- Just <$> getSData <|> return Nothing
  case mEvData of
    Nothing -> do
      liftIO $ setHandlerForWhatever $ \dat -> do
        runStateT (setSData dat >> runCont cont) cont
        return ()
      empty
    Just dat -> return dat
Whether the framework is single-threaded or multithreaded does not matter: we give it the event handlers it needs by means of continuations.
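The claim that a callback setter is a continuation in disguise can be checked with ContT from the transformers package. This is a different technique from Transient's, which captures the continuation in its EventF state. The point is that a setter of type (a -> IO ()) -> IO () is exactly the argument that ContT expects. setHandlerForWhatever here is a hypothetical framework that simply feeds a list of events to the handler.

```haskell
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Cont (ContT (..))
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Hypothetical framework: it owns the event source and calls the
-- registered handler once per event (here, synchronously).
setHandlerForWhatever :: [String] -> (String -> IO ()) -> IO ()
setHandlerForWhatever events handler = mapM_ handler events

-- A setter of type (a -> IO ()) -> IO () already has the shape of a
-- continuation, so ContT de-inverts it: whatever follows
-- whateverHappened in a do block becomes the handler.
whateverHappened :: [String] -> ContT () IO String
whateverHappened events = ContT (setHandlerForWhatever events)

-- Reads as sequential code, but the last step runs once per event.
program :: IORef [String] -> [String] -> ContT () IO ()
program logRef events = do
  lift $ modifyIORef logRef ("setup" :)        -- somethingToDo
  r <- whateverHappened events
  lift $ modifyIORef logRef (("got " ++ r) :)  -- doSomethingWith r

runProgram :: [String] -> IO [String]
runProgram events = do
  logRef <- newIORef []
  runContT (program logRef events) return
  reverse <$> readIORef logRef
```

runProgram ["a", "b"] logs setup once and then one got line per event. The code after the de-inverted call has become the handler.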
To have something more general, I defined:
type EventSetter eventdata response= (eventdata -> IO response) -> IO ()
type ToReturn response= IO response
react
  :: Typeable eventdata
  => EventSetter eventdata response
  -> ToReturn response
  -> TransientIO eventdata
The second parameter is the action that computes the value returned by the callback. So if you have a callback setter called onResponse:
data Control= SendMore | NoMore
onResponse :: (Response -> IO Control) -> IO ()
I can display all data received while controlling the reception this way:
rcontrol <- liftIO newEmptyMVar
resp <- react onResponse (takeMVar rcontrol)
liftIO $ display resp
r <- (option "more" "more" >> return SendMore) <|> (option "stop" "stop" >> return NoMore)
liftIO $ putMVar rcontrol r
Since react sets the whole rest of the computation as the callback, and the ToReturn expression is evaluated last, the continuation runs and sets rcontrol before the ToReturn expression is evaluated.
Note that the callback can be reassigned at any moment, since react sets as the callback whatever continuation follows it.
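The same ordering (the continuation runs first, then the ToReturn action computes the response) can be sketched with ContT. reactC is a hypothetical analogue of react, not Transient's implementation. onResponse is a hypothetical framework that stops delivering as soon as the handler answers NoMore. An IORef replaces the MVar of the example above.

```haskell
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Cont (ContT (..))
import Data.IORef (modifyIORef, newIORef, readIORef, writeIORef)

data Control = SendMore | NoMore deriving (Eq, Show)

-- Hypothetical framework: delivers responses one at a time and asks the
-- handler, after each one, whether to send more.
onResponse :: [String] -> (String -> IO Control) -> IO ()
onResponse [] _ = return ()
onResponse (x : xs) handler = do
  c <- handler x
  case c of
    SendMore -> onResponse xs handler
    NoMore -> return ()

-- Hypothetical ContT analogue of react: the installed handler first runs the
-- rest of the computation (k e) and only then evaluates toReturn, so the
-- continuation can set whatever toReturn reads.
reactC :: ((e -> IO r) -> IO ()) -> IO r -> ContT () IO e
reactC setHandler toReturn = ContT $ \k -> setHandler (\e -> k e >> toReturn)

-- Display (here: record) each response, and stop after the one equal to lastOne.
displayUntil :: String -> [String] -> IO [String]
displayUntil lastOne responses = do
  shown <- newIORef []
  control <- newIORef SendMore
  let body = do
        resp <- reactC (onResponse responses) (readIORef control)
        lift $ modifyIORef shown (resp :)
        lift $ writeIORef control (if resp == lastOne then NoMore else SendMore)
  runContT body return
  reverse <$> readIORef shown
```

displayUntil "b" ["a", "b", "c"] records a and b only. After b, the continuation writes NoMore, and the response read afterwards tells the framework to stop.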
Conclusions and future work
The code is at
https://github.com/agocorona/transient
My aim is to create a family of combinators for programming in industry. As I said before, that implies no monad transformers: the simplest monad, which produces the simplest error messages.
The Haskell applicative, alternative, monoidal and monadic combinators, when applied to a monad that manages asynchronous IO, permit multithreaded programming with little plumbing, close to the specification level and with great composability. No inversion of control means no need to deconstruct the specifications and no state machines.
This, together with the uniform and composable thread management, narrows the design space and makes the application more understandable from the requirements. As a result, the technical documentation and maintenance costs are reduced to a minimum.
Note that the bulk of the programming is done in the IO monad. That is on purpose. The idea is a simple IT EDSL with the right effects that permit rapid and intuitive development. Programmers who want additional monads can run them within the IO procedures they define. I will add some additional effects like backtracking to undo transactions and to produce execution traces. That would be the base of a new version of MFlow, my server-side framework and integration platform. The ability to perform rollbacks and respond to asynchronous events at the same time is important for cloud applications. Reader and writer effects, for whatever the programmer needs, are almost trivial to implement using getSData and setSData.
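To illustrate that last claim, here is a minimal sketch over a toy state that contains only the type-indexed map. That is an assumption: the real EventF carries much more. A newtype that is stored once and read anywhere acts as a reader environment. A newtype that is read, extended and stored back acts as a writer log. setS, getS, askIndent and tell' are hypothetical names.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import Control.Monad.Trans.State (State, evalState, gets, modify)
import Data.Dynamic (Dynamic, fromDynamic, toDyn)
import qualified Data.Map as M
import Data.Proxy (Proxy (..))
import Data.Typeable (TypeRep, Typeable, typeOf, typeRep)

-- Toy stand-in for Transient's state: only the type-indexed session map.
type Store = M.Map TypeRep Dynamic
type Session = State Store

setS :: Typeable a => a -> Session ()
setS x = modify (M.insert (typeOf x) (toDyn x))

getS :: forall a. Typeable a => Session (Maybe a)
getS = gets (\st -> M.lookup (typeRep (Proxy :: Proxy a)) st >>= fromDynamic)

-- Reader-like effect: an indentation level set once and read anywhere.
newtype Indent = Indent Int

askIndent :: Session Int
askIndent = maybe 0 (\(Indent n) -> n) <$> getS

-- Writer-like effect: log lines accumulated under their own type.
newtype Log = Log {unLog :: [String]}

tell' :: String -> Session ()
tell' line = do
  old <- maybe [] unLog <$> getS
  setS (Log (old ++ [line]))

report :: Session ()
report = do
  n <- askIndent
  tell' (replicate n ' ' ++ "start")
  tell' (replicate n ' ' ++ "end")

-- Set the "reader" value, run the program, then collect the "writer" output.
runReport :: Int -> [String]
runReport n = evalState (setS (Indent n) >> report >> (maybe [] unLog <$> getS)) M.empty
```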
Resource allocation and deallocation for file handles etc. can be done using the same strategy used for thread control, but it is more orthogonal to delegate it to the IO threads themselves. The programmer can use exceptions or monads that guarantee proper release of resources before the thread is killed.
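This is a minimal sketch of the exception-based option. It assumes that the IO thread itself releases the resource in a finally handler. Transient stops threads with killThread (see delEvents). killThread raises the asynchronous ThreadKilled exception in the target thread, so Control.Exception.finally runs the release action before the thread dies. The worker and its 10-second delay are hypothetical.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (finally)

-- Hypothetical worker holding a resource: it signals that it has entered the
-- protected region, then blocks for a long time (10 s here).
-- finally runs the release action even when the thread is killed.
releasedOnKill :: IO Bool
releasedOnKill = do
  started <- newEmptyMVar
  released <- newEmptyMVar
  tid <- forkIO $
    (putMVar started () >> threadDelay 10000000)
      `finally` putMVar released True   -- release action
  takeMVar started    -- from here on, the finally handler is installed
  killThread tid      -- what Transient's delEvents does to stale threads
  takeMVar released   -- returns True once the release action has run
```

Waiting for the started signal before calling killThread matters. Without it, the exception could arrive before finally has installed its handler.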
In Multithread mode the single-entry buffer can be overrun. This case must be handled, or else we must assume that the receive procedure has its own buffer and its own event contention mechanism. That is the most orthogonal option.
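A few lines are enough to contrast the overrun with the orthogonal fix. Both functions are hypothetical illustrations. singleSlot mimics the single Maybe slot stored in each Node. buffered assumes that the receive procedure owns a Chan, so a burst of events is queued instead of overwritten.

```haskell
import Control.Concurrent.Chan (newChan, readChan, writeList2Chan)
import Control.Monad (replicateM)
import Data.IORef (newIORef, readIORef, writeIORef)

-- Single-entry buffer: each new event overwrites the previous one, so
-- events that arrive before the consumer reads the slot are lost.
singleSlot :: [Int] -> IO (Maybe Int)
singleSlot events = do
  slot <- newIORef Nothing
  mapM_ (writeIORef slot . Just) events -- burst of events, no reader in between
  readIORef slot

-- The receive procedure owns a FIFO buffer (a Chan): a burst of events is
-- queued and every one of them is eventually read, in order.
buffered :: [Int] -> IO [Int]
buffered events = do
  ch <- newChan
  writeList2Chan ch events
  replicateM (length events) (readChan ch)
```

singleSlot [1,2,3] keeps only the last event, while buffered [1,2,3] returns all three in order.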
This is huge. I plan to create interfaces for some GUI toolkit. The GUI objects will be fully composable for the first time.
Spawning threads in other machines is the next big step. MFlow and hplayground will converge with this platform. If you want to collaborate, don't hesitate to send me a message!
With the react primitive it is possible to de-invert any framework, including the callbacks of a GUI toolkit, so the widgets can be managed with applicative and monadic combinators. hplayground does that for the JavaScript callbacks and HTML forms. hplayground runs on the client and MFlow runs on the server, and both share the same widget EDSL, which could be ported to a GUI. Because of that, an application can run in any environment, including console applications.
Part II: The hardworking programmer II: practical backtracking to undo actions