【问题标题】:How can I share variables between threads or otherwise handle this logic?如何在线程之间共享变量或以其他方式处理此逻辑?
【发布时间】:2020-09-06 13:36:10
【问题描述】:

我正在服务器上运行快速约会风格的聊天会话。

它是这样工作的:

  1. 用户请求加入会话
  2. 当 20 个用户请求加入一个会话时,会创建一个新会话
  3. 会话运行时,2 个用户组成的组在聊天中配对
  4. 聊天结束后,这 2 个用户返回用户池以再次配对
  5. 最终会话结束

我正在尝试弄清楚如何处理会话和配对

我不知道如何在线程之间传递套接字并跟踪它们

我在二进制套接字上使用 JSON,并且我正在使用 Slick 连接到 MySQL 数据库。

我认为我的线程架构是合乎逻辑的,但如果有什么不合理的地方请告诉我:

ChatServer (main app, 
|           starts 1 ServerHandler thread, 
|           starts 1 SessionWaiter thread, 
|           then loops waiting for server-side commands)
├──ServerHandler (loops waiting for new clients, 
|  |              starts a new ClientHandler thread for each client)
|  └──ClientHandler (each thread communicates with 1 client, 
|                    client can request to join a chat session, 
|                    then database is updated to show the request)
└──SessionWaiter (loops checking database every 5 seconds, 
   |              if 20 Users are requesting a session then it creates a new session in the database, 
   |              assigns those 20 Users to that SessionID, 
   |              and creates 1 SessionRunner thread to handle the session, 
   |              passing the 20 client sockets to the SessionRunner - BUT HOW?)
   └──SessionRunner (each thread handles 1 Session (20 Users), pairing Users in Chats, until the Session ends)

application.conf:

mydb = {
  driver = "com.mysql.cj.jdbc.Driver",
  url = "jdbc:mysql://localhost:3306/chatsession?serverTimezone=UTC&useSSL=false",
  user = "root",
  password = "password",
  connectionPool = disabled
}

内置.sbt:

scalaVersion := "2.13.1"
scalacOptions += "-deprecation"
libraryDependencies ++= Seq(
  "com.typesafe.slick" %% "slick" % "3.3.2",
  "org.slf4j" % "slf4j-nop" % "1.7.26",
  "com.typesafe.slick" %% "slick-hikaricp" % "3.3.2",
  "mysql" % "mysql-connector-java" % "6.0.6",
  "com.typesafe.play" %% "play-json" % "2.8.0"
)

Main.scala:

import java.net.ServerSocket
import java.io.PrintStream
import java.io.BufferedReader
import java.io.InputStreamReader
import java.io.BufferedWriter
import java.io.OutputStreamWriter
import java.net.Socket
import slick.jdbc.MySQLProfile.api._
import scala.concurrent.Future
import scala.concurrent.blocking // needed if using "blocking { }"
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.collection.mutable.ArrayBuffer
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success}
import java.util.concurrent.{Executors, ExecutorService} // for threads
import play.api.libs.json._

class ClientHandler(socket: Socket) extends Runnable {
  def run() : Unit = {
    val inputstream = new BufferedReader(new InputStreamReader(socket.getInputStream()))
    val outputstream = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()))
    var break = false
    var startTime = System.currentTimeMillis()
    outputstream.write("welcome") // convert to json
    outputstream.newLine()
    outputstream.flush()
    println("client welcomed")
    while(!break){
      if(inputstream.ready()){
        val input = inputstream.readLine() // blocking call
        println("client: " + input)
        val json: JsValue = Json.parse(input)
        val command = (json \ "command").validate[String].getOrElse("unknown command")
        command match {
          case "connect" =>
            val id = (json \ "id").validate[String].getOrElse("unknown command")
            println(id + " connected")
          case "joinsession" =>
            // update user row in database (set WantsToJoin=1, LastActive=CurrentTime)
            // should I store their socket number in the db or can I pass it internally somehow?
            // respond to client and tell them they are in a queue
          case "ping" =>
            // client program sends a ping every 3 seconds
            // if ping or another command is not received for 5 seconds, 
            // then the socket will be closed below
          case _ => // any other input, break and close socket
            println("breaking and closing socket")
            break = true
        }
        startTime = System.currentTimeMillis()
      } else if (System.currentTimeMillis() - startTime >= 5000) {
        break = true
      }
    }
    socket.close()
  }
}

class ServerHandler(serversocket: ServerSocket) extends Runnable {
  def run() : Unit = {
    while(true) {
      val socket = serversocket.accept // blocking call
      println("client connected")
      (new Thread(new ClientHandler(socket))).start()
    }
  }
}

// each thread of this class will manage an individual session with 10 users
class SessionRunner() extends Runnable {
  def run() : Unit = {
    while(true) {
      // have an array of the 10 users (with each socket, userid from database, etc)
      // take over each user's socket connection 
      // how do I get the sockets?
    }
  }
}

