In my quest for an efficient way to stream data from a backend API endpoint to the frontend application, I sought an alternative to the widely used WebSockets API. I had heard of server-sent events (SSE) before, and they aligned with most of my requirements except one crucial aspect. The SSE specification assumes that the request will always be a GET request, a constraint that is also evident in the EventSource API, which does not let you change the HTTP verb used for the request.
My use case involved sending a complex object to the endpoint when initializing the stream, a task that isn't well suited to a query string, which the SSE spec unfortunately imposes.
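To make the query string limitation a bit more concrete, here is a minimal sketch of what flattening a nested filter into a GET query string could look like. The filter shape and the `toQueryString` helper are hypothetical and only serve as an illustration; they are not part of the implementation in this post.

```javascript
// Hypothetical nested filter; the shape is an assumption for illustration only.
const filter = { codes: ['GOOGL', 'AMZN'], price: { min: 100, max: 500 } };

// Flattens nested objects and arrays into dotted keys (codes.0, price.min, ...).
function toQueryString(value, prefix = '', params = new URLSearchParams()) {
  for (const [key, entry] of Object.entries(value)) {
    const name = prefix ? `${prefix}.${key}` : key;
    if (entry !== null && typeof entry === 'object') {
      toQueryString(entry, name, params);
    } else {
      params.append(name, String(entry));
    }
  }
  return params.toString();
}

const query = toQueryString(filter);
console.log(query);
// With a POST request the same filter is simply JSON.stringify(filter) in the body.
```

The server would then have to agree on the dotted-key convention and rebuild the object from it, which is the kind of plumbing a JSON request body avoids.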
I could have probably solved this problem by:
However, this approach has its drawbacks:
Due to these trade-offs, I decided to explore alternative implementations.
The implementation below is written in C# for the backend and vanilla JavaScript and HTML for the frontend.
The following implementation demonstrates a simple 'stock feed'. A user can filter a specific stock and receive a stream of randomized stock prices at random intervals. Although not a realistic use case, it offers an easy-to-understand example of the technical implementation.
We will be creating a few classes that define what kind of details we want to display regarding stocks.
public static class StockCodes
{
    public static readonly string[] All =
    {
        Google, Amazon, Nvidia, Meta, Tesla
    };

    public const string Google = "GOOGL";
    public const string Amazon = "AMZN";
    public const string Nvidia = "NVDA";
    public const string Meta = "META";
    public const string Tesla = "TSLA";
}
/// <summary>
/// Fictive stock price at a certain point in time
/// </summary>
/// <param name="Code">Stock NASDAQ Code</param>
/// <param name="Value">Stock price</param>
/// <param name="Date">Moment in time</param>
public sealed record Stock(string Code, decimal Value, DateTimeOffset Date);
Using the Stock record created above, we'll set up a feed that holds our randomly generated stock values. We use a bounded channel with a capacity of 5; when the channel is full, producers have to wait until there is room again.
public class StockChannel
{
    public Channel<Stock> Stocks { get; }

    public StockChannel()
    {
        var channelOptions = new BoundedChannelOptions(5) // arbitrary limit for testing purposes
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        Stocks = Channel.CreateBounded<Stock>(channelOptions);
    }
}
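If channels are new to you, the "wait when full" behavior can be sketched in a few lines of JavaScript. This only illustrates the semantics and is not how System.Threading.Channels is implemented; the `BoundedQueue` class and its members are made up for this example.

```javascript
// Illustrative bounded queue: writers wait while the queue is full.
class BoundedQueue {
  constructor(capacity) {
    this.capacity = capacity;
    this.items = [];
    this.waitingWriters = []; // resolve callbacks of writers parked on a full queue
    this.waitingReaders = []; // resolve callbacks of readers parked on an empty queue
  }

  async write(item) {
    // Park until a reader has made room.
    while (this.items.length >= this.capacity) {
      await new Promise((resolve) => this.waitingWriters.push(resolve));
    }
    const reader = this.waitingReaders.shift();
    if (reader) {
      reader(item); // hand the item straight to a waiting reader
    } else {
      this.items.push(item);
    }
  }

  async read() {
    if (this.items.length > 0) {
      const item = this.items.shift();
      const writer = this.waitingWriters.shift();
      if (writer) writer(); // wake one parked writer
      return item;
    }
    return new Promise((resolve) => this.waitingReaders.push(resolve));
  }
}

const queue = new BoundedQueue(5);
for (let i = 1; i <= 6; i++) {
  queue.write(i); // intentionally not awaited, so the sixth write parks itself
}
console.log(queue.items.length, queue.waitingWriters.length); // logs: 5 1
```

As soon as something calls `queue.read()`, the parked writer is woken up and the sixth item is added.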
Next, we create a producer that generates random stock values and puts them on our channel. Every second, it selects a random stock and generates a random date and a random price for it.
We inherit from BackgroundService to create a service that starts together with our application.
To get the best asynchronous support for generating the 'every second tick', we use PeriodicTimer, which was introduced in .NET 6.
public sealed class StockProducingService(StockChannel stockChannel) : BackgroundService
{
    private readonly Random _rnd = new();

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        stoppingToken.ThrowIfCancellationRequested();
        var tick = TimeSpan.FromSeconds(1);
        using PeriodicTimer timer = new(tick);
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var stock = GenerateStock(_rnd);
                await stockChannel.Stocks.Writer.WriteAsync(stock, stoppingToken);
                await timer.WaitForNextTickAsync(stoppingToken);
            }
        }
        finally
        {
            // Cancellation surfaces as an exception from the awaits above,
            // so complete the writer here to make sure readers can finish.
            stockChannel.Stocks.Writer.TryComplete();
        }
    }

    private static Stock GenerateStock(Random rnd)
    {
        var dateTimeOffset = RandomDate(rnd);
        var stockCode = RandomStockCode(rnd);
        var price = RandomStockPrice(rnd);
        return new Stock(stockCode, price, dateTimeOffset);
    }

    private static decimal RandomStockPrice(Random rnd)
    {
        // Build the value arithmetically: parsing a "pre,post" string depends on the
        // current culture and yields a different number on, for example, en-US.
        var whole = rnd.Next(0, 1000);
        var cents = rnd.Next(1, 100);
        return whole + cents / 100m;
    }

    private static string RandomStockCode(Random rnd) =>
        StockCodes.All[rnd.Next(0, StockCodes.All.Length)];

    private static DateTimeOffset RandomDate(Random rnd)
    {
        var now = DateTimeOffset.UtcNow;
        var yearAgo = now.AddYears(-1);
        var timeSpan = now - yearAgo;
        var randomSpan = new TimeSpan(0, rnd.Next(0, (int)timeSpan.TotalMinutes), 0);
        // Stay in DateTimeOffset: going through .DateTime drops the UTC offset.
        return yearAgo + randomSpan;
    }
}
Having established the producing side, it's time to build the consuming side.
We'll start by creating a service that reads from our channel.
public sealed class StockFeedService(StockChannel stockChannel)
{
    public async IAsyncEnumerable<Stock> ListenToStockFilteredAsync(
        string code,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in stockChannel.Stocks.Reader.ReadAllAsync(cancellationToken))
        {
            if (string.Equals(code, item.Code, StringComparison.OrdinalIgnoreCase))
            {
                yield return item;
            }
        }
    }
}
The backbone of our streaming is the streaming behavior of IAsyncEnumerable. Paired with a CancellationToken and the EnumeratorCancellation attribute, it allows us to easily stop consuming when the user aborts the request.
In our implementation we will be filtering on the code we've defined in our Stock record, and only yielding a result when the code matches the code of the record that has been generated.
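For readers who are more at home on the frontend, the same filter-while-streaming idea can be sketched with a JavaScript async generator. This is only an analogy for the C# service above: `readAll` is a stand-in for the channel reader and the stock values are made up.

```javascript
// Stand-in for the channel reader: yields a few hard-coded, made-up stocks.
async function* readAll() {
  yield { code: 'GOOGL', value: 101.5 };
  yield { code: 'TSLA', value: 250.25 };
  yield { code: 'googl', value: 99.75 };
}

// Yields only the items whose code matches, ignoring case.
async function* listenToStockFiltered(source, code) {
  for await (const item of source) {
    if (item.code.toLowerCase() === code.toLowerCase()) {
      yield item;
    }
  }
}

// Drains the filtered stream into an array so the result is easy to inspect.
async function collect(code) {
  const values = [];
  for await (const stock of listenToStockFiltered(readAll(), code)) {
    values.push(stock.value);
  }
  return values;
}

collect('GOOGL').then((values) => console.log(values));
```

Items that do not match are simply skipped, and the consumer only ever sees the stocks it asked for.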
We register the services we've made above in our Program.cs: StockChannel as a Singleton, to ensure everyone can consume the same exact feed; StockProducingService as a HostedService, so that the producing of our stocks starts up automatically; and StockFeedService as Scoped, so that every request on the endpoint we will be building can consume separately.
using System.Text.Json;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Net.Http.Headers;
using ServerSentEventsAlternative.Channels;
using ServerSentEventsAlternative.Mapping;
using ServerSentEventsAlternative.Models;
using ServerSentEventsAlternative.Services;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddRazorPages();
builder.Services.AddSingleton<StockChannel>();
builder.Services.AddHostedService<StockProducingService>();
builder.Services.AddScoped<StockFeedService>();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
var app = builder.Build();
app.UseHttpsRedirection();
app.UseDefaultFiles();
app.UseStaticFiles();
app.UseRouting();
app.UseSwagger();
app.UseSwaggerUI();
app.Run();
We'll be writing an endpoint that would technically work for server-sent events if it used a GET method instead of a POST method.
To expose our stock, we will be creating a few DTOs.
// Forces our DTOs to have a definition of the Event they represent, this is required due to the spec
public interface IServerSentEventData
{
    public string EventName { get; }
}

// The actual SSE wrapper that takes in the EventName from the DTO
public record ServerSentEvent<T>(string Id, T Data) where T : IServerSentEventData
{
    public string Event { get; init; } = Data.EventName;
}

// The DTO which we will expose. The EventName is JsonIgnored as we do not want to expose it in our JSON.
public record StockDto(string Code, decimal Value, DateTimeOffset Date) : IServerSentEventData
{
    [JsonIgnore]
    public string EventName { get; } = "Stock";
}
// The filter the user uses to filter on the right stock
public sealed record StockFilter(string Code);
We follow the spec by setting a few headers: Content-Type: text/event-stream, Cache-Control: no-cache and Connection: keep-alive.
Subsequently, we listen to our StockFeedService and map each stock to our DTO with the ServerSentEvent wrapper around it. We are using this wrapper to match the event stream format.
According to the spec, event streams are always encoded in UTF-8, and this is something C# will gladly do for us when calling the
public static Task SerializeAsync<TValue>(
Stream utf8Json,
TValue value,
JsonSerializerOptions? options = null,
CancellationToken cancellationToken = default)
overload.
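For reference, the event stream format itself is plain text: each event is a group of `field: value` lines (`id`, `event`, `data`, `retry`) terminated by a blank line. The helper below is a rough sketch of that framing. `formatServerSentEvent` is a hypothetical name, and the endpoint in this post serializes the whole wrapper as JSON rather than calling anything like it.

```javascript
// Builds one event in the text/event-stream framing: "field: value" lines plus a blank line.
function formatServerSentEvent({ id, event, data }) {
  const lines = [];
  if (id !== undefined) lines.push(`id: ${id}`);
  if (event !== undefined) lines.push(`event: ${event}`);
  // A payload that spans several lines is sent as one "data:" line per line of text.
  for (const line of String(data).split(/\r?\n/)) {
    lines.push(`data: ${line}`);
  }
  return lines.join('\n') + '\n\n';
}

const frame = formatServerSentEvent({
  id: '1',
  event: 'Stock',
  data: JSON.stringify({ code: 'GOOGL', value: 101.5 }),
});
console.log(JSON.stringify(frame));
```

The blank line at the end is what tells a client that the event is complete, which is why the endpoint writes two newlines after every serialized item.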
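app.MapPost("stocks/sse",
    static async (
        HttpContext ctx,
        [FromBody] StockFilter filter,
        StockFeedService stockFeedService,
        CancellationToken ct = default
    ) =>
    {
        // Use the indexer: Headers.Add throws when a header has already been set.
        ctx.Response.Headers[HeaderNames.ContentType] = "text/event-stream";
        ctx.Response.Headers[HeaderNames.CacheControl] = CacheControlHeaderValue.NoCacheString;
        ctx.Response.Headers[HeaderNames.Connection] = "keep-alive";

        await foreach (var stock in stockFeedService.ListenToStockFilteredAsync(filter.Code, ct))
        {
            // The "data: " prefix is what the client strips off again before parsing the JSON.
            await ctx.Response.WriteAsync("data: ", ct);
            await JsonSerializer.SerializeAsync(ctx.Response.Body, stock.MapToServerSentEvent(), cancellationToken: ct);
            // A blank line ends the event; "\n\n" is the same on every OS, unlike Environment.NewLine.
            await ctx.Response.WriteAsync("\n\n", ct);
            await ctx.Response.Body.FlushAsync(ct);
        }
        // Nothing is returned: a ValueTask returned from an async lambda would be
        // treated as a result object and serialized into the response.
    });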
We create a simple index.html file in the wwwroot folder of our project.
The aim is to build a no-frills, unstyled streaming example that calls our POST stocks/sse endpoint.
To do this we will create an input in which the user can filter on a specific stock by code, a button which allows the user to start and stop the stream, and a div in which the streaming responses will be added.
<html lang="en">
<div class="container">
    <label for="code">
        <input type="text" id="code" name="code" value="GOOGL"/>
    </label>
    <button id="send">Send</button>
    <div class="response"></div>
</div>
<script>
    let controller = new AbortController();

    // execute our POST call using the correct headers
    async function fetchData() {
        const code = document.getElementById('code').value;
        const response = await fetch('/stocks/sse', {
            signal: controller.signal,
            method: 'POST',
            headers: {
                'Accept': 'text/event-stream',
                'Content-Type': 'application/json'
            },
            body: JSON.stringify({code})
        });
        if (!response.ok) {
            throw new Error(response.statusText);
        }

        const reader = response.body.getReader();
        // one decoder for the whole stream, so characters split across chunks decode correctly
        const decoder = new TextDecoder();
        let buffer = '';
        // keep reading the stream until it's empty
        for (;;) {
            const {value, done} = await reader.read();
            if (done) {
                break;
            }
            buffer += decoder.decode(value, {stream: true});
            // events end with a blank line; a chunk may hold several events or only part of one
            const events = buffer.split(/\r?\n\r?\n/);
            buffer = events.pop(); // keep the incomplete tail for the next chunk
            for (const event of events) {
                // extract the data out of the event wrapper
                const payload = event.replace(/^data: /, '').trim();
                if (payload === '') {
                    continue;
                }
                const parsed = JSON.parse(payload);
                // add the response to our DOM
                const listItem = document.createElement('li');
                listItem.textContent = JSON.stringify(parsed);
                document.querySelector('.response').appendChild(listItem);
            }
        }
    }

    // Stops the stream
    function stopFetch() {
        controller.abort();
        controller = new AbortController(); // re-instantiate AbortController for the next request
        const button = document.getElementById('send');
        button.textContent = 'Send';
        button.removeEventListener('click', stopFetch);
        button.addEventListener('click', handleSend);
    }

    // Start the stream
    async function handleSend() {
        const button = document.getElementById('send');
        button.textContent = 'Stop';
        button.removeEventListener('click', handleSend);
        button.addEventListener('click', stopFetch);
        try {
            await fetchData();
        } catch (error) {
            // aborting the request rejects with an AbortError, which is expected here
            if (error.name !== 'AbortError') {
                throw error;
            }
        }
    }

    document.getElementById('send').addEventListener('click', handleSend);
</script>
</html>
A few things to note here:
We are not using the EventSource API as it does not allow us to use the POST HTTP method. This also comes with some disadvantages as we lose some nice features such as:
AbortController's signal allows us to easily abort our stream.
The chunks of the response stream are decoded with TextDecoder.
After running our backend with dotnet run and pressing the Send button, we get to witness the streaming experience.
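Both AbortController and TextDecoder also exist outside the browser (for example in recent Node.js versions), so the two notes above can be tried in isolation. The sketch below is only an illustration: the log messages are made up, and the bytes are the UTF-8 encoding of "10€", split at an awkward spot on purpose.

```javascript
// 1. Aborting a controller flips its signal; fetch() reacts to the same 'abort' event.
let controller = new AbortController();
const log = [];
controller.signal.addEventListener('abort', () => log.push('stream stopped'));
controller.abort();
log.push(`aborted: ${controller.signal.aborted}`);

// An aborted controller stays aborted, so the next request needs a fresh one.
controller = new AbortController();
log.push(`fresh signal aborted: ${controller.signal.aborted}`);
console.log(log);

// 2. "€" is three bytes in UTF-8 (0xE2 0x82 0xAC); pretend the network split it in two.
const firstChunk = new Uint8Array([0x31, 0x30, 0xe2, 0x82]); // "10" plus part of "€"
const secondChunk = new Uint8Array([0xac]);

// Decoding every chunk on its own turns the split character into replacement characters.
const naive = new TextDecoder().decode(firstChunk) + new TextDecoder().decode(secondChunk);

// One decoder with { stream: true } holds on to the incomplete bytes until the rest arrives.
const decoder = new TextDecoder();
const streamed =
  decoder.decode(firstChunk, { stream: true }) +
  decoder.decode(secondChunk, { stream: true });

console.log(naive === '10€', streamed === '10€'); // logs: false true
```

With stock codes and numbers this rarely matters, but any payload containing non-ASCII text can run into it.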
In our quest to find an effective approach for our streaming requirements, we've successfully crafted a solution that employs POST requests instead of limiting us to GET requests. While this approach steers away from the conventional SSE specification, it satisfies our unique requirements, especially when dealing with complex objects during the streaming initialization.
Even though our implementation does have some downsides, especially the lack of certain desirable features inherent to the EventSource API, it offers a streamlined and efficient solution for this specific use case. Those downsides can be mitigated considerably through robust documentation and clear communication within one or more development teams.
In conclusion, this alternative SSE solution presents a viable option when faced with unique development constraints, even though it does require us to do 'the extra plumbing' to make it all work :)
You can find the source code on my GitHub repository.