Run pipeline workflows

davidmezzetti

David Mezzetti

Posted on March 27, 2021

Run pipeline workflows

txtai has a growing list of models available through it's pipeline framework. Pipelines wrap a machine learning model and transform data. Currently, pipelines can wrap Hugging Face models, Hugging Face pipelines or PyTorch models (support for TensorFlow is in the backlog).

The following is a list of the currently implemented pipelines.

  • Questions - Answer questions using a text context
  • Labels - Apply labels to text using a zero-shot classification model. Also supports similarity comparisions.
  • Summary - Abstractive text summarization
  • Textractor - Extract text from documents
  • Transcription - Transcribe audio to text
  • Translation - Machine translation

Pipelines are great and make using a variety of machine learning models easier. But what if we want to glue the results of different pipelines together? For example, extract text, summarize it, translate it to English and load it into an Embedding index. That would require code to join those operations together in an efficient manner.

Enter workflows. Workflows are a simple yet powerful construct that takes a callable and returns elements. Workflows don't know they are working with pipelines but enable efficient processing of pipeline data. Workflows are streaming by nature and work on data in batches, allowing large volumes of data to be processed efficiently.

Install dependencies

Install txtai and all dependencies. Since this article is using optional pipelines/workflows, we need to install the pipeline and workflow extras package.

pip install txtai[pipeline,workflow] sacremoses

# Get test data
wget -N https://github.com/neuml/txtai/releases/download/v2.0.0/tests.tar.gz
tar -xvzf tests.tar.gz
Enter fullscreen mode Exit fullscreen mode

Create a series of pipelines to use in this notebook

from txtai.pipeline import Summary, Textractor, Transcription, Translation

# Summary instance
summary = Summary()

# Text extraction
textractor = Textractor()

# Transcription instance
transcribe = Transcription("facebook/wav2vec2-large-960h")

# Create a translation instance
translate = Translation()
Enter fullscreen mode Exit fullscreen mode

Basic workflow

The following shows a basic workflow in action!

from txtai.workflow import Workflow, Task

# Workflow that translate text to French
workflow = Workflow([Task(lambda x: translate(x, "fr"))])

# Data to run through the pipeline
data = ["The sky is blue", "Forest through the trees"]

# Workflows are generators for efficiency, read results to list for display
list(workflow(data))
Enter fullscreen mode Exit fullscreen mode
['Le ciel est bleu', 'Forêt à travers les arbres']
Enter fullscreen mode Exit fullscreen mode

This isn't too different from previous pipeline examples. The only difference is data is feed through the workflow. In this example, the workflow calls the translation pipeline and translates text to French. Let's look at a more complex example.

Multistep workflow

The following workflow reads a series of audio files, transcribes them to text and translates the text to French. This is based on the classic txtai example from Introducing txtai.

Workflows take two main parameters. The action to execute which is a callable and a pattern to filter data with. Data that is accepted by the filter will be processed, otherwise it will be passed through to the next task.

from txtai.workflow import FileTask

tasks = [
    FileTask(transcribe, r"\.wav$"),
    Task(lambda x: translate(x, "fr"))
]

# List of files to process
data = [
  "txtai/US_tops_5_million.wav",
  "txtai/Canadas_last_fully.wav",
  "txtai/Beijing_mobilises.wav",
  "txtai/The_National_Park.wav",
  "txtai/Maine_man_wins_1_mil.wav",
  "txtai/Make_huge_profits.wav"
]

# Workflow that translate text to French
workflow = Workflow(tasks)

# Run workflow
list(workflow(data))
Enter fullscreen mode Exit fullscreen mode
["Les cas de virus U sont en tête d'un million",
 "La dernière plate-forme de glace entièrement intacte du Canada s'est soudainement effondrée en formant un berge de glace de taille manhatten",
 "Bagage mobilise les embarcations d'invasion le long des côtes à mesure que les tensions tiwaniennes s'intensifient",
 "Le service des parcs nationaux met en garde contre le sacrifice d'amis plus lents dans une attaque nue",
 "L'homme principal gagne du billet de loterie",
 "Faire d'énormes profits sans travailler faire jusqu'à cent mille dollars par jour"]
Enter fullscreen mode Exit fullscreen mode

Complex workflow

Let's put this all together into a full-fledged workflow to build an embeddings index. This workflow will work with both documents and audio files. Documents will have text extracted and summarized. Audio files will be transcribed. Both results will be joined, translated into French and loaded into an Embeddings index.

from txtai.embeddings import Embeddings, Documents
from txtai.workflow import FileTask, WorkflowTask

# Embeddings index
embeddings = Embeddings({"path": "sentence-transformers/paraphrase-multilingual-mpnet-base-v2", "content": True})
documents = Documents()

# List of files to process
files = [
  "txtai/article.pdf",
  "txtai/US_tops_5_million.wav",
  "txtai/Canadas_last_fully.wav",
  "txtai/Beijing_mobilises.wav",
  "txtai/The_National_Park.wav",
  "txtai/Maine_man_wins_1_mil.wav",
  "txtai/Make_huge_profits.wav"
]

data = [(x, element, None) for x, element in enumerate(files)]

# Workflow that extracts text and builds a summary
articles = Workflow([
    FileTask(textractor),
    Task(summary)
])

# Define workflow tasks. Workflows can also be tasks!
tasks = [
    WorkflowTask(articles, r".\.pdf$"),
    FileTask(transcribe, r"\.wav$"),
    Task(lambda x: translate(x, "fr")),
    Task(documents.add, unpack=False)
]

# Workflow that translate text to French
workflow = Workflow(tasks)

# Run workflow and show results to be indexed
for x in workflow(data):
  print(x)

# Build the embeddings index
embeddings.index(documents)

