Fetch API和SSE:服务器发送事件变得简单

技术 · 05-27
Fetch API和SSE:服务器发送事件变得简单

使用 Fetch API 和 SSE 实现服务器发送事件

服务器发送事件(Server-Sent Events,简称SSE)是一种使得服务器能够向客户端推送信息的机制。在传统的Web应用中,客户端需要定期轮询服务器以获取最新数据,而SSE提供了一种更高效的方式,让服务器能够主动向客户端推送数据。在本篇文章中,我们将探讨如何使用Fetch API和SSE来实现服务器发送事件。

SSE简介

SSE是一种轻量级的长连接技术,它允许服务器向客户端推送数据,而不需要客户端频繁地请求。SSE使用HTTP协议,并通过text/event-stream内容类型来传输数据。服务器保持连接打开,并可以随时向客户端发送数据。

使用EventSource

在浏览器中,你可以使用EventSourceAPI来轻松地处理SSE。以下是一个简单的示例代码,展示了如何使用JavaScript处理SSE:

// 创建一个新的EventSource连接,指定服务器的URL
var source = new EventSource('/path/to/server-endpoint');
// 监听服务器的消息事件
source.onmessage = function(event) {
  // 当服务器发送消息时,此函数会被调用
  console.log('Received message:', event.data);
};
// 监听open事件,服务器连接成功时触发
source.onopen = function(event) {
  console.log('Connection opened:', event);
};
// 监听error事件,连接出错时触发
source.onerror = function(error) {
  console.log('Error occurred:', error);
};
// 可以自定义事件类型的监听器
source.addEventListener('myevent', function(event) {
  var data = JSON.parse(event.data);
  console.log('Received custom event:', data);
});

在上面的代码中,/path/to/server-endpoint是服务器提供的SSE接口。客户端通过EventSource对象与服务器建立连接,并监听不同的事件类型,如默认的message事件、open事件、error事件,以及自定义事件。服务器发送的消息可以通过event.data访问。

类(Class)封装方案

首先,我们提供一个使用类来封装SSE处理的方案:

class SSEClient {
  constructor(url) {
    this.url = url;
    this.eventListeners = {};
    this.reader = null;
    this.buffer = '';
  }
  connect() {
    this.controller = new AbortController();
    this.signal = this.controller.signal;
    fetch(this.url, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/x-www-form-urlencoded',
        // "Content-Type": "application/json",
      },
      body: 'your=post&data=here',
      signal: this.signal
    })
    .then(response => {
      if (
        response.headers
            .get("Content-Type")
            .includes("text/event-stream")
      ) {
        this.reader = response.body.getReader();
        this.decoder = new TextDecoder();
        this.read();
      } else {
        throw new Error(`Expected Content-Type "text/event-stream", but received "${response.headers.get('Content-Type')}"`);
      }
    })
    .catch(error => {
      console.error('Error fetching SSE:', error);
    });
  }
  disconnect() {
    if (this.reader) {
      this.reader.cancel();
    }
    if (this.controller) {
      this.controller.abort();
    }
  }
  read() {
    if (!this.reader) {
      throw new Error('Reader is not initialized');
    }
    this.reader.read().then(({ done, value }) => {
      if (done) {
        console.log('Stream complete');
        return;
      }
      this.buffer += this.decoder.decode(value, { stream: true });
      const lines = this.buffer.split('\n');
      this.buffer = lines.pop(); // 保留最后一个不完整的行
      lines.forEach(line => {
        if (line.trim().length === 0) return; // 忽略空行
        // const [type, data] = line.split(': ');
        // if (type === 'data') {
        //   this.dispatchEvent('message', data);
        // }
        if (line.startsWith("data:")) {
            const data = JSON.parse(line.slice(5));
            this.dispatchEvent("message", data);
        }
      });
      this.read();
    }).catch(error => {
      console.error('Error reading stream:', error);
    });
  }
  addEventListener(type, callback) {
    if (!this.eventListeners[type]) {
      this.eventListeners[type] = [];
    }
    this.eventListeners[type].push(callback);
  }
  removeEventListener(type, callback) {
    if (!this.eventListeners[type]) {
      return;
    }
    const index = this.eventListeners[type].indexOf(callback);
    if (index !== -1) {
      this.eventListeners[type].splice(index, 1);
    }
  }
  dispatchEvent(type, data) {
    if (!this.eventListeners[type]) {
      return;
    }
    this.eventListeners[type].forEach(callback => {
      callback(data);
    });
  }
}
// 使用示例
const sseClient = new SSEClient('/path/to/server-endpoint');
sseClient.addEventListener('message', data => {
  console.log('Received data:', data);
});
sseClient.connect();

