Отправка сообщений клиенту в реальном времени с помощью NodeJS и Server-Sent Events

andreysm

Andrey Smirnov

Posted on May 19, 2022

Отправка сообщений клиенту в реальном времени с помощью NodeJS и Server-Sent Events

Обзор

Технология Server-Sent Events(SSE) позволяющая отправлять информацию с сервера клиенту в реальном времени, основана на HTTP протоколе.

На клиентской стороне server-sent events API предоставляет EventSource интерфейс (часть HTML5 стандарта), посредством которого открывается постоянное соединение с HTTP сервером.

HTTP сервер отправляет события в text/event-stream формате. Соединение остаётся открытым до тех пор пока не будет вызван метод EventSource.close().

Ограничения:

  • Возможен только приём данных с сервера (однонаправленный поток данных, в отличии от WebSockets);
  • Данные могут передаваться только в UTF-8 формате (небинарные данные).

Возможные преимущества:

  • Работает через HTTP, а значит у клиентов не возникнет проблем с подключением в случае, когда они подключены через прокси, не поддерживающие другие соединения (такие как WebSockets);
  • Если соединение устанавливается через HTTPS, то трафик SSE защищён шифрованием.

Поддержка браузерами: https://caniuse.com/eventsource.

В этой статье мы разработаем приложение Todo List, которое позволяет пользователям добавлять, удалять, помечать как выполенные задания в списке.

Обратите внимание, состояние списка с помощью Server-sent Events будет общим для всех пользователей:

Image description

Шаг 1 - Разработка Express Backend



# Создаём и заходим в каталог проекта
mkdir sse
cd sse

# Создаём и заходим в подкаталог для для backend части проекта
mkdir server
cd server

# Инициализируем проект и устанавливаем необходимые пакеты
npm init -y
npm install express@^4.18.1 body-parser@^1.20.0 compression@^1.7.4 cors@^2.8.5 --save


Enter fullscreen mode Exit fullscreen mode

После установки пакетов в файл package.json необходимо добавить строку "type": "module" для того чтобы NodeJS мог работать с модулями.



{
  "name": "server",
  "type": "module"
  ...
}


Enter fullscreen mode Exit fullscreen mode

Создаём файл server.js и добавляем каркас приложения:



import express from 'express';
import compression from 'compression';
import bodyParser from 'body-parser';
import cors from 'cors';

const app = express();
app.use(compression());
app.use(cors());
app.use(bodyParser.json());

let clients = [];
let todoState = [];

app.get('/state', (req, res) => {
    res.json(todoState);
});

const PORT = process.env.PORT || 3005;
app.listen(PORT, () => {
    console.log(`Shared todo list server listening at http://localhost:${PORT}`);
});


Enter fullscreen mode Exit fullscreen mode

Запускаем сервер командой npm start. Если всё сделано правильно, то, сделав запрос curl http://localhost:3005/state вы увидете [] - пустой список todo листа.
Далее перед декларацией порта const PORT = process.env.PORT || 3005; добавляем код для подключения клиента через SSE:



app.get('/events', (req, res) => {
    const headers = {
        // Тип соединения 'text/event-stream' необходим для SSE
        'Content-Type': 'text/event-stream',
        'Access-Control-Allow-Origin': '*',
        // Отставляем соединение открытым 'keep-alive'
        'Connection': 'keep-alive',
        'Cache-Control': 'no-cache'
    };
    // Записываем в заголовок статус успешного ответа 200
    res.writeHead(200, headers);

    /*
    Формирование данных:
    Когда EventSource получает множество последовательных
    строк, начинающихся с data: они объединяются, вставляя
    символ новой строки между ними. Завершающие символы
    новой строки удаляются.
    Двойные символы конца строки \n\n обозначают конец
    события.
    */
    const sendData = `data: ${JSON.stringify(todoState)}\n\n`;

    res.write(sendData);
    // Если используется compression middleware, то необходимо
    // добавить res.flush() для отправки данных пользователю
    res.flush();

    // Создаём уникальный идентификатор клиента
    const clientId = genUniqId();
    const newClient = {
        id: clientId,
        res,
    };

    clients.push(newClient);

    console.log(`${clientId} - Connection opened`);

    req.on('close', () => {
        console.log(`${clientId} - Connection closed`);
        clients = clients.filter(client => client.id !== clientId);
    });
});