# Cleanup temporary storage
documents.close()
Enter fullscreen mode Exit fullscreen mode
(0, "Txtai, un moteur de recherche alimenté par l'IA construit sur Transformers, permet la recherche basée sur la compréhension du langage naturel (NLU) dans n'importe quelle application. Le champ de traitement du langage naturel (NLP) évolue rapidement avec un certain nombre de nouveaux développements. Le moteur de recherche open-source est open source et disponible sur GitHub.", None)
(1, "Les cas de virus U sont en tête d'un million", None)
(2, "La dernière plate-forme de glace entièrement intacte du Canada s'est soudainement effondrée en formant un berge de glace de taille manhatten", None)
(3, "Bagage mobilise les embarcations d'invasion le long des côtes à mesure que les tensions tiwaniennes s'intensifient", None)
(4, "Le service des parcs nationaux met en garde contre le sacrifice d'amis plus lents dans une attaque nue", None)
(5, "L'homme principal gagne du billet de loterie", None)
(6, "Faire d'énormes profits sans travailler faire jusqu'à cent mille dollars par jour", None)
Enter fullscreen mode Exit fullscreen mode

Query for results in French

# Run a search query and show the result.
embeddings.search("changement climatique", 1)[0]
Enter fullscreen mode Exit fullscreen mode
{'id': '2',
 'score': 0.2982647716999054,
 'text': "La dernière plate-forme de glace entièrement intacte du Canada s'est soudainement effondrée en formant un berge de glace de taille manhatten"}
Enter fullscreen mode Exit fullscreen mode
# Run a search query and show the result.
embeddings.search("traitement du langage naturel", 1)[0]
Enter fullscreen mode Exit fullscreen mode
{'id': '0',
 'score': 0.47031939029693604,
 'text': "Txtai, un moteur de recherche alimenté par l'IA construit sur Transformers, permet la recherche basée sur la compréhension du langage naturel (NLU) dans n'importe quelle application. Le champ de traitement du langage naturel (NLP) évolue rapidement avec un certain nombre de nouveaux développements. Le moteur de recherche open-source est open source et disponible sur GitHub."}
Enter fullscreen mode Exit fullscreen mode

Configuration-driven workflow

Workflows can also be defined with YAML and run as an application. Applications can run standalone or as a FastAPI instance. More information can be found here.

workflow = """
writable: true
embeddings:
  path: sentence-transformers/paraphrase-multilingual-mpnet-base-v2
  content: True

# Summarize text
summary:

# Extract text from documents
textractor:

# Transcribe audio to text
transcription:
  path: facebook/wav2vec2-large-960h

# Translate text between languages
translation:

workflow:
  summarize:
    tasks:
      - action: textractor
        task: file
      - summary
  index:
    tasks:
      - action: summarize
        select: '\\.pdf$'
      - action: transcription
        select: '\\.wav$'
        task: file
      - action: translation
        args: ['fr']
      - action: index
"""

# Create and run the workflow
from txtai.app import Application

# Create and run the workflow
app = Application(workflow)
list(app.workflow("index", files))
Enter fullscreen mode Exit fullscreen mode
["Txtai, un moteur de recherche alimenté par l'IA construit sur Transformers, permet la recherche basée sur la compréhension du langage naturel (NLU) dans n'importe quelle application. Le champ de traitement du langage naturel (NLP) évolue rapidement avec un certain nombre de nouveaux développements. Le moteur de recherche open-source est open source et disponible sur GitHub.",
 "Les cas de virus U sont en tête d'un million",
 "La dernière plate-forme de glace entièrement intacte du Canada s'est soudainement effondrée en formant un berge de glace de taille manhatten",
 "Bagage mobilise les embarcations d'invasion le long des côtes à mesure que les tensions tiwaniennes s'intensifient",
 "Le service des parcs nationaux met en garde contre le sacrifice d'amis plus lents dans une attaque nue",
 "L'homme principal gagne du billet de loterie",
 "Faire d'énormes profits sans travailler faire jusqu'à cent mille dollars par jour"]
Enter fullscreen mode Exit fullscreen mode
# Run a search query and show the result.
app.search("changement climatique", 1)[0]
Enter fullscreen mode Exit fullscreen mode
{'id': '2',
 'score': 0.2982647716999054,
 'text': "La dernière plate-forme de glace entièrement intacte du Canada s'est soudainement effondrée en formant un berge de glace de taille manhatten"}
Enter fullscreen mode Exit fullscreen mode
# Run a search query and show the result.
app.search("traitement du langage naturel", 1)[0
Enter fullscreen mode Exit fullscreen mode
{'id': '0',
 'score': 0.47031939029693604,
 'text': "Txtai, un moteur de recherche alimenté par l'IA construit sur Transformers, permet la recherche basée sur la compréhension du langage naturel (NLU) dans n'importe quelle application. Le champ de traitement du langage naturel (NLP) évolue rapidement avec un certain nombre de nouveaux développements. Le moteur de recherche open-source est open source et disponible sur GitHub."}
Enter fullscreen mode Exit fullscreen mode

Wrapping up

Results are good! We can see the power of workflows and how they can join a series of pipelines together in an efficient manner. Workflows can work with any callable, not just pipelines, workflows transform data from one format to another. Workflows are an exciting and promising development for txtai.

💖 💪 🙅 🚩
davidmezzetti
David Mezzetti

Posted on March 27, 2021

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related

Granting autonomy to agents
ai Granting autonomy to agents

November 25, 2024

💡 What's new in txtai 8.0
ai 💡 What's new in txtai 8.0

November 18, 2024

Generative Audio
ai Generative Audio

October 13, 2024

Speech to Speech RAG
ai Speech to Speech RAG

September 27, 2024