在这个类中,我们封装了连接、读取事件流、以及分发事件的逻辑。你可以通过addEventListener方法添加事件监听器,通过removeEventListener方法移除事件监听器,通过connect方法建立连接,以及通过disconnect方法断开连接。

非类封装方案

如果你不想使用类,也可以使用模块化的方式来编写处理SSE的代码:

const decoder = new TextDecoder();
function createSSEClient(url) {
  let reader = null;
  let buffer = '';
  const eventListeners = {};
  function connect() {
    const controller = new AbortController();
    const signal = controller.signal;
    fetch(url, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/x-www-form-urlencoded',
      },
      body: 'your=post&data=here',
      signal: signal
    })
    .then(response => {
      if (response.headers.get('Content-Type') === 'text/event-stream') {
        reader = response.body.getReader();
        read();
      } else {
        throw new Error('Expected Content-Type "text/event-stream", but received "${response.headers.get('Content-Type')}"');
      }
    })
    .catch(error => {
      console.error('Error fetching SSE:', error);
    });
    return {
      disconnect: () => controller.abort()
    };
  }
  function read() {
    if (!reader) {
      throw new Error('Reader is not initialized');
    }
    reader.read().then(({ done, value }) => {
      if (done) {
        console.log('Stream complete');
        return;
      }
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop(); // 保留最后一个不完整的行
      lines.forEach(line => {
        if (line.trim().length === 0) return; // 忽略空行
        const [type, data] = line.split(': ');
        if (type === 'data') {
          dispatchEvent('message', data);
        }
      });
      read();
    }).catch(error => {
      console.error('Error reading stream:', error);
    });
  }
  function addEventListener(type, callback) {
    if (!eventListeners[type]) {
      eventListeners[type] = [];
    }
    eventListeners[type].push(callback);
  }
  function removeEventListener(type, callback) {
    if (!eventListeners[type]) {
      return;
    }
    const index = eventListeners[type].indexOf(callback);
    if (index !== -1) {
      eventListeners[type].splice(index, 1);
    }
  }
  function dispatchEvent(type, data) {
    if (!eventListeners[type]) {
      return;
    }
    eventListeners[type].forEach(callback => {
      callback(data);
    });
  }
  return {
    connect,
    addEventListener,
    removeEventListener
  };
}
// 使用示例
const sseClient = createSSEClient('/path/to/server-endpoint');
sseClient.addEventListener('message', data => {
  console.log('Received data:', data);
});
const { disconnect } = sseClient.connect();
// 当你需要断开连接时
// disconnect();

在这个非类封装方案中,我们创建了一个createSSEClient函数,它返回一个对象,包含connectaddEventListenerremoveEventListener方法。这样,你可以在不同的地方导入并使用这个函数来创建SSE客户端,并根据自己的需求添加事件监听器和处理逻辑。
请注意,这个示例使用了ES6模块语法。如果你在不支持ES6模块的环境中工作,你可能需要使用CommonJS语法或其他模块系统。

总结

无论是使用类还是函数,封装SSE客户端的逻辑都是为了重用和模块化代码。类的方式提供了更明确的封装和对象导向的编程风格,而函数方式则提供了更灵活的模块化。选择哪种方式取决于你的个人偏好和项目的具体需求。不过,两种方式都允许你轻松地处理服务器发送的事件,并能够适应不同的使用场景。

JavaScript SSE
Theme Jasmine by Kent Liao