function genUniqId(){
    return Date.now() + '-' + Math.floor(Math.random() * 1000000000);
}


Enter fullscreen mode Exit fullscreen mode

Итак, мы написали код, позволяющий клиенту подключиться, установив постоянное соединение, а так же сохранили id и res в массиве клиентов, чтобы в дальнейшем мы могли отправлять данные подключенным клиентам.

Чтобы проверить, что всё работает, добавим код для передачи уникальных id подключенных пользователей.



app.get('/clients', (req, res) => {
    res.json(clients.map((client) => client.id));
});


Enter fullscreen mode Exit fullscreen mode

Запускаем сервер npm start.
Подключаемся в новом терминале к серверу:



curl -H Accept:text/event-stream http://localhost:3005/events


Enter fullscreen mode Exit fullscreen mode

В разных терминалах можно несколько раз повторить команду, чтобы сымитировать подключение нескольких клиентов.
Проверяем список подключившихся:



curl http://localhost:3005/clients


Enter fullscreen mode Exit fullscreen mode

В терминале вы должны увидеть массив id подключенных клиентов:



["1652948725022-121572961","1652948939397-946425533"]


Enter fullscreen mode Exit fullscreen mode

Теперь приступим к написанию бизнес логики приложения Todo List, нам нужно:
a) Добавлять задачу в todo список;
b) Удалять задачу из todo списка;
c) Устанавливать/снимать галочку выполнения задачи;
d) После каждого действия отправлять состояние всем подключенным клиентам.

Состояние todo списка будет выглядеть следующим образом:



[
   {
      id: "1652980545287-628967479",
      text: "Task 1",
      checked: true
   },
   {
      id: "1652980542043-2529066",
      text: "Task 2",
      checked: false
   },
   ...
]


Enter fullscreen mode Exit fullscreen mode

Где id - уникальный идентификатор, генерируемый сервером, text - текст задачи, checked - состояние галочки задачи.

Начнём с d) - после каждого действия отправлять состояние всем подключенным клиентам:



function sendToAllUsers() {
    for(let i=0; i<clients.length; i++){
        clients[i].res.write(`data: ${JSON.stringify(todoState)}\n\n`);
        clients[i].res.flush();
    }
}


Enter fullscreen mode Exit fullscreen mode

Затем реализуем a) b) и c):



// Добавляем новую задачу в список и отправляем
// состояние всем клиентам
app.post('/add-task', (req, res) => {
    const addedText = req.body.text;
    todoState = [
        { id: genUniqId(), text: addedText, checked: false },
        ...todoState
    ];
    res.json(null);
    sendToAllUsers();
});

// Изменяем состояние выполнения задачи в списке
// и отправляем результат всем клиентам
app.post('/check-task', (req, res) => {
    const id = req.body.id;
    const checked = req.body.checked;
    todoState = todoState.map((item) => {
        if(item.id === id){
            return { ...item, checked };
        }
        else{
            return item;
        }
    });
    res.json(null);
    sendToAllUsers();
});

// Удаляем задачу из списка и отправляем новое
// состояние списка всем клиентам
app.post('/del-task', (req, res) => {
    const id = req.body.id;
    todoState = todoState.filter((item) => {
        return item.id !== id;
    });
    res.json(null);
    sendToAllUsers();
});


Enter fullscreen mode Exit fullscreen mode

Итак, серверная часть готова. Полный код серверной части:



import express from 'express';
import compression from 'compression';
import bodyParser from 'body-parser';
import cors from 'cors';

const app = express();
app.use(compression());
app.use(cors());
app.use(bodyParser.json());

let clients = [];
let todoState = [];

app.get('/state', (req, res) => {
    res.json(todoState);
});

