Pipelines

Pipeline Overview

The concept of a pipeline is well established in data processing, preparation and transformation. Within Kodexa, our goal is to build small pipelines that can ultimately be combined to solve larger and more complex problems.

When we define a pipeline, we follow a very simple model:

┌─────────────────────┐
│                     │
│      Connector      │
│                     │
└──────────┬──────────┘
           │
┌──────────▼──────────┐
│                     │
│     Step (1..n)     │
│                     │
└──────────┬──────────┘
           │
┌──────────▼──────────┐
│                     │
│        Sink         │
│                     │
└─────────────────────┘

We provide a connector (source), one or more steps, and then a sink.

Since pipelines are simple, we want to be able to build them in two ways:

Code

We should be able to simply define a pipeline by wiring together a set of Python classes.

This should allow us both to implement new steps, sources and sinks, and to re-use “out of the box” ones.

An example of how we might want to build a pipeline is as follows:

# Create a connector that reads a single PDF file
file_source = FileSource("/tmp/my.pdf")

# Wire up the pipeline: parse the PDF, lay out the text, then render to HTML
pipeline = Pipeline(file_source)
pipeline.add_step(PdfMinerParserStep())
pipeline.add_step(TextLayoutStep())
pipeline.set_sink(HtmlRender())
pipeline.run()
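
We should also be able to implement our own steps. Below is a minimal sketch of what a custom step might look like, assuming a step is simply a class whose process method takes a document and returns it; the class name and metadata field are hypothetical, not out-of-the-box components:

class TagSourceStep:
    """A hypothetical custom step that records where a document came from."""

    def process(self, document):
        # A step receives a document and returns the (possibly modified) document
        document.metadata.source_tag = "pdf-import"
        return document

Such a step can then be added to a pipeline exactly like the built-in ones, e.g. pipeline.add_step(TagSourceStep()).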

Function Steps

We also support the concept of function steps. These are very useful when you want to work with a document but don't want to go to the trouble of creating a Step class to handle it, for example:

# Source store containing one document; the sink store starts empty
document_store = JsonDocumentStore()
document_store.add(create_document())
new_document_store = JsonDocumentStore()

# A function step: takes a document, modifies it, and returns it
def my_function(doc):
    doc.metadata.cheese = "fishstick"
    return doc

assert new_document_store.count() == 0

pipeline = Pipeline(document_store)
pipeline.add_step(my_function)
pipeline.set_sink(new_document_store)
stats = pipeline.run()

# The single document flowed through the step into the sink
assert stats['documents'] == 1
assert new_document_store.count() == 1
assert new_document_store.get_document(0).metadata.cheese == 'fishstick'

Pipeline Context

Pipeline context is created when you create a pipeline. It provides access to information about the running pipeline (such as the transaction_id), to its statistics, and to the document/data stores that have been added to it. The context can be made available to steps and functions so they can interact with it.

Not all steps need access to the context; however, any step (class or function) can receive it by simply declaring a second parameter on the process method (or on the function itself, for a function step).

For example:

# As before, one document flows from the source store to the sink
document_store = JsonDocumentStore()
document_store.add(create_document())
new_document_store = JsonDocumentStore()

# Declaring a second parameter gives this function step access to the context
def my_function(doc, context):
    doc.metadata.cheese = context.transaction_id
    return doc

assert new_document_store.count() == 0

pipeline = Pipeline(document_store)
pipeline.add_step(my_function)
pipeline.set_sink(new_document_store)
stats = pipeline.run()
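
The same approach works for class-based steps: declaring a second parameter on the process method makes the context available. A minimal sketch, with a hypothetical class name and metadata field:

class StampTransactionStep:
    """A hypothetical step that stamps each document with the pipeline's transaction ID."""

    def process(self, document, context):
        # Because process declares a second parameter, the pipeline
        # passes the context in alongside the document
        document.metadata.transaction_id = context.transaction_id
        return document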