Richard Rodger
Posted on March 22, 2024
This is the third post in a series I'm writing about a new Minimal Viable Product we've released at Voxgig that turns your podcast into a chatbot. Your visitors can now ask your guests questions directly! The first post is here: Building a Podcast Chatbot for Voxgig, and you can find the full list at the end of this post.
The problem with podcasts is that they tend to have new episodes. This is annoying because you can't just ingest all the episodes once and be done with it. You have to set up a regular process (once a day is sufficient) to download the podcast RSS feed, check for new episodes and changes to old episodes, and run part of the data ingestion process again.
To model this business logic, the system uses two separate messages:
- aim:ingest,subscribe:podcast: subscribe to a podcast, called only once to set up the podcast in the system
- aim:ingest,ingest:podcast: actually process each episode and ingest the content
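For readers who haven't seen the earlier posts, here is a minimal sketch of how two such message patterns can be registered, assuming the seneca, seneca-promisify and seneca-entity packages; the names and wiring are illustrative only, not the project's actual plugin code.

import Seneca from 'seneca'

const seneca = Seneca()
  .use('promisify') // async-friendly message/post methods used throughout this post
  .use('entity')    // provides seneca.entity(...)

// Messages are usually defined inside a plugin, so the promisify decorations exist.
seneca.use(function ingest(this: any) {
  const seneca = this

  seneca.message('aim:ingest,subscribe:podcast', async function (this: any, msg: any) {
    // One-time setup: persist the podcast record.
    const podcastEnt = await this.entity('pdm/podcast').save$({ feed: msg.feed })
    return { ok: true, podcast_id: podcastEnt.id }
  })

  seneca.message('aim:ingest,ingest:podcast', async function (this: any, msg: any) {
    // Daily batch: download the RSS and process each episode (the subject of this post).
    return { ok: true }
  })
})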
The ingest:podcast message is the one we will run on a daily basis. For the moment, let's ignore the devops side of this system (it's still under development at the time of writing), and focus on processing the episodes of a podcast.
The podcast RSS gives us the list of episodes, so we need to download the RSS to get the latest version. This does mean that on the initial subscription, in our system, the RSS gets downloaded twice. We can easily solve this problem by adding a parameter to the ingest:podcast message (and we may yet do that in future), but for now, we are deliberately not going to solve it. There are other, more important parts of the codebase to work on. Downloading the same RSS twice to complete a one-time business process is not exactly a fundamental issue. Let's invoke "premature optimization is the root of all evil" and leave it at that for the moment.
This little decision lets us keep the ingest:podcast message implementation simpler for now. Let's look at the code, but this time also think about error handling. This message assumes we have already subscribed to a podcast in our system. That, for the moment, just means we saved an entry in the pdm/podcast table.
Since we have abstracted away the database (see the previous post in this series), we'll use the term table to refer to a generic idea of an entity store that can persist records that look like JSON documents, and can query these records, at least using top-level properties. That's pretty much all we need. We use the convention entity-base/entity-name to describe these entity stores and give them some namespacing.
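To make that concrete, here is a rough sketch of the convention in use, assuming a Seneca instance already set up with the entity plugin (the feed URL and title are made up):

// 'pdm' is the entity base, 'podcast' is the entity name.
const podcast = await seneca.entity('pdm/podcast').save$({
  feed: 'https://example.com/feed.rss',
  title: 'Example Podcast',
})

// Load by id, or query by top-level properties.
const byId = await seneca.entity('pdm/podcast').load$(podcast.id)
const byTitle = await seneca.entity('pdm/podcast').list$({ title: 'Example Podcast' })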
The podcast entry must exist, so we need to know its ID. This must be a parameter to the message action; let's call it podcast_id. We try to load this podcast, and we need to respond with an error if it does not exist.
Is this error a system error, one that is in some way fatal to the process, or that would lead to corrupted data? Nope. This is an ordinary everyday business logic bug. Somehow a podcast ID got sent to us that isn't correct. We just reject it, but there's no need to panic. Thus, we don't throw an exception, or return a bad HTTP error, or do anything out of the ordinary. We just respond with a JSON message that indicates, by a convention of its schema, that there was a business problem.
The general schema that we use for message responses (when there is one!) is:
{
  ok: boolean,  // If true, message was accepted and processed successfully
  why: string,  // If ok is false, optionally supply a free-form debugging code useful to humans
  // Other properties, perhaps well-defined, providing additional response data
}
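If you prefer to see that as a TypeScript type, it is roughly the following (a sketch, not a type from the actual codebase):

// Sketch of the response convention; not the project's actual types.
type MsgResponse = {
  ok: boolean           // true if the message was accepted and processed successfully
  why?: string          // when ok is false, a free-form debugging code useful to humans
  [extra: string]: any  // other response data, for example podcast_id or episodes
}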
Let's put all this together in code:
async function ingest_podcast(this: any, msg: any, meta: any) {
  const seneca = this
  const debug = seneca.shared.debug(meta.action)

  const out: any = { ok: false, why: '' }
  const podcast_id = out.podcast_id = msg.podcast_id

  debug && debug('START', podcast_id)

  let podcastEnt = await seneca.entity('pdm/podcast').load$(podcast_id)

  if (null == podcastEnt) {
    out.why = 'podcast-not-found'
    debug && debug('FAIL-PODCAST-ENTITY', podcast_id, out)
    return out
  }

  // Process episodes here...

  return out
}
It's useful to have a "debug" mode for your microservices that can produce additional log entries for debugging, both locally and when deployed. Some log entries might involve additional work to generate, so you want to avoid that when running normally.
Also, it's tedious to keep adding context information to each log entry, such as the name of the microservice, the name of the message action function, and so on. Thus we make use of a shared utility that provides a debug function: seneca.shared.debug, and we pass in the current message details (the meta parameter) so it can generate a full log entry.
If debug mode is not enabled, seneca.shared.debug will return null, so we can use that to short-circuit any costly log entry code, using the idiom:
debug && debug(...)
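As a rough idea of the shape of such a utility (this is a guess, not the actual implementation in ingest-prepare.ts):

// Hypothetical debug factory; the real shared utility differs in detail.
function makeDebug(enabled: boolean, meta: { plugin?: string, pattern?: string }) {
  if (!enabled) {
    return null // debug mode off: callers short-circuit with `debug && debug(...)`
  }
  // Prefix each entry with context so individual log lines are self-describing.
  return (...args: any[]) =>
    console.log(new Date().toISOString(), meta.plugin, meta.pattern, ...args)
}

// Usage mirrors the idiom above:
// const debug = makeDebug(process.env.INGEST_DEBUG === 'true', meta)
// debug && debug('START', podcast_id)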
We'll cover shared utilities in a later post, but if you want to look at the code, review ingest-prepare.ts.
Our code tries to load the podcast, and if it is not found, bails immediately by returning the response:
{
  ok: false,
  podcast_id: '...',
  why: 'podcast-not-found'
}
We do a few things here to make debugging easier. When deployed, debugging becomes harder, so the more information you can give yourself, the better.
As well as the expected ok and why properties, we also include the podcast_id parameter in the response. In a system under high load, you might only see the body of the response in the current section of the log that you're looking at, so give yourself as much context as possible.
For debug mode, we also emit a log entry, and add a text prefix that is easier to identify or search for: "FAIL-PODCAST-ENTITY". Ultimately you'll want to look at the line of code that caused the problem, but having a unique string is a great help when isolating the flow of the error. It also helps a great deal when working remotely with colleagues. This is a poor substitute for proper error codes for sure, but it is a classic 80/20 solution that can be improved as needed.
You'll also notice some defensive, pessimistic code here. We assume the message will fail, and initialize ok: false. This also helps us be lazy developers, as we only have to set the why property when returning early due to an error.
Most error conditions in this codebase are handled in the same way, and you can see them in the source code. We'll omit them in most cases in the rest of this series to keep the example code shorter.
Let's proceed to the next step: download and save the RSS:
const batch: string = out.batch = msg.batch

...

let podcastEnt = await seneca.entity('pdm/podcast').load$(podcast_id)

let feed = podcastEnt.feed
let rssRes = await seneca.shared.getRSS(debug, feed, podcast_id, mark)
let rss = rssRes.rss

let feedname = encodeURIComponent(feed.toLowerCase().replace(/^https?:\/\//, ''))

await seneca.entity('pdm/rss').save$({
  id: 'rss01/' + feedname + '/' +
    podcastEnt.id + '-' + batch + '.rss',
  rss
})
We introduce a new parameter, batch, which is a string identifying the current podcast ingestion process. This is a classic "batch" job run once a day, so we create a unique identifier to help track this job (and you thought using LLMs and AI models was sexy! No, production coding is the same old boring thing it's always been since the 1950s - batch jobs!).
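For illustration, a date-stamped string is one simple way to mint such an identifier (the real system may format its batch ids differently):

// Hypothetical batch id generator; one run per day, so a timestamp is plenty.
function makeBatchId(when: Date = new Date()): string {
  const stamp = when.toISOString().replace(/[-:]/g, '').replace(/\..+$/, '')
  return 'batch-' + stamp // e.g. 'batch-20240322T081500'
}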
The table pdm/rss is special - it is a folder in an S3 bucket where we dump the RSS files. It is not special within our code, however, and looks just like any other database table. We do specify the data entity id directly, as this will become the file name in the S3 bucket.
As we discussed in a previous post, using the same abstraction for different persistence mechanisms makes our code much cleaner and easier to refactor. Changing cloud provider, or even just deciding to use an actual database table in future, won't require any changes to this code. But more importantly, we can unit test our business logic locally without even setting up fake S3 docker containers or other nonsense.
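For example, a local unit test can exercise the same pdm/rss entity code against Seneca's default in-memory store. A rough Jest-style sketch (not a test from the actual repository):

import Seneca from 'seneca'

test('saves the raw RSS', async () => {
  const seneca = Seneca()
    .test()
    .use('promisify')
    .use('entity') // default in-memory store; production wires up an S3-backed store instead
  await new Promise((resolve) => seneca.ready(resolve))

  await seneca.entity('pdm/rss').save$({ id: 'rss01/example/p01-b01.rss', rss: '<rss/>' })

  const saved = await seneca.entity('pdm/rss').load$('rss01/example/p01-b01.rss')
  expect(saved.rss).toEqual('<rss/>')
})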
We store the RSS because we will need it for debugging, and we might want to use it in other ways later. Also, it is good to have a record of the state of a given podcast RSS at a given date and time so we can track changes.
We get the individual episode details from the items property of the RSS, which we parse using the rss-parser package to handle RSS variants. We loop over each episode and emit a process:episode message for each one.
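For reference, the rss-parser side of this is small; conceptually the parsing step looks something like the sketch below, although the real seneca.shared.getRSS helper wraps it with debug logging and error handling:

import Parser from 'rss-parser'

// Minimal sketch of fetching and parsing a feed with rss-parser.
async function fetchEpisodes(feed: string) {
  const parser = new Parser()
  const rss = await parser.parseURL(feed) // downloads and parses the feed
  // Each item typically carries title, link, pubDate, guid and a description.
  return rss.items
}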
We're not bothering to check for changes to episodes here. That logic should live in process:episode, as that message "understands" episodes.
We also include the episode data in each message. This is a calculated risk. In general, you should keep your messages small, and reference data using an id. But in this case, we can safely assume the RSS content is "reasonable". If the content is too large, we'll just let this message fail and then think about whether we even want to solve the problem later.
let episodes = rss.items
out.episodes = episodes.length

// In this excerpt episodeStart and episodeEnd simply cover the full episode list.
let episodeStart = 0
let episodeEnd = episodes.length

for (let epI = episodeStart; epI < episodeEnd; epI++) {
  let episode = episodes[epI]
  await handleEpisode(episode, epI)
}

async function handleEpisode(episode: any, epI: number) {
  await seneca.post('aim:ingest,process:episode', {
    episode,
    podcast_id,
    batch,
  })
}

out.ok = true
return out
When deployed, the process:episode message is triggered asynchronously, but we still wait for the message queue to accept it (hence the await). Locally it is just a normal message and we wait for the actual business logic to complete.
Once we've sent all the process:episode messages, we're done. We set ok: true and return a response.
In the next post in this series, we'll look at the processing of each episode, including (finally!) our first usage of an LLM, which we'll use to extract additional information from the episode description.
Other Posts in this Series