Tutorial for Generative AI

Deploy a generative AI model

To register a model hosted by a third-party provider that is compatible with OpenAI’s API or SDK, use the following syntax:

PROVIDER_APIKEY = 'your-api-key'
model_url = (f'openai+https://{PROVIDER_APIKEY}@openrouter.ai/api/v1'
              ';model=google/gemini-2.0-flash-lite-preview-02-05:free')
om.models.put(model_url, 'llm')
=>
<Metadata: Metadata(name=llm,bucket=omegaml,prefix=models/,kind=genai.text,created=2025-03-21 19:25:29.325000)>

This model is now available for completions:

model = om.models.get('llm')
model.complete('hello, who are you?')
=>
{'role': 'assistant',
 'content': 'Hello! I am a large language model, trained by Google.',
 'conversation_id': 'e99121a1050a463abaf2c913e99ff5ba'}

We can also serve the model for access by third-party applications. This starts a REST API server that third-party applications can call.

$ om runtime serve
$ curl -X PUT http://localhost:8000/api/v1/model/llm/complete -H 'Content-Type: application/json' -d '{ "prompt": "hello"}'
{"model": "llm", "result": {"role": "assistant", "content": "Hello there! How can I help you today? \ud83d\ude0a",
 "conversation_id": "60056750ec39433f9533e7cbf60c65cc"}, "resource_uri": "llm"}

Document storage

In support of Retrieval Augmented Generation (RAG), omega-ml provides a built-in document storage, using document embeddings and PostgreSQL as the vector DB (with the pgvector extension). To use this we first need an embedding model.

apikey = 'apikey'
om.models.put(f'openai+https://{apikey}@api.jina.ai/v1/;model=jina-embeddings-v3',  'jina', replace=True)
=>
<Metadata: Metadata(name=jina,bucket=omegaml,prefix=models/,kind=genai.text,created=2025-04-03 14:33:09.643173)>

To store actual documents, we can create a document store:

om.datasets.put('pgvector://postgres:test@localhost:5432/postgres', 'documents',
                embedding_model='jina', model_store=om.models, replace=True)
=>
<Metadata: Metadata(name=documents,bucket=omegaml,prefix=data/,kind=pgvector.conx,created=2025-04-03 14:34:23.606619)>

Now we can insert documents into the store. The documents are automatically chunked and embeddings created using the embedding model of the store.

from pathlib import Path

for fn in Path('/path/to/documents').glob('*.pdf'):
    om.datasets.put(fn, 'documents', model_store=om.models)

Once the documents are stored, we can query the document store. The results are returned as a list of documents, sorted by relevance. The first document has the highest relevance score.

results = om.datasets.get('documents', query='hello world', top=3)
=>
[Document(id=1, text='Hello world! This is a test document.', score=0.9),
 Document(id=2, text='Another test document.', score=0.8),
 Document(id=3, text='Yet another test document.', score=0.7)]

Building a RAG pipeline

To build a RAG pipeline, we attach a document store to the model:

om.models.put(f'openai+https://{apikey}@openrouter.ai/api/v1;model=google/gemini-2.0-flash-exp:free',
              'llm', documents='documents', replace=True)

When we ask for a completion, we can reference the context in the prompt as {context}. The context is automatically retrieved from the document store, using the prompt as the query, and the top-ranked document is used as the context.

model = om.models.get('llm', data_store=om.datasets)
model.complete('what is the sum of the invoice? Just say SUM=<sum>. context: {context}')
=>
{'role': 'assistant',
 'content': 'SUM=15.00',
 'conversation_id': '024f8b43dcb74211a836a7042d067c8f'}

Adding tools

Tools are functions that can be called by a model. They are used to extend the capabilities of the model beyond text generation. For example, we can add a tool that calculates the sum of a list of numbers.

def sum_numbers(numbers):
    return sum(int(v) for v in numbers.split(','))  # the model passes the numbers as a string

om.models.put(sum_numbers, 'tools/sum_numbers')

om.models.put(f'openai+https://{apikey}@openrouter.ai/api/v1;model=google/gemini-2.0-flash-exp:free',
              'llm', documents='documents', tools=['sum_numbers'], replace=True)

model = om.models.get('llm', data_store=om.datasets)
model.complete('What is the sum of 7, 9, 24?')
=>
{'role': 'assistant',
 'content': 'The sum of 7, 9, and 24 is 40.\n',
 'conversation_id': '33738231f39047dcb886500143cebf8a',
 'intermediate_results': {'tool_calls': [{'id': 'tool_0_sum_numbers',
    'function': {'arguments': '{"numbers":"7,9,24"}', 'name': 'sum_numbers'},
    'type': 'function',
    'index': 0}],
  'tool_prompts': [{'role': 'tool',
    'tool_call_id': 'tool_0_sum_numbers',
    'content': '40'}],
  'tool_results': [{'role': 'assistant',
    'content': 40,
    'conversation_id': '33738231f39047dcb886500143cebf8a'}]}}

Adding custom pipeline actions

In omega-ml, a generative model is in effect a pipeline of multiple steps. At each step, the model can call custom code to adjust the processing. For example, we can add custom code that checks the user’s prompt before processing, or add guardrails that check the model’s output and, if necessary, modify it or return a different response.

A pipeline is a specific type of virtual function:

from omegaml.backends.genai.models import virtual_genai

@virtual_genai
def pipeline(*args, method=None, **kwargs):
    print(f"calling method={method}")
    print(f"   args={args}, kwargs={kwargs}")

om.models.put(pipeline, 'pipeline')

Add the pipeline to the model:

model_url = f'openai+https://{apikey}@openrouter.ai/api/v1;model=google/gemini-2.0-flash-exp:free'
meta = om.models.put(model_url, 'llm', pipeline='pipeline')

When we call the model, it will call the pipeline function for each step. For every step we can add custom code to process the input and output of the model.

model = om.models.get('llm', data_store=om.datasets)
model.complete('hello')
=>
calling method=template
    args=(), kwargs={'data': None, 'meta': None, 'store': None, 'tracking': None, 'prompt_message': {'role': 'user', 'content': 'hello', 'conversation_id': 'aae49e95404f4a4f99245e9237256017'}, 'messages': [{'role': 'system', 'content': 'You are a helpful assistant.', 'conversation_id': 'aae49e95404f4a4f99245e9237256017'}], 'template': 'You are a helpful assistant.', 'conversation_id': 'aae49e95404f4a4f99245e9237256017'}
calling method=prepare
    args=(), kwargs={'data': None, 'meta': None, 'store': None, 'tracking': None, 'prompt_message': {'role': 'user', 'content': 'hello', 'conversation_id': 'aae49e95404f4a4f99245e9237256017'}, 'messages': [{'role': 'system', 'content': 'You are a helpful assistant.', 'conversation_id': 'aae49e95404f4a4f99245e9237256017'}], 'template': 'You are a helpful assistant.', 'conversation_id': 'aae49e95404f4a4f99245e9237256017'}
calling method=process
    args=(), kwargs={'data': None, 'meta': None, 'store': None, 'tracking': None, 'response_message': {'role': 'assistant', 'content': 'Hello! How can I help you today? 😊', 'conversation_id': 'aae49e95404f4a4f99245e9237256017'}, 'prompt_message': {'role': 'user', 'content': 'hello', 'conversation_id': 'aae49e95404f4a4f99245e9237256017'}, 'messages': [{'role': 'system', 'content': 'You are a helpful assistant.', 'conversation_id': 'aae49e95404f4a4f99245e9237256017'}, {'role': 'user', 'content': 'hello', 'conversation_id': 'aae49e95404f4a4f99245e9237256017'}], 'template': 'You are a helpful assistant.', 'conversation_id': 'aae49e95404f4a4f99245e9237256017'}
{'role': 'assistant',
 'content': 'Hello! How can I help you today? 😊',
 'conversation_id': 'aae49e95404f4a4f99245e9237256017'}

The steps of the pipeline are:

  • template - the template is used to generate the prompt for the model. The template is
    generated from the model’s metadata. This should return the template to use.

  • prepare - the prepare step is used to prepare the input messages for the model. The input
    messages are generated from the template and the prompt message. This should return the
    list of messages to use.

  • process - the process step is used to process the output of the model. This should return
    the final output of the model.

  • toolcall - the toolcall step is used to process the output of a tool. The output can be
    modified by the pipeline. This should return the messages to be sent back to the model.
    Semantically this is the same as the prepare step.

  • toolresult - the toolresult step is used to process the model’s response to a tool’s
    result. This should return the output of the model. Semantically this is the same as the
    process step.

Note

There are currently no dedicated steps for the RAG part of the pipeline. However, you can use the prepare step to process the input messages and modify the context, or add a custom context, as sketched below.
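
For example, a minimal sketch of a prepare step that adds a custom context as an additional system message; the extra context string is a stand-in for your own retrieval or lookup logic, not a built-in feature:

from omegaml.backends.genai.models import virtual_genai

@virtual_genai
def context_pipeline(*args, method=None, messages=None, **kwargs):
    # only act on the prepare step; returning None leaves all other steps unchanged
    if method == 'prepare':
        extra_context = 'Internal note: all invoices are in EUR.'  # stand-in for your own lookup
        messages = list(messages or [])
        messages.append({'role': 'system', 'content': extra_context})
        return messages  # the returned list of messages is sent to the model
    return None

om.models.put(context_pipeline, 'pipeline', replace=True)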

The function signature is the same for all steps:

def pipeline(*args, method=None, template=None, prompt_message=None,
             messages=None, response_message=None, conversation_id=None, **kwargs):
    """
    Args:
        *args: positional arguments
        method (str): the name of the pipeline step
        template (str): the template to use
        prompt_message (dict): the prompt message, in the format of the model provider,
          e.g. {'role': 'user', 'content': 'hello world'}
        messages (list): the list of messages, in the format of the model provider,
          e.g. [{'role': 'user', 'content': 'hello world'}, ...]
        response_message (dict): the response message, in the format of the model provider,
          e.g. {'role': 'assistant', 'content': 'hello world'}
        conversation_id (str): the conversation id
        **kwargs: keyword arguments

    Returns:
        * None: to continue the pipeline without changes
        * for method=template: the template string to use
        * for method=prepare: the list of messages to use, each message must be of the
            format {'role': 'user', 'content': 'hello world'}
        * for method=process: the response message to use as a dict of format
            {'role': 'assistant', 'content': 'hello world'}
        * for method=toolcall: the list of messages to use, each message must be of the
            format {'role': 'tool', 'content': 'hello world'}
        * for method=toolresult: the response message to use as a dict of format
            {'role': 'assistant', 'content': 'hello world'}
    """

Serving a model

To serve a model, we start the integrated REST API server and call the model using curl.

$ om runtime serve
$ curl -X PUT http://localhost:8000/api/v1/model/llm/complete -H 'Content-Type: application/json' -d '{ "prompt": "hello again!"}'

Tracking model interactions

To track a model’s inputs and outputs, we can use the track() method. This will automatically capture all calls to the model via omega-ml’s runtime or via the REST API.

exp = om.runtime.experiment('myexp')
exp.track('llm')

This tracks all inputs and outputs of the model. Thanks to omega-ml’s distributed runtime architecture, tracking works the same locally as in a scaled-up, distributed environment such as Kubernetes.

To access the tracking data, we can query the experiment:

exp = om.runtime.experiment('myexp')
exp.data()
=>
<DataFrame>

Using a third-party LLM framework

In some cases the omega-ml RAG pipeline or document storage may not be sufficient for your needs. In this case you can use any third-party framework, such as LangChain, Haystack or LlamaIndex.

For this purpose, implement a custom model as a virtualobj:

# define your custom pipeline
from omegaml.backends.genai.models import virtual_genai

@virtual_genai
def mypipeline(*args, method=None, **kwargs):
    print(f"{method} args={args} kwargs={kwargs}")  # trace message for testing
    import langchain
    results = ...
    return results

# store the custom pipeline
om.models.put(mypipeline, 'mypipeline')
model = om.models.get('mypipeline')

This model can now be called like any other generative model; it supports the chat(), complete() and embed() methods. In this case you provide the full pipeline code yourself. Please refer to the documentation of the respective framework for details.

The function should return the result as the final message to be sent back to the client, in the response format of your model provider, typically the OpenAI format.

model.chat("hello world")
model.complete("hello world")
model.embed("hello world")
=>
chat args=('hello world',) kwargs={}
complete args=('hello world',) kwargs={}
embed args=('hello world',) kwargs={}
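
As a rough sketch (not a definitive implementation), such a pipeline dispatches on the method and returns the final message in the provider’s (OpenAI) response format; the generation and embedding calls below are placeholders for your framework of choice:

from omegaml.backends.genai.models import virtual_genai

@virtual_genai
def mypipeline(*args, method=None, **kwargs):
    prompt = args[0] if args else ''
    if method in ('chat', 'complete'):
        # call your framework of choice here (LangChain, Haystack, LlamaIndex, ...)
        answer = f'echo: {prompt}'  # placeholder for the framework's generated answer
        # return the final message in OpenAI response format
        return {'role': 'assistant', 'content': answer}
    if method == 'embed':
        # placeholder embedding vector; replace with your framework's embedding call
        return [0.0] * 8

om.models.put(mypipeline, 'mypipeline', replace=True)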