app.get('/events', (req, res) => {
    const headers = {
        'Content-Type': 'text/event-stream',
        'Access-Control-Allow-Origin': '*',
        'Connection': 'keep-alive',
        'Cache-Control': 'no-cache'
    };
    res.writeHead(200, headers);

    const sendData = `data: ${JSON.stringify(todoState)}\n\n`;
    res.write(sendData);
    res.flush();

    const clientId = genUniqId();

    const newClient = {
        id: clientId,
        res,
    };

    clients.push(newClient);

    console.log(`${clientId} - Connection opened`);

    req.on('close', () => {
        console.log(`${clientId} - Connection closed`);
        clients = clients.filter(client => client.id !== clientId);
    });
});

function genUniqId(){
    return Date.now() + '-' + Math.floor(Math.random() * 1000000000);
}

function sendToAllUsers() {
    for(let i=0; i<clients.length; i++){
        clients[i].res.write(`data: ${JSON.stringify(todoState)}\n\n`);
        clients[i].res.flush();
    }
}

app.get('/clients', (req, res) => {
    res.json(clients.map((client) => client.id));
});

app.post('/add-task', (req, res) => {
    const addedText = req.body.text;
    todoState = [
        { id: genUniqId(), text: addedText, checked: false },
        ...todoState
    ];
    res.json(null);
    sendToAllUsers();
});

app.post('/check-task', (req, res) => {
    const id = req.body.id;
    const checked = req.body.checked;

    todoState = todoState.map((item) => {
        if(item.id === id){
            return { ...item, checked };
        }
        else{
            return item;
        }
    });
    res.json(null);
    sendToAllUsers();
});

app.post('/del-task', (req, res) => {
    const id = req.body.id;
    todoState = todoState.filter((item) => {
        return item.id !== id;
    });

    res.json(null);
    sendToAllUsers();
});

const PORT = process.env.PORT || 3005;
app.listen(PORT, () => {
    console.log(`Shared todo list server listening at http://localhost:${PORT}`);
});


Enter fullscreen mode Exit fullscreen mode

Приступаем ко второму шагу - клиентской части.

Шаг 2 - Клиентская часть: React приложение

Перейдём в ранее созданную папку проекта sse, затем запустим команду создания шаблона react приложения:



npx create-react-app client


Enter fullscreen mode Exit fullscreen mode

Далее перейдём в папку созданного приложения и запустим его:



cd client
npm start


Enter fullscreen mode Exit fullscreen mode

После чего в браузере должна открыться страница клиентского приложения http://localhost:3000.

Далее переходим к файлу src/index.js и удаляем React.StrictMode из приложения.



// Было
root.render(
   <React.StrictMode>
      <App />
   </React.StrictMode>
);
// Стало
root.render(
   <App />
);


Enter fullscreen mode Exit fullscreen mode

Дело в том что React StrictMode дважды рендерит компоненты в режиме development для обнаружения возможных проблем. Но в нашем случае это не нужно, иначе клиент будет дважды подключаться к серверу и устанавливать постоянное соединение.

Удалим из файла App.css всё содержимое и вставим свои стили:



h1 {
    text-align: center;
}
main {
    display: flex;
    justify-content: center;
}
.l-todo {
    max-width: 31.25rem;
}
form {
    margin-bottom: 1rem;
}
form input[type="submit"] {
    margin-left: 0.5rem;
}
.task-group {
    margin-bottom: 0.125rem;
    display: flex;
    flex-wrap: nowrap;
    justify-content: space-between;
}
.task-group button {
    padding: 0.25rem 0.5rem;
    margin-left: 0.5rem;
    border: none;
    background-color: white;
}


Enter fullscreen mode Exit fullscreen mode

Подготовим каркас приложения, удалим из файла App.js всё содержимое и вставим свой код:



import './App.css';
import { useState, useEffect, useRef } from 'react';

function App(){
   return(
      <main>
      </main>
   );
}

export default App;


Enter fullscreen mode Exit fullscreen mode

Добавим в наш App компонент хук состояния списка:



const [tasks, setTasks] = useState([]);


Enter fullscreen mode Exit fullscreen mode

Теперь добавим хук useEffect в котором мы будем устанавливать постоянное SSE соединение:



useEffect(() => {
        let mount = true;
        let events;
        let timer;
        let createEvents = () => {
            // Закрываем соединение если открыто
            if(events){
                events.close();
            }
            // Устанавливаем SSE соединение
            events = new EventSource(`http://localhost:3005/events`);
            events.onmessage = (event) => {
                // Если компонент смонтирован, устанавливаем
                // полученными данными состояние списка
                if(mount){
                    let parsedData = JSON.parse(event.data);
                    setTasks(parsedData);
                }
            };
            // Если возникает ошибка - ждём секунду и
            // снова вызываем функцию подключения
            events.onerror = (err) => {
                timer = setTimeout(() => {
                    createEvents();
                }, 1000);
            };
        };
        createEvents();

        // Перед размонтированием компонента отчищаем
        // таймер и закрываем соединение
        return () => {
            mount = false;
            clearTimeout(timer);
            events.close();
        }
    }, []);


Enter fullscreen mode Exit fullscreen mode

Теперь при открытии клиентского сайта http://localhost:3000 произойдёт соединение с сервером и сервер отправит состояние todo списка подключившемуся клиенту. Клиент, получив данные установит состояние todo списка.

Разработаем компонент интерфейса для добавления новой задачи в список.
Image description
Добавим в проект файл src/AddTask.js



function AddTask(props){
    const { text, onTextChange, onSubmit, textRef } = props;
    return(
        <form onSubmit={onSubmit}>
            <input
                type="text"
                name="add"
                value={text}
                onChange={onTextChange}
                ref={textRef}
            />
            <input
                type="submit"
                value="Добавить"
            />
        </form>
    );
}

export default AddTask;


Enter fullscreen mode Exit fullscreen mode

Создадим элемент списка:
Image description
Добавим в проект файл src/Task.js:



function Task(props){
    const { id, text, checked, onCheck, onDel } = props;
    return(
        <div className="task-group">
            <div>
                <input
                    type="checkbox"
                    name={`chk${id}`}
                    id={`chk${id}`}
                    checked={checked}
                    onChange={onCheck}
                />
                <label htmlFor={`chk${id}`}>{text}</label>
            </div>
            <button
                id={`btn${id}`}
                onClick={onDel}>x
            </button>
        </div>
    );
}

export default Task;


Enter fullscreen mode Exit fullscreen mode

Подключим файлы созданные файлы в App.js:



import AddTask from './AddTask';
import Task from './Task';


Enter fullscreen mode Exit fullscreen mode

В нашем приложении мы будем передавать данные на сервер в JSON формате, поэтому прежде чем двигаться дальше мы напишем небольшую обёртку для javascript fetch API для упрощения клиентского кода. Создадим файл /src/jsonFetch.js:



function jsonFetch(url, data){
    return new Promise(function(resolve, reject){
        fetch(url, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json'
            },
            body: JSON.stringify(data)
        })
        .then(function(res){
            if(res.ok){
                const contentType = res.headers.get('content-type');
                if(contentType && contentType.includes('application/json')){
                    return res.json();
                }
                return reject(`Не JSON, content-type: ${contentType}`);
            }
            return reject(`Статус: ${res.status}`);
        })
        .then(function(res){
            resolve(res);
        })
        .catch(function(error){
            reject(error);
        });
    });
}

export default jsonFetch;


Enter fullscreen mode Exit fullscreen mode

Подключим созданный файл в App.js:



import jsonFetch from './jsonFetch';


Enter fullscreen mode Exit fullscreen mode

Теперь добавим наши компоненты AddTask и Task в App компонент:



function App(){
    const [addTaskText, setAddTaskText] = useState('');
    const [tasks, setTasks] = useState([]);
    const addTextRef = useRef(null);

    useEffect(() => {
      // Код SSE
       ...
    },[]);

    const tasksElements = tasks.map((item) => {
        return(
            <Task
                key={item.id}
                id={item.id}
                text={item.text}
                checked={item.checked}
                onCheck={handleTaskCheck}
                onDel={handleTaskDel}
            />
        );
    });

    return (
        <main>
            <div className="l-todo">
                <h1>Todo List</h1>
                <AddTask
                    text={addTaskText}
                    onSubmit={handleAddTaskSubmit}
                    onTextChange={handleAddTaskTextChange}
                    textRef={addTextRef}
                />
                {tasksElements}
            </div>
        </main>
    );
}


Enter fullscreen mode Exit fullscreen mode