// one thread of this class will be run in a loop
// every 5 seconds it will check how many users are requesting a session
// if there are 10 users requesting a session, a new SessionRunner thread will be created
// -and passed the 10 sockets? of those 10 users so it knows which clients to contact
// how do I keep track of those sockets and pass them?
class SessionWaiter() extends Runnable {
  def run() : Unit = {
    while(true) {
      // time out for 5 seconds
      // do a database read
      // if there are 20 users:
      // -who are requesting a session, and
      // -who have been online within the last 30 seconds
      // then create a new thread to handle that session
      // update those user rows to show:
      //  -they are no longer requesting a session, and
      //  -that they are in a session, and update sessionid
      //  -so they can rejoin the session if they lose and regain connection before the session ends
      (new Thread(new SessionRunner())).start()
      // -(how do I pass the 10 users' client sockets to that thread??)
    }
  }
}

// TDL: server prints to console every 10 seconds:
// # of active sessions, # of users in sessions, # of users waiting for a session
object ChatServer extends App {
  val server = new ServerSocket(10000)
  (new Thread(new ServerHandler(server))).start()

  var break = false
  while(!break) {
    print(">")
    val input = scala.io.StdIn.readLine() // blocking call
    input match {
      case "quit" =>
        println("\nQUITTING")
        server.close()
        break = true
      case _ =>
        println("\nUnrecognized command:"+input+"<")
    }
  }
}

我被困的地方: 如何管理 20 个用户的套接字,以便我可以将它们配对并在配对之间中继聊天?

当我有 20 个用户在等待会话时,我想从数据库(用户 ID、用户名等)构建他们的套接字和其他用户数据的数组,并将该数组传递给创建的 SessionRunner 线程处理该会话。

然后该线程应该控制这 20 个套接字并处理管理会话并与客户端通信。

但是当用户离开 Session 时,SessionHandler 应该再次将他们的套接字交还给 ClientHandler,他们应该能够请求加入新的 Session 或以其他方式与 ClientHandler 通信。

【问题讨论】:

    标签: multithreading scala sockets concurrency future


    【解决方案1】:

    免责声明:除此之外,当您拥有大量用于此类任务的非常强大的库和解决方案时,您会同时使用非常低级的并发原语(线程、可运行),尤其是在Scala(Futures、Akka、Akka-streams、cats effect、fs2 等),我将尽力帮助您,我将使用最简单的方法和原语,使其具有当前代码的风格。我希望这是一项学习任务,而不是生产解决方案的一部分。

    您至少可以按照当前程序的风格采用 2 种方式:

    • 共享全局状态并从静态上下文中引用它
    • 共享状态并通过构造函数将其传递到模块(对象)中

    对于一个更简单的(静态上下文中的全局状态),它会像下面这样。

    SessionWaiter 中,您需要某个集合的引用,您可以从中获取 10 个套接字并创建会话。重要的是,您必须以线程安全的方式使用该集合。我建议一个更简单的 - 通过synchronized 块。 我们将假设该集合仅包含请求连接。 为此,我们将编写一个包含该集合引用的静态对象:

    type MyConnectionType = Socket
    object RequestingConnectionsHolder {
      @volatile var requestingConnections: List[MyConnectionType] = List()
    }
    

    SessionWaiter 中,您将检查是否有至少 10 个连接,使用前 10 个连接启动会话并将它们从集合中删除。

    class SessionWaiter() extends Runnable {
      def run() : Unit = {
        while(true) {
          val collection = RequestingConnectionsHolder.requestingConnections
          val sessionSize = 10
          RequestingConnectionsHolder.synchronized {
            if(collection.size >= sessionSize) {
              val (firsts, lasts) = (collection.take(10), collection.drop(10))
              (new Thread(new SessionRunner(firsts))).start()
              RequestingConnectionsHolder.requestingConnections = lasts
            }
          }
        }
      }
    }
    

    ClientHandler 中,在这种情况下,当连接/客户端传输时,您将填写requestingConnections。据我了解,它会在case "joinsession" =&gt;

    case "joinsession" => RequestingConnectionsHolder.synchronized { RequestingConnectionsHolder.requestingConnections += socket }
    

    最后在SessionRunner 之后(当)你结束一个会话并且它的连接变成“请求一个新会话”,你必须将它添加到requestingConnections

    class SessionRunner(connections: List[MyConnectionType]) extends Runnable {
      def run() : Unit = {
        while(sessionIsAlive) { }
        RequestingConnectionsHolder.synchronized {
          RequestingConnectionsHolder.requestingConnections += connections
        }
      }
    }
    

    在某些时候,您应该检查连接是否已关闭,并且您需要将其从请求连接中删除。

    再说一次,这不是解决此类任务的方式。上面的代码非常非常糟糕。但是,如果您在学习阶段需要练习原始线程和最简单的并发原语 - 没关系。

    为了更好的解决方案,以你目前的知识和技能水平,我建议你探索Akka,因为你的任务似乎非常适合通过处理的actor模型来解决。

    【讨论】:

      猜你喜欢
      • 2021-01-27
      • 2019-05-01
      • 2021-05-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-07-22
      相关资源
      最近更新 更多