ChatRoom.scala 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package models
  2. import akka.actor._
  3. import scala.concurrent.duration._
  4. import scala.language.postfixOps
  5. import play.api._
  6. import play.api.libs.json._
  7. import play.api.libs.iteratee._
  8. import play.api.libs.concurrent._
  9. import akka.util.Timeout
  10. import akka.pattern.ask
  11. import play.api.Play.current
  12. import play.api.libs.concurrent.Execution.Implicits._
  13. object ChatRoom {
  14. implicit val timeout = Timeout(1 second)
  15. var chatRooms = Map.empty[String, ActorRef]
  16. var connectionCount = 0
  17. def closeRoom(room: String) = {
  18. chatRooms -= room
  19. }
  20. def join(room: String, username:String):scala.concurrent.Future[(Iteratee[JsValue,_],Enumerator[JsValue])] = {
  21. if(!chatRooms.keySet(room)) {
  22. chatRooms += (room -> Akka.system.actorOf(Props(classOf[ChatRoom], room)))
  23. }
  24. val actor = chatRooms(room)
  25. (actor ? Join(username)).map {
  26. case Connected(enumerator) =>
  27. connectionCount += 1
  28. // Create an Iteratee to consume the feed
  29. val iteratee = Iteratee.foreach[JsValue] { event =>
  30. actor ! Talk(username, (event \ "text").as[String])
  31. }.map { _ =>
  32. connectionCount -= 1
  33. actor ! Quit(username)
  34. }
  35. (iteratee,enumerator)
  36. case CannotConnect(error) =>
  37. // Connection error
  38. // A finished Iteratee sending EOF
  39. val iteratee = Done[JsValue,Unit]((),Input.EOF)
  40. // Send an error and close the socket
  41. val enumerator = Enumerator[JsValue](JsObject(Seq("error" -> JsString(error)))).andThen(Enumerator.enumInput(Input.EOF))
  42. (iteratee,enumerator)
  43. }
  44. }
  45. }
  46. class ChatRoom(name: String) extends Actor {
  47. val tabooGame = Akka.system.actorOf(Props(classOf[TabooGame], self))
  48. var members = Map.empty[String, Concurrent.Channel[JsValue]]
  49. val (chatEnumerator, chatChannel) = Concurrent.broadcast[JsValue]
  50. def receive = {
  51. case Join(username) => {
  52. if(members.contains(username)) {
  53. sender ! CannotConnect("This username is already used")
  54. } else {
  55. val (personalEnumerator, personalChannel) = Concurrent.broadcast[JsValue]
  56. members = members + (username -> personalChannel)
  57. sender ! Connected(chatEnumerator.interleave(personalEnumerator))
  58. tabooGame ! Join(username)
  59. }
  60. }
  61. case Talk(username, "/ping") => {
  62. self ! Tell(username, Json.obj(
  63. "kind" -> "pong"
  64. ))
  65. }
  66. case Talk(username, text) => {
  67. if(!text.startsWith("/")) {
  68. self ! Announce(Json.obj(
  69. "kind" -> "talk",
  70. "user" -> username,
  71. "message" -> text
  72. ))
  73. }
  74. tabooGame ! Talk(username, text)
  75. }
  76. case Announce(message) => {
  77. chatChannel.push(message)
  78. }
  79. case Tell(username, message) => {
  80. members(username).push(message)
  81. }
  82. case Quit(username) => {
  83. members = members - username
  84. if(members.isEmpty) {
  85. ChatRoom.closeRoom(name)
  86. context.stop(tabooGame)
  87. context.stop(self)
  88. }
  89. else {
  90. tabooGame ! Quit(username)
  91. }
  92. }
  93. }
  94. }
  95. case class Join(username: String)
  96. case class Quit(username: String)
  97. case class Talk(username: String, text: String)
  98. case class Announce(value: JsValue)
  99. case class Tell(username: String, value: JsValue)
  100. case class Connected(enumerator:Enumerator[JsValue])
  101. case class CannotConnect(msg: String)