Напишем обработчики событий пользователя в App компоненте:



function handleAddTaskTextChange(event){
   setAddTaskText(event.target.value);
}

function handleAddTaskSubmit(event){
   event.preventDefault();
   let addedText = addTaskText.trim();
   if(!addedText){
       return setAddTaskText('');
   }
   jsonFetch('http://localhost:3005/add-task', {text: addedText})
   .then(() => {
       setAddTaskText('');
   })
   .catch((err) => {
       console.log(err);
   })
   .finally(() => {
       addTextRef.current.focus();
   });
}

function handleTaskCheck(event){
   const checked =  event.target.checked;
   const targetId = event.target.id.substring(3);

   jsonFetch('http://localhost:3005/check-task', {id: targetId, checked})
   .catch((err) => {
       console.log(err);
   });
}

function handleTaskDel(event){
   let targetId = event.target.id.substring(3);

   jsonFetch('http://localhost:3005/del-task', {id: targetId})
   .catch((err) => {
       console.log(err);
   });
}


Enter fullscreen mode Exit fullscreen mode

Итак, логика работы приложения: при монтировании компонента создаётся SSE подключение к серверу, который при подключении передаёт состояние списка. После получения состояние списка с сервера оно устанавливается клиенту setTasks(parsedData).
Далее, при добавлении, удалении и установке/снятии галочки задачи - изменения отправляются на сервер, там записываются в todoState и передаются всем подключенным пользователям.

Полный код клиентского приложения:



import './App.css';
import { useState, useEffect, useRef } from 'react';
import AddTask from './AddTask';
import Task from './Task';
import jsonFetch from './jsonFetch';

function App(){
    const [addTaskText, setAddTaskText] = useState('');
    const [tasks, setTasks] = useState([]);
    const addTextRef = useRef(null);

    useEffect(() => {
        let mount = true;
        let events;
        let timer;
        let createEvents = () => {
            if(events){
                events.close();
            }
            events = new EventSource(`http://localhost:3005/events`);
            events.onmessage = (event) => {
                if(mount){
                    let parsedData = JSON.parse(event.data);
                    setTasks(parsedData);
                }
            };
            events.onerror = (err) => {
                timer = setTimeout(() => {
                    createEvents();
                }, 1000);
            };
        };
        createEvents();

        return () => {
            mount = false;
            clearTimeout(timer);
            events.close();
        }
    }, []);

    const tasksElements = tasks.map((item) => {
        return(
            <Task
                key={item.id}
                id={item.id}
                text={item.text}
                checked={item.checked}
                onCheck={handleTaskCheck}
                onDel={handleTaskDel}
            />
        );
    });

    return (
        <main>
            <div className="l-todo">
                <h1>Todo List</h1>
                <AddTask
                    text={addTaskText}
                    onSubmit={handleAddTaskSubmit}
                    onTextChange={handleAddTaskTextChange}
                    textRef={addTextRef}
                />
                {tasksElements}
            </div>
        </main>
    );

    function handleAddTaskTextChange(event){
        setAddTaskText(event.target.value);
    }

    function handleAddTaskSubmit(event){
        event.preventDefault();
        let addedText = addTaskText.trim();
        if(!addedText){
            return setAddTaskText('');
        }
        jsonFetch('http://localhost:3005/add-task', {text: addedText})
        .then(() => {
            setAddTaskText('');
        })
        .catch((err) => {
            console.log(err);
        })
        .finally(() => {
            addTextRef.current.focus();
        });
    }

    function handleTaskCheck(event){
        const checked =  event.target.checked;
        const targetId = event.target.id.substring(3);

        jsonFetch('http://localhost:3005/check-task', {id: targetId, checked})
        .catch((err) => {
            console.log(err);
        });
    }

    function handleTaskDel(event){
        let targetId = event.target.id.substring(3);

        jsonFetch('http://localhost:3005/del-task', {id: targetId})
        .catch((err) => {
            console.log(err);
        });
    }
}

export default App;


Enter fullscreen mode Exit fullscreen mode

Ставьте лайки, пишите комментарии.

💖 💪 🙅 🚩
andreysm
Andrey Smirnov

Posted on May 19, 2022

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related