ChatRoom.scala 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package models
  2. import akka.actor._
  3. import scala.concurrent.duration._
  4. import scala.language.postfixOps
  5. import play.api._
  6. import play.api.libs.json._
  7. import play.api.libs.iteratee._
  8. import play.api.libs.concurrent._
  9. import akka.util.Timeout
  10. import akka.pattern.ask
  11. import play.api.Play.current
  12. import play.api.libs.concurrent.Execution.Implicits._
  /**
   * Companion entry point that connects a client to the shared chat room actor.
   */
  object ChatRoom {

    /** How long `join` waits for the room actor to answer a `Join` request. */
    implicit val timeout: Timeout = Timeout(1.second)

    /** The single room actor used by every call to `join`; created lazily on first access. */
    lazy val default: ActorRef = Akka.system.actorOf(Props[ChatRoom])

    /**
     * Asks the room actor to admit `username` and builds both ends of the client's JSON stream.
     *
     * When the room answers `Connected`, the returned iteratee forwards the `text` field of
     * every incoming event to the room as a `Talk` message and sends `Quit` once the input
     * ends; the returned enumerator is the one supplied by the room.
     *
     * When the room answers `CannotConnect`, the returned iteratee is already finished and
     * the enumerator emits a single `{"error": ...}` object followed by EOF.
     *
     * @param username name the client wants to use in the room
     * @return a future pair of (consumer of client events, producer of room events); the
     *         future fails if the room does not answer within `timeout`
     */
    def join(username: String): scala.concurrent.Future[(Iteratee[JsValue, _], Enumerator[JsValue])] = {
      (default ? Join(username)).map {
        case Connected(enumerator) =>
          // Consume the client's feed. Events without a string "text" field are skipped:
          // an exception thrown here would fail the iteratee, so the completion step
          // below would not run and the username would stay registered in the room.
          val iteratee = Iteratee.foreach[JsValue] { event =>
            (event \ "text").asOpt[String].foreach { text =>
              default ! Talk(username, text)
            }
          }.map { _ =>
            default ! Quit(username)
          }
          (iteratee, enumerator)

        case CannotConnect(error) =>
          // Connection refused: hand back an iteratee that is already done (signalling EOF) ...
          val iteratee = Done[JsValue, Unit]((), Input.EOF)
          // ... and an enumerator that reports the error and then closes the socket.
          val enumerator = Enumerator[JsValue](JsObject(Seq("error" -> JsString(error))))
            .andThen(Enumerator.enumInput(Input.EOF))
          (iteratee, enumerator)
      }
    }
  }
  /**
   * Actor holding the state of one chat room.
   *
   * It keeps a personal channel per member, pushes room-wide events through one broadcast
   * channel, and forwards `Join`, `Talk` and `Quit` to a `TabooGame` actor.
   */
  class ChatRoom extends Actor {

    // NOTE(review): created on the actor system rather than as a child of this actor, so it
    // is not stopped together with the room -- confirm that this is intended.
    val tabooGame: ActorRef = Akka.system.actorOf(Props(classOf[TabooGame], self))

    /** Current members, keyed by username, each with the channel for messages addressed only to them. */
    var members: Map[String, Concurrent.Channel[JsValue]] = Map.empty[String, Concurrent.Channel[JsValue]]

    /** Room-wide broadcast: every member's enumerator includes `chatEnumerator`. */
    val (chatEnumerator, chatChannel) = Concurrent.broadcast[JsValue]

    def receive: Receive = {
      case Join(username) =>
        if (members.contains(username)) {
          sender ! CannotConnect("This username is already used")
        } else {
          val (personalEnumerator, personalChannel) = Concurrent.broadcast[JsValue]
          members = members + (username -> personalChannel)
          // The member receives both the room-wide feed and their personal feed.
          sender ! Connected(chatEnumerator.interleave(personalEnumerator))
          // Announce via a message to self, i.e. after the Connected reply has been sent.
          self ! NotifyJoin(username)
          tabooGame ! Join(username)
        }

      case NotifyJoin(username) =>
        notifyAll("join", username, "has entered the room")

      case Talk(username, text) =>
        notifyAll("talk", username, text)
        tabooGame ! Talk(username, text)

      case Tell(username, text, to) =>
        notifyOne(to, "talk", username, text)

      case Quit(username) =>
        members = members - username
        notifyAll("quit", username, "has left the room")
        tabooGame ! Quit(username)
    }

    /**
     * Pushes a message to a single member's personal channel.
     *
     * @param to   username of the recipient; if no such member exists the message is dropped
     * @param kind value of the `kind` field ("talk", "join", ...)
     * @param user username the message is attributed to
     * @param text value of the `message` field
     */
    def notifyOne(to: String, kind: String, user: String, text: String): Unit = {
      // Look the recipient up safely: `members(to)` would throw for a user who is not in
      // the room, and that exception would fail the actor.
      members.get(to).foreach(_.push(buildMessage(kind, user, text)))
    }

    /**
     * Pushes a message to the room-wide channel.
     *
     * @param kind value of the `kind` field ("talk", "join", "quit")
     * @param user username the message is attributed to
     * @param text value of the `message` field
     */
    def notifyAll(kind: String, user: String, text: String): Unit = {
      chatChannel.push(buildMessage(kind, user, text))
    }

    /** Builds the JSON payload shared by `notifyOne` and `notifyAll`, including the current member list. */
    private def buildMessage(kind: String, user: String, text: String): JsObject =
      JsObject(
        Seq(
          "kind" -> JsString(kind),
          "user" -> JsString(user),
          "message" -> JsString(text),
          "members" -> JsArray(
            members.keys.toList.map(JsString)
          )
        )
      )
  }
  /** Request to admit `username` to the room; the room also forwards it to the game actor. */
  final case class Join(username: String)

  /** Tells the room that `username` has left; the room removes the member and announces it. */
  final case class Quit(username: String)

  /** A line of chat `text` from `username`, announced to the whole room. */
  final case class Talk(username: String, text: String)

  /** A line of chat `text` from `username`, delivered only to the member named `to`. */
  final case class Tell(username: String, text: String, to: String)

  /** Sent by the room to itself after admitting `username`, to announce the arrival. */
  final case class NotifyJoin(username: String)
  /** Reply to an accepted `Join`: carries the enumerator the new member should read from. */
  final case class Connected(enumerator: Enumerator[JsValue])

  /** Reply to a rejected `Join`: `msg` is the reason reported to the client. */
  final case class CannotConnect(msg: String)