【问题标题】:How to write an event bus in Haskell?如何在 Haskell 中编写事件总线?
【发布时间】:2015-09-14 12:15:09
【问题描述】:

我正在努力解决 Haskell 中的一个设计问题,我似乎无法以一种优雅而令人满意的方式解决这个问题。我有一个系统,其核心是基于事件溯源的概念:系统的状态来自于将一系列事件应用到初始状态。有不同类型的事件,每种类型都通过类型族与系统的特定组件相关:

class Model a where
   data Event a :: * 
   apply :: Event a -> a -> a

instance Model Foo where
   data Event Foo = Foo Int
   ...

instance Model Bar where
   data Event Bar = Bar String
   ...

目前系统是 100% 同步和耦合的,每个模型都可以访问所有其他模型的事件,这很快就会变得一团糟,所以我想通过引入 事件总线来解耦Bus Events 这样我应该可以写出类似的东西 dispatch :: Bus Events -> Consumer (Event Foo) -> Bus EventsEvent Foo 的一些消费者附加到Bus Events 上,假设Event FooEvents 之间存在某种形式的子类型或包含。 然后我可以通过确保每个消费者在自己的线程中运行来添加异步性。

从系统的角度来看,这将允许我确保每个组件都是可独立打包的,从而将依赖关系限制为所有事件的子集。 Events 类型将在整个应用程序级别定义。 这个问题看起来与离散时间 FRP 看似相似,但我似乎无法理解它......

有没有人处理过类似的事情,如果有,如何处理?

编辑

我想出了以下代码,它没有使用Source,但受到@Cirdec 提议的极大启发:

import           Control.Applicative
import           Control.Concurrent
import           Control.Concurrent.STM
import           Control.Monad.Reader
import qualified Data.Vector            as V

type Handlers e = V.Vector (Handler e)

data EventBus e = EventBus { handlers    :: Handlers e
                           , eventQueue  :: TChan e
                           , eventThread :: MVar ThreadId
                           }

newBus :: IO (EventBus e)
newBus = do
  chan <- newTChanIO
  var <- newEmptyMVar
  return $ EventBus V.empty chan var

addHandler :: Handler e -> EventBus e -> EventBus e
addHandler h b@EventBus{..} = b { handlers = V.snoc handlers h }

removeHandler :: Int -> EventBus e -> EventBus e
removeHandler idx b@EventBus{..} = b { handlers = let (h,t) = V.splitAt idx handlers
                                                  in h V.++ V.tail t }

startBus :: EventBus e -> IO (EventBus e)
startBus b@EventBus{..} = do
  tid <- forkIO (runBus b)
  putMVar eventThread tid
  return b

runBus :: EventBus e -> IO ()
runBus b@EventBus{..} = do
  _ <- takeMVar eventThread
  forever $ do
    e <- liftIO $ atomically $ readTChan eventQueue
    v <- newTVarIO b
    runReaderT (runEvents $ publish e) v

-- | A monad to handle pub/sub of events of type @e@
newtype Events e a = Events { runEvents :: ReaderT (TVar (EventBus e)) IO a }
                   deriving (Applicative, Functor, Monad, MonadIO, MonadReader (TVar (EventBus e)))

newtype Handler e = Handler { handle :: Events e ()                 -- Unsubscription function
                                     -> Events e (e -> Events e ())  -- what to do with events @e@
                            }


-- | Register a new @Handler e@ within given @Events e@ context
subscribe :: Handler e -> Events e ()
subscribe h = do
  bus <- ask
  liftIO $ atomically $ modifyTVar' bus (addHandler h)

unsubscribe :: Int -> Events e ()
unsubscribe idx = do
  bus <- ask
  liftIO $ atomically $ modifyTVar' bus (removeHandler idx)

publishBus :: EventBus e -> e -> IO ()
publishBus EventBus{..} = atomically . writeTChan eventQueue

publish :: e -> Events e ()
publish event = do
  EventBus{..} <- ask >>= liftIO . atomically . readTVar
  forM_ (zip (V.toList handlers) [0..]) (dispatch event)

dispatch :: e -> (Handler e, Int) -> Events e ()
dispatch event (Handler h, idx) = do
  hdl <- h (unsubscribe idx)
  hdl event

printer :: (Show s) => String -> Handler s
printer prefix = Handler ( \ _ -> return $ \ e -> liftIO (putStrLn $ prefix ++ show e))

【问题讨论】:

  • 所以你正在为你的事件搜索类似 Observable 模式的东西,对吧? IMO 你可以像在 OO 中那样实现它(使用IORef 或任何你想要Subscribe 的行为 - 或者如果你不想在创建时提供处理程序) - 问题是:你真的需要它还是应该由您的 commands/-handler 负责?
  • @Carsten 有点,是的。对于实际的管道,我计划使用TChan,它具有提供发布/订阅功能的良好特性,但这并不是我的问题。我更关心整体设计方法,这看起来像是我正在尝试实现类型化actor之类的东西,或者重新发明离散时间 FRP,但我很困惑。
  • 您要解决的问题是什么?您的问题是询问如何以特定方式解决问题,而不是如何解决问题。解决方案很可能已经存在:oo 样式事件、可观察对象、FRP 或并发管道。
  • 你说得对,我的问题具体是如何针对设计“复杂”应用程序的一般问题实施特定解决方案,以减少耦合和增加内聚。我已经决定使用事件溯源作为应用程序所基于的核心原理,我现在的问题是从同步(例如基于函数调用)解决方案转移到基于异步(例如事件队列、参与者……)的解决方案解决方案。

标签: haskell types event-sourcing


【解决方案1】:

可订阅的带有as的事件源有以下类型

type Source m a = (a -> m ()) -> m (m ())
                   |             |  ^--- how to unsubscribe             
                   |             ^--- how to subscribe
                   ^--- what to do when an `a` happens

事件的消费者或处理程序是天真地接受事件源并订阅它的东西

type Handler m a = (Source m a             ) -> m ()
                 = ((a -> m ()) -> m (m ())) -> m ()
                                                ^-- set up the consumer.

这有点令人费解,我们可以反转事物并为事件处理程序获得更好的表示:

type Handler m a = m () -> m (a -> m ())
                   |       |  ^-- what to do when an `a` happens
                   |       ^-- set up the consumer
                   ^-- how to unsubscribe

原始事件源使用起来有点棘手;订阅者可能想要取消订阅以响应事件的发生,在这种情况下,他们需要递归地获取结果的取消订阅操作,以便在事件发生时执行该操作。从Handler 的更好定义开始,我们没有这个问题。事件源现在是接受事件处理程序并向其发布的东西。

type Source m a = (Handler m a          ) -> m ()
                = (m () -> m (a -> m ())) -> m ()
                                             ^-- how to subscribe

【讨论】:

  • 感谢您的建议,这看起来就像我正在寻找的东西,除了一个缺失的部分:如何定义 Handler m a 以使其可以传递给 Source m b 其中@ 987654329@ 用于某些“子类型”关系。但是写下我想到的那句话我可以简单地定义class a :&lt; b并提供正确的实例......
  • 实际上我要做的似乎并不那么简单,因为这意味着能够将一些 a 强制转换为 sum 类型 b,然后发送到相关的 Handler c 这样 @ 987654334@。似乎没有多大意义。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-07-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-09-03
相关资源
最近更新 更多