[Posted]: 2020-09-06 13:36:10
[Question]:
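I'm running speed-dating-style chat sessions on a server.
It works like this:
- Users request to join a session
- When 20 users have requested to join, a new session is created
- While the session runs, users are paired off into 2-person chats
- When a chat ends, its 2 users return to the user pool to be paired again
- Eventually the session ends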
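The pairing step in the list above could be sketched as a pure function over the pool of idle users. This is only an illustration: `PairingSketch` and `pairUp` are hypothetical names, users are represented by plain string ids, and random shuffling is an assumption about how partners are chosen.

```scala
object PairingSketch {
  // Hypothetical helper: split the idle pool into random pairs.
  // Returns the pairs plus the users left waiting (at most one, when the pool size is odd).
  def pairUp(idle: Vector[String]): (Vector[(String, String)], Vector[String]) = {
    val shuffled = scala.util.Random.shuffle(idle)
    val pairs = shuffled.grouped(2).collect { case Seq(a, b) => (a, b) }.toVector
    val leftover = if (shuffled.size % 2 == 1) Vector(shuffled.last) else Vector.empty
    (pairs, leftover)
  }
}
```

When a chat ends, its two user ids would be appended back to the idle pool and `pairUp` called again on the next round.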
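I'm trying to figure out how to handle the sessions and the pairing.
I don't know how to pass sockets between threads and keep track of them.
I'm using JSON over binary sockets, and I'm using Slick to connect to a MySQL database.
I think my thread architecture is logical, but please tell me if anything doesn't make sense: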
ChatServer (main app,
|   starts 1 ServerHandler thread,
|   starts 1 SessionWaiter thread,
|   then loops waiting for server-side commands)
├──ServerHandler (loops waiting for new clients,
|   |   starts a new ClientHandler thread for each client)
|   └──ClientHandler (each thread communicates with 1 client,
|           client can request to join a chat session,
|           then database is updated to show the request)
└──SessionWaiter (loops checking database every 5 seconds,
    |   if 20 Users are requesting a session then it creates a new session in the database,
    |   assigns those 20 Users to that SessionID,
    |   and creates 1 SessionRunner thread to handle the session,
    |   passing the 20 client sockets to the SessionRunner - BUT HOW?)
    └──SessionRunner (each thread handles 1 Session (20 Users), pairing Users in Chats, until the Session ends)
application.conf:
mydb = {
  driver = "com.mysql.cj.jdbc.Driver",
  url = "jdbc:mysql://localhost:3306/chatsession?serverTimezone=UTC&useSSL=false",
  user = "root",
  password = "password",
  connectionPool = disabled
}
build.sbt:
scalaVersion := "2.13.1"
scalacOptions += "-deprecation"
libraryDependencies ++= Seq(
  "com.typesafe.slick" %% "slick" % "3.3.2",
  "org.slf4j" % "slf4j-nop" % "1.7.26",
  "com.typesafe.slick" %% "slick-hikaricp" % "3.3.2",
  "mysql" % "mysql-connector-java" % "6.0.6",
  "com.typesafe.play" %% "play-json" % "2.8.0"
)
Main.scala:
import java.net.ServerSocket
import java.io.PrintStream
import java.io.BufferedReader
import java.io.InputStreamReader
import java.io.BufferedWriter
import java.io.OutputStreamWriter
import java.net.Socket
import slick.jdbc.MySQLProfile.api._
import scala.concurrent.Future
import scala.concurrent.blocking // needed if using "blocking { }"
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.collection.mutable.ArrayBuffer
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success}
import java.util.concurrent.{Executors, ExecutorService} // for threads
import play.api.libs.json._
class ClientHandler(socket: Socket) extends Runnable {
  def run(): Unit = {
    val inputstream = new BufferedReader(new InputStreamReader(socket.getInputStream()))
    val outputstream = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()))
    var break = false
    var startTime = System.currentTimeMillis() // time of the last message from this client
    try {
      outputstream.write("welcome") // convert to json
      outputstream.newLine()
      outputstream.flush()
      println("client welcomed")
      while (!break) {
        if (inputstream.ready()) {
          val input = inputstream.readLine() // blocking call
          println("client: " + input)
          val json: JsValue = Json.parse(input)
          val command = (json \ "command").validate[String].getOrElse("unknown command")
          command match {
            case "connect" =>
              val id = (json \ "id").validate[String].getOrElse("unknown id")
              println(id + " connected")
            case "joinsession" =>
              // update user row in database (set WantsToJoin=1, LastActive=CurrentTime)
              // should I store their socket number in the db or can I pass it internally somehow?
              // respond to client and tell them they are in a queue
            case "ping" =>
              // client program sends a ping every 3 seconds
              // if ping or another command is not received for 5 seconds,
              // then the socket will be closed below
            case _ => // any other input, break and close socket
              println("breaking and closing socket")
              break = true
          }
          startTime = System.currentTimeMillis()
        } else if (System.currentTimeMillis() - startTime >= 5000) {
          break = true // no ping or other command for 5 seconds
        } else {
          Thread.sleep(10) // avoid spinning at full CPU while the client is idle
        }
      }
    } finally {
      socket.close() // also runs if Json.parse throws on malformed input
    }
  }
}
class ServerHandler(serversocket: ServerSocket) extends Runnable {
  def run(): Unit = {
    while (true) {
      val socket = serversocket.accept // blocking call
      println("client connected")
      (new Thread(new ClientHandler(socket))).start()
    }
  }
}
// each thread of this class will manage an individual session with 20 users
class SessionRunner() extends Runnable {
  def run(): Unit = {
    while (true) {
      // have an array of the 20 users (with each socket, userid from database, etc)
      // take over each user's socket connection
      // how do I get the sockets?
    }
  }
}
// one thread of this class will be run in a loop
// every 5 seconds it will check how many users are requesting a session
// if there are 20 users requesting a session, a new SessionRunner thread will be created
// -and passed the 20 sockets? of those 20 users so it knows which clients to contact
// how do I keep track of those sockets and pass them?
class SessionWaiter() extends Runnable {
  def run(): Unit = {
    while (true) {
      Thread.sleep(5000) // time out for 5 seconds
      // do a database read
      // if there are 20 users:
      // -who are requesting a session, and
      // -who have been online within the last 30 seconds
      // then create a new thread to handle that session
      // update those user rows to show:
      // -they are no longer requesting a session, and
      // -that they are in a session, and update sessionid
      // -so they can rejoin the session if they lose and regain connection before the session ends
      (new Thread(new SessionRunner())).start()
      // -(how do I pass the 20 users' client sockets to that thread??)
    }
  }
}
// TODO: server prints to console every 10 seconds:
// # of active sessions, # of users in sessions, # of users waiting for a session
object ChatServer extends App {
  val server = new ServerSocket(10000)
  (new Thread(new ServerHandler(server))).start()
  var break = false
  while (!break) {
    print(">")
    val input = scala.io.StdIn.readLine() // blocking call
    input match {
      case "quit" =>
        println("\nQUITTING")
        server.close()
        break = true
      case _ =>
        println("\nUnrecognized command:" + input + "<")
    }
  }
}
Where I'm stuck: how do I manage the 20 users' sockets so that I can pair users up and relay chat between each pair?
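When I have 20 users waiting for a session, I want to build an array of their sockets plus other user data from the database (user ID, username, etc.) and pass that array to the SessionRunner thread created to handle that session.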
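One possible shape for that array, as a rough sketch: keep the sockets in a shared in-memory map keyed by user ID, and hand the looked-up entries to the session thread through its constructor. `ConnectedUser`, `ClientRegistry`, and `SessionRunnerSketch` are hypothetical names, and string user IDs are an assumption; only `ConcurrentHashMap` and `Socket` come from the imports already in Main.scala.

```scala
import java.net.Socket
import java.util.concurrent.ConcurrentHashMap

// Hypothetical record tying a database user ID to its live connection.
final case class ConnectedUser(userId: String, username: String, socket: Socket)

// Hypothetical registry shared by all threads; the socket itself never goes into the database.
object ClientRegistry {
  private val clients = new ConcurrentHashMap[String, ConnectedUser]()

  def register(user: ConnectedUser): Unit = clients.put(user.userId, user)
  def unregister(userId: String): Unit = clients.remove(userId)

  // Resolve the user IDs returned by the database query to live connections,
  // dropping IDs whose client has disconnected in the meantime.
  def lookup(userIds: Seq[String]): Vector[ConnectedUser] =
    userIds.flatMap(id => Option(clients.get(id))).toVector
}

// The session thread receives its users through the constructor.
class SessionRunnerSketch(users: Vector[ConnectedUser]) extends Runnable {
  def run(): Unit = println(s"session started with ${users.size} users")
}
```

Under these assumptions, a ClientHandler would call `ClientRegistry.register` once the client identifies itself, and the SessionWaiter would start `new Thread(new SessionRunnerSketch(ClientRegistry.lookup(ids)))` with the IDs it read from the database.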
That thread should then take control of those 20 sockets, manage the session, and communicate with the clients.
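Relaying chat within a pair could look roughly like the sketch below, which forwards lines from one user's reader to the partner's writer. `RelaySketch`, `relay`, and the `/end` marker are hypothetical; the real protocol here is JSON, so the stop condition would more likely be a parsed command.

```scala
import java.io.{BufferedReader, BufferedWriter}

object RelaySketch {
  // Copy lines from one user's input to the partner's output until the sender
  // disconnects (readLine returns null) or sends the hypothetical "/end" marker.
  def relay(from: BufferedReader, to: BufferedWriter): Unit = {
    var line = from.readLine()
    while (line != null && line != "/end") {
      to.write(line)
      to.newLine()
      to.flush()
      line = from.readLine()
    }
  }
}
```

A pair needs one relay per direction, so two such loops would run concurrently (for example, each on its own thread), built from the two sockets' input and output streams.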
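But when a user leaves the Session, the SessionRunner should hand their socket back to the ClientHandler, where they should be able to request to join a new Session or otherwise communicate with the ClientHandler.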
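One way to model this hand-back without moving the socket at all, offered purely as an assumption and not as the design described above: the thread that reads the socket stays the same, and only the destination of incoming messages is swapped. `HandoffSketch`, `ClientState`, and `MessageHandler` are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicReference

object HandoffSketch {
  // Hypothetical: whoever currently "owns" a client is just a function receiving its messages.
  type MessageHandler = String => Unit

  final class ClientState(lobby: MessageHandler) {
    // Starts out pointing at the lobby (the ClientHandler's own command handling).
    private val current = new AtomicReference[MessageHandler](lobby)

    def enterSession(session: MessageHandler): Unit = current.set(session)
    def leaveSession(): Unit = current.set(lobby)

    // Called for every incoming line by the thread that reads the socket.
    def dispatch(message: String): Unit = current.get().apply(message)
  }
}
```

In this sketch the session would call `enterSession` when it starts and `leaveSession` when the user leaves, after which commands such as "joinsession" reach the lobby handler again.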
[Discussion]:
Tags: multithreading scala sockets concurrency future