Making a chat app using Angular Js 2 and RxJs
The screenshot for this app is here.
The cod for this app is also available at Github here. If you have purchased the book and downloaded the zip code of the source code that comes with the book then go to the directory code/rxjs/chat by changing to the directory as shown in step 1 below and do the following:
- cd code/rxjs/chat
- npm install
- nom run go
- Open chrome or firefox and in the address bar type http://localhot:8080
- Play with the application by clicking the unread messages available and chat with the bots.
Three top level components have been used to build this application. They are given below:
- ChatNavBar - This component is displayed at the top of the screen in the browser and shows among other things the number of unread messages. Note that when you click an unread thread and load its messages in the ChatWindow the number of unread messages decrease.
- ChatThreads - The number of chat threads are shown in the ChatNavBar but the actual chat threads are here and each row represents one chat thread. Once you click on a chat thread the messages in that chat thread will be loaded in the component ChatWindow. The ChatThread component listens for the most recent list of threads from the ThreadService.
- ChatWindow - The window that had a box for typing and that you used to type your message as well as read the previous messages if any. The ChatWindow component subscribes for the most recent list of messages.
The components subscribe to the streams maintained by the services and then render according to the most recent values.
The app has three models given below:
- Thread - This model stores a collection of messages as well as some data about the conversation in a thread. Each thread has a unique id. Moreover, a thread has a name, a source for the avatar avatarSrc shown to represent that thread and the lastMessage which sotres the last message in this thread.
- Message - One Message is stored here. To identify a message uniquely a message has an id. It has a time sentAt. A boolean isRead showing if the message has been read or not. A string variable text which has the text of the message. Moreover, a Message has an author and a thread to which it belongs.
- User - A human being chatting with a bot in the app is a user. A user has an id, a name and and avatarSrc.
Each model has a corresponding service. In this app a singleton objet has been used to provide a service. The singleton object not only provide streams of data that our app can subscribe to but also provide operations to add new data or modify existing data. The services used in this app are given below:
- UserService
- MessageService
- ThreadService
The services maintain streams and these streams emit models. An example of a model in this app is a Message. The MessageService maintains a stream that emits Messages. The UserService maintains a stream that emits the current user. Moreover, it provides a method setCurrentUser that emits the current user from the currentUser stream.
We will build this app in the following three stages.
- Implement our three models namely Thread, Message and User.
- Create services UserService, MessageService and ThreadService which manage streams.
- Implement the three components namely ChatNavBar, ChatThreads and ChatWindow.
Implementing the Models
Implementation of the Model User:
| User |
|---|
| export class User{ id: string; /* We are using typescript shorthand in the constructor below. Whenever a new instance is created, that instance get two public properties apart from the property id. The values of the arguments to the constructor are assigned to those public properties that together with the property id make an instance of the class USer. */ constructor(public name: string, public avatarSrc: string){ this.id = uuid(); } }//class User ends here. |
Implementation of the Model Thread:
| Thread |
|---|
| export class Thread{ id: string; name: string; avatarSrc: string; lastMessage: Message; /* The reference lastMessage points to the last Message in a thread in order to display this last message in the preview of this thread to the user. */ constructor(id?: string, name?: string, avatarSrc?: string){ this.id = id || uuid(); this.name = name; this.avatarSrc = avatarSrc; } }//class Thread ends here. |
Implementation of the Model Message:
| Message |
|---|
| export class Message id: string; text: string; author: string; isRead: boolean; sentAt: Date; thread: Thread; constructor(obj?: any){ this.id = (obj && obj.id) || uuid(); this.text = (obj && obj.text) || null; this.author = (obj && obj.author) || null; this.isRead = (obj && obj.isRead) || false; this.sentAt = (obj && obj.sentAt) || new Date(); this.thread = (obj && obj.thread) || null; } }//class Message ends here. /* We can call the contructor above without any arguments or with some of the arguments to make a new instance of class Message. In this way we can create a new instance with whatever data we have available. Moreover, we don't have to worry about the order of the arguments or if some arguments are missing. For example, we can create a new instance like this. let message = new Message({ text: "Hi Mr. xyz. How are you today?" });*/ |
Implementing the Services
Implementation of the Service UserService:
We will make the UserService class injectable so that it could be used as a dependency to the other components in the app. The job of the UserService is to provide a place where the app can learn about the current user and notify the rest of the app in case the current user changes.
| UserService |
|---|
| import { Injectable, bind } from 'angular2/core'; import { Subject, BehaviorSubject } from 'rxjs'; import { User } from '../models'; @Injectable() export class UserService{ /* In the line below we set up a stream which will be used to manage the current user. A subject can be thought of as a "read/write" stream. */ currentUser: Subject<User> = new BehaviorSubject<User>(null); public setCurrentUser(newUser: User): void{ /* We use the next method on a Subject (read/write stream) to push a new value to the stream. */ this.currentUser.next(newUser); } }//class UserService ends here. export var userServiceInjectables: Array<any> = [bind(UserService).toClass(UserService)]; |
Messages are published immediately to the stream and therefore there is a chance that a new subscriber misses the latest value of the stream. To overcome this we use the constructor of BehaviorSubject to create a new instance because BehaviorSubject stores the last value in a special property. Due to this special property a subscriber to the stream will receive the latest value. Any part of the application that subscribes to the UserService.currentUser stream will immediately come to know who the current user is.
So how to do we set a new user. There are two ways to do it.
One way is to create a new user and add the user directly to the stream like this.
let user = new User('Alex', 'imageSource');
userService = new UserService();
userService.subscribe(
(user) => {
console.log('New user is: ', user.name);
}
);
userService.currentUser.next(user);
The next method will pust the new value i.e., user into the stream.- Another way to update the current user is to create a helper method to do the job.
Here is how we can do it.
public setCurrentUser(user: User) : void {
this.currentUser.next(user);
}
This second technique is better as implementation of the currentUser is decoupled from the implementation of the stream. In the second technique we wrap the method next in the helper method setCurrentUser and this gives us room to change the implementation of the UserService class without breaking our clients that subscribe to the UserService class. Maintainability is important in large projects and decoupling is part and parcel of writing maintainable code.
Implementation of the Service MessagesService:
In row two of the following table the code for MessagesService has been commented extensively to explain the code. Row 3 of the same table has the same code but without comments for quick reference.
| MessagesService |
|---|
import {Injectable, bind} from 'angular2/core';
import {Subject, Observable} from 'rxjs'; import {User, Thread, Message} from '../models'; let initialMessages: Message[] = []; interface IMessagesOperation extends Function { /* Accepts a list of messges actually an array of objects of type Message. Returns an array of objects of type Message. */ (messages: Message[]): Message[]; } @Injectable() export class MessagesService { // a stream that publishes new messages only once /* Subject can be thought of as a read/write stream. */ newMessages: Subject<Message> = new Subject<Message>(); // `messages` is a stream that emits an array of the most up to date messages messages: Observable<Message[]>; /* updates is a stream which is a stream of functions. Any function that is put on the stream updates will change the list of current messages stored in 'messages'. Any function that is put on the updates stream should accept a list of Message objects and return a list of Message objects. `updates` receives _operations_ to be applied to our `messages`. It's a way we can perform changes on *all* messages (that are currently stored in `messages`) */ updates: Subject<any> = new Subject<any>(); /* action streams - the create stream is a regular stream. The term 'action stream' is meant to describe its role in our service. */ create: Subject<Message> = new Subject<Message>(); markThreadAsRead: Subject<any> = new Subject<any>(); constructor() { /***************************************************************************/ this.messages = this.updates // watch the updates and accumulate operations on the messages /* The scan runs the function for each element in the incoming stream and accumulates a value. scan will emit a value for each intermediate result and doesn't wait for the stream to complete before emitting a result. */ .scan((messages: Message[], /* The messages we are acuumulating in this pass */ operation: IMessagesOperation) => { /* The new operation to apply in this pass. */ return operation(messages); /* Returning the new Message[]. */ }, initialMessages) /* Make sure we can share the most recent list of messages across anyone who's interested in subscribing and cache the last known list of messages. */ .publishReplay(1) .refCount(); /* refCount makes it easier to use the return value of publishReplay by managing when the ovservable will emit values. */ /***************************************************************************/ /* Next in our constructor we configure the create stream. The create stream takes a Message object and then puts an operation (the inner function) on the 'updates' stream to add the Message object to the list of messages. The map operator works on streams and it runs the inner function once for each item (Message object) in the stream create and emits the return value of the function. In this case we are saying "for each Message we receive as input, return an IMessagesOperation that adds this Message to the list 'messages'. In other words we can say that the create stream will emit a function for each Message and that function accpts the Message and adds it to our list of messages. That is for each item that gets added to 'create' stream (by using 'next') the stream 'create' emits a concat operation function. Next we subscribe 'this.updates' to listen to the stream 'create' which means tht it will receive each operation (in this case the concat operation) that is created. That is we are subscribing the 'updates' stream to listen to 'create' stream which means that when 'create' receives a message it will emit an IMessagesOperation that will be received by the 'updates' stream and then the Message will be added to messages: Message[]. */ this.create .map( function(message: Message): IMessagesOperation { return (messages: Message[]) => { return messages.concat(message); }; }) .subscribe(this.updates); /***************************************************************************/ this.newMessages .subscribe(this.create); /* The 'create' stream is subscribing to the newMessages stream which is a stream that publishes new messages only once. Now our flow is complete. We get individual messages from the stream newMessages and we can get the most up-to-date list from the stream 'messages'. */ /***************************************************************************/ /* The stream`markThreadAsRead` takes a Thread and then puts an operation on the `updates` stream to mark the Messages as read. The map operator works on the stream markThreadAsRead and for each thread in this stream it runs the function () once for each item/thread and emits the return value of the function. */ this.markThreadAsRead .map( (thread: Thread) => { /* In this pass get all the messages in the list (Message[]). */ return (messages: Message[]) => { /* In this pass get one message from the list of messages (Message[]). */ return messages.map( (message: Message) => { /* Get the thread for this message and then get the id of the thread. Then compare it to the id of the thread 'thread' passed to the mapfunction to check if it has been read. Each message has a property isRead. Set that to true and false accordingly. */ // note that we're manipulating `message` directly here. Mutability // can be confusing and there are lots of reasons why you might want // to, say, copy the Message object or some other 'immutable' here if (message.thread.id === thread.id) { message.isRead = true; } return message; }); }; }) /* The stream 'updates' will subscribe to the message returned by this.markThreadAsRead.map() and this will set the property isRead of this messges to true or false accordingly. */ .subscribe(this.updates); } /************************************************************************/ // an imperative function call to this action stream addMessage(message: Message): void { this.newMessages.next(message); } messagesForThreadUser(thread: Thread, user: User): Observable<Message> { return this.newMessages .filter((message: Message) => { // belongs to this thread return (message.thread.id === thread.id) && // and isn't authored by this user (message.author.id !== user.id);/* Return all message of other users. */ }); } } export var messagesServiceInjectables: Array<any> = [ bind(MessagesService).toClass(MessagesService) ]; |
| /*Here is the code for this service without comments and explanation. */ import {Injectable, bind} from 'angular2/core'; import {Subject, Observable} from 'rxjs'; import {User, Thread, Message} from '../models'; let initialMessages: Message[] = []; interface IMessagesOperation extends Function { (messages: Message[]): Message[]; } @Injectable() export class MessagesService { newMessages: Subject<Message> = new Subject<Message>(); messages: Observable<Message[]>; updates: Subject<any> = new Subject<any>(); create: Subject<Message> = new Subject<Message>(); markThreadAsRead: Subject<any> = new Subject<any>(); constructor() { this.messages = this.updates .scan((messages: Message[], operation: IMessagesOperation) => { return operation(messages); }, initialMessages).publishReplay(1).refCount(); this.create .map( function(message: Message): IMessagesOperation { return (messages: Message[]) => { return messages.concat(message); }; }).subscribe(this.updates); this.newMessages.subscribe(this.create); this.markThreadAsRead .map( (thread: Thread) => { return (messages: Message[]) => { return messages.map( (message: Message) => { if (message.thread.id === thread.id) { message.isRead = true; } return message; }); }; }).subscribe(this.updates); } addMessage(message: Message): void { this.newMessages.next(message); } messagesForThreadUser(thread: Thread, user: User): Observable<Message> { return this.newMessages .filter((message: Message) => { return (message.thread.id === thread.id) && (message.author.id !== user.id); }); } } export var messagesServiceInjectables: Array<any> = [ bind(MessagesService).toClass(MessagesService) ]; |
Implementation of the Service ThreadsService:
In row two of the following table the code for ThreadsService has been commented extensively to explain the code. Row 3 of the same table has the same code but without comments for quick reference.
Four streams have been used in the ThreadsService.
More explanation in the comments in the code for ThreadsService below.
Four streams have been used in the ThreadsService.
- threads - This is a stream that maps the current set of Threads.
- orderedThreads - This stream keeps the list of Threads in chronological order i.e., newest first.
- currentThread - This stream is for the currently selected Thread.
- currentThreadMessages - This stream is the list of Messages for the currently selected Thread.
More explanation in the comments in the code for ThreadsService below.
| ThreadsService |
|---|
| import {Injectable, bind} from 'angular2/core'; import {Subject, BehaviorSubject, Observable} from 'rxjs'; import {Thread, Message} from '../models'; import {MessagesService} from './MessagesService'; import * as _ from 'underscore'; @Injectable() export class ThreadsService { /* threads is a stream (Observable) that has the most up-to-date list of threads. This stream will emit a map(an object). The object has a key and value as shown below: key is of type string [key: string] which is the id of a Thread value is an object of type Thread which is the thread associated with the given id. */ threads: Observable<{ [key: string]: Thread }>; // `orderedThreads` contains a newest-first chronological list of threads orderedThreads: Observable<Thread[]>; // `currentThread` contains the currently selected thread currentThread: Subject<Thread> = new BehaviorSubject<Thread>(new Thread()); /* `currentThreadMessages` contains the set of messages for the currently selected thread. */ currentThreadMessages: Observable<Message[]>; constructor(public messagesService: MessagesService) { /* In the service MessagesService, each time a new Message is created, the stream messages will emit an array of the current Messages . Here, we are goin to look at each Message and return a unique list of the Threads. */ this.threads = messagesService.messages /* The map operator works on streams and it runs the inner function once for each item in the stream 'messages' and emits the return value of the function. */ .map( (messages: Message[]) => { /* Each time i.e, in each pass of operator map we will create a new list of 'threads'. The reason for this is because we might delete some messages down the line (e.g., leave the conversation). Because we are recalculating the list of threads each time, we naturally will delete a thread that has no messages. */ let threads: {[key: string]: Thread} = {}; /* Store the message's thread in our accumulator `threads`. Each time we will create a new list of threads. The reason for this that the user might leave the converstation and, therefore, messages of such a user might have been deleted. Since we are recalculating the list of threads in each pass of the oerpator map, lists with no messages will be deleted as we start from a message and get its thread and then id of that Thread threads[message.thread.id]. */ messages.map((message: Message) => { /* In each pass of the operator map do the following: 1. Get a Messge from the stream. 2. Get the Thread to which the message belong. 3. Get the id of that Thread. 4. Check if the key already exists then we retain the value otherwise we assign the Thread of step 3 to the key. This can also be written like the following: if(!threads[message.thread.id]) { threads[message.thread.id] = message.thread; } */ threads[message.thread.id] = threads[message.thread.id] || message.thread; /*****************************************************************************/ /* On the webpage we show threads and in the preview of each thread we also show the text of the last message (the most recent message) in that thread. To do that we have to sore the most recent message for each Thread by comparing the property sentAt of all Messages in a particular thread. */ // Cache the most recent message for each thread let messagesThread: Thread = threads[message.thread.id]; if (!messagesThread.lastMessage || messagesThread.lastMessage.sentAt < message.sentAt) { messagesThread.lastMessage = message; } }); /*****************************************************************************/ return threads; }); /*****************************************************************************/ /* `this.orderedThreads` contains a newest-first chronological list of threads. */ this.orderedThreads = this.threads /* this.threads is a stream (Observable) that has the most up-to-date list of threads. This stream will emit a map(an object). threads: Observable<{ [key: string]: Thread }>; */ .map( ( threadGroups: { [key: string]: Thread } ) => { let threads: Thread[] = _.values(threadGroups); return _.sortBy(threads, (t: Thread) => t.lastMessage.sentAt).reverse(); }); /*****************************************************************************/ this.currentThreadMessages = this.currentThread .combineLatest(messagesService.messages, (currentThread: Thread, messages: Message[]) => { if (currentThread && messages.length > 0) { return _.chain(messages) .filter((message: Message) => (message.thread.id === currentThread.id)) .map((message: Message) => { message.isRead = true; return message; }) .value(); } else { return []; } }); /*****************************************************************************/ /* /* The stream`markThreadAsRead` takes a Thread as argument and then puts an operation on the `updates` stream to mark the Messages as read. The map operator works on the stream markThreadAsRead and for each thread in this stream it runs the function () once for each item/thread and emits the return value of the function. In the code line below we are connecting the stream markThreadAsRead to the stream currentThread so that markThreadAsRead can gets its argument of type Thread and mark all its messages as read. */ this.currentThread.subscribe(this.messagesService.markThreadAsRead); }//constructor ends here. /*****************************************************************************/ setCurrentThread(newThread: Thread): void { this.currentThread.next(newThread); } /*****************************************************************************/ }//class ThreadsService ends here. export var threadsServiceInjectables: Array<any> = [ bind(ThreadsService).toClass(ThreadsService) ]; |
| /*Here is the code for this service without comments and explanation. */
import {Injectable, bind} from 'angular2/core';
import {Subject, BehaviorSubject, Observable} from 'rxjs'; import {Thread, Message} from '../models'; import {MessagesService} from './MessagesService'; import * as _ from 'underscore'; @Injectable() export class ThreadsService { threads: Observable<{ [key: string]: Thread }>; orderedThreads: Observable<Thread[]>; currentThread: Subject<Thread> = new BehaviorSubject<Thread>(new Thread()); currentThreadMessages: Observable<Message[]>; constructor(public messagesService: MessagesService) { this.threads = messagesService.messages.map( (messages: Message[]) => { let threads: {[key: string]: Thread} = {}; messages.map((message: Message) => { threads[message.thread.id] = threads[message.thread.id] || message.thread; let messagesThread: Thread = threads[message.thread.id]; if (!messagesThread.lastMessage || messagesThread.lastMessage.sentAt < message.sentAt) { messagesThread.lastMessage = message; } }); return threads; }); this.orderedThreads = this.threads.map( ( threadGroups: { [key: string]: Thread } ) => { let threads: Thread[] = _.values(threadGroups); return _.sortBy(threads, (t: Thread) => t.lastMessage.sentAt).reverse(); }); this.currentThreadMessages = this.currentThread .combineLatest(messagesService.messages, (currentThread: Thread, messages: Message[]) => { if (currentThread && messages.length > 0) { return _.chain(messages) .filter((message: Message) => (message.thread.id === currentThread.id)) .map((message: Message) => { message.isRead = true; return message; }) .value(); } else { return []; } }); this.currentThread.subscribe(this.messagesService.markThreadAsRead); }//constructor ends here. setCurrentThread(newThread: Thread): void { this.currentThread.next(newThread); } }//class ThreadsService ends here. export var threadsServiceInjectables: Array<any> = [ bind(ThreadsService).toClass(ThreadsService) ]; |
Implementation of the Service xxxxxService:
In row two of the following table the code for xxxxService has been commented extensively to explain the code. Row 3 of the same table has the same code but without comments for quick reference.
No comments:
Post a Comment