Notification

Python module to implement notification workflow

Nested GPT Function calling

Sending notifications is an important feature of our application. However, because the monitoring process is initiated by the user through ChatGPT, the conditions for sending a notification or stopping the monitoring stream are not fixed in advance. We can therefore check these conditions dynamically by using nested GPT Function Calling (FC).

In nested GPT FC, the notification process starts an ongoing loop that checks the condition for sending a notification. The condition is not hard-coded; it is decided by a sub GPT agent on each iteration. This sub GPT FC process takes in the previous messages, which contain the description of the notification task. The sub GPT is also given an additional tool for sending notifications, which it calls when the described condition is satisfied in the current iteration.
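
The control flow described above can be sketched without any API calls. In the minimal sketch below, fake_sub_agent is a hypothetical stand-in for the sub GPT call, and the tools are plain callables in a dict rather than function schemas. None of these names come from llmcam; they only illustrate that the loop itself contains no condition and that the agent decides when to call the notification tool.

```python
import random

def run_monitoring_loop(sub_agent, demo_tools, max_iterations=5):
    """Run the sub-agent once per iteration; the agent, not this loop, decides what to do."""
    for _ in range(max_iterations):
        sub_agent(demo_tools)

def make_demo_tools(seed=0):
    """Build a tool table with a seeded generator and an in-memory notifier (both hypothetical)."""
    rng = random.Random(seed)
    sent = []
    demo_tools = {
        "random_generator": lambda: rng.randint(1, 100),
        "send_notification": sent.append,
    }
    return demo_tools, sent

def fake_sub_agent(demo_tools):
    """Stand-in for the sub GPT: a real agent would infer this rule from the conversation."""
    value = demo_tools["random_generator"]()
    if value > 50:
        demo_tools["send_notification"](f"Generated {value}, which is higher than 50.")

demo_tools, sent = make_demo_tools()
run_monitoring_loop(fake_sub_agent, demo_tools, max_iterations=20)
print(len(sent), "notifications sent")
```

In the real workflow the rule inside fake_sub_agent does not exist in code at all: it is stated by the user in the conversation and interpreted by the sub GPT.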

An example scenario is presented below. Suppose we have a function (tool) that generates random integers between 1 and 100, and we want to be notified every time it generates a number higher than 50.

import random
from llmcam.core.fc import *
from llmcam.core.fn_to_schema import function_schema

def random_generator():
    """Generate a random number between 1 and 100"""
    return random.randint(1, 100)

tools = [function_schema(random_generator)]

Define a tool for sending notifications. In this simplified context, the function only appends the notification message to an existing list named notifications.

notifications = []
def send_notification(msg: str):
    """Send a notification"""
    notifications.append(msg)
    return notifications

Test function for starting the monitoring stream:

import time

def start_notification_stream(
    messages: list  # Previous conversation with the user
):
    """Start a notification stream"""
    subtools = list(tools)  # copy so the shared tool list is not mutated
    subtools.append(function_schema(send_notification))

    for _ in range(5):  # Only loop up to 5 times for the demo
        complete(messages, tools=subtools)
        time.sleep(5)

start_notification_stream(
    messages = form_msgs([
        ('user', 'Can you notify me every time you generate a number higher than 50? Stop after 10 notifications.')
    ])
)
notifications
['Generated number is 61, which is higher than 50.',
 'Generated number is 94, which is higher than 50.',
 'Generated number is 86, which is higher than 50.',
 'Generated number is 59, which is higher than 50.',
 'Generated number is 100, which is higher than 50.',
 'Generated number is 58, which is higher than 50.',
 'Generated number is 88, which is higher than 50.',
 'Generated number is 82, which is higher than 50.',
 'Generated number is 73, which is higher than 50.',
 'Generated number is 84, which is higher than 50.']

Modularized notification workflow with separated StreamThread

The function in the demo above is not yet compatible with our current GPT FC framework. One way to modularize it and add parallel execution is to use Python's threading library: we can define a custom Thread class to run the monitoring process.


StreamThread

 StreamThread (thread_id:int, tools:list, messages:list)

A class to run a notification stream in a separate thread

Parameter  Type  Details
thread_id  int   The thread ID
tools      list  List of tools for sub GPT
messages   list  Previous conversation with the user
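
To illustrate the threading approach, here is a minimal sketch of a Thread subclass that runs a monitoring loop in the background. It is not the actual StreamThread implementation: the class name DemoStreamThread, the step callable (standing in for the sub GPT call with tools and messages), and the max_iterations and interval parameters are all assumptions made for this example.

```python
import threading

class DemoStreamThread(threading.Thread):
    """Hypothetical stand-in for StreamThread: runs `step` until stopped or exhausted."""

    def __init__(self, thread_id, step, max_iterations=5, interval=0.01):
        super().__init__(daemon=True)
        self.thread_id = thread_id
        self.step = step                      # callable run once per loop iteration
        self.max_iterations = max_iterations
        self.interval = interval              # seconds to wait between iterations
        self._stop_event = threading.Event()

    def run(self):
        for _ in range(self.max_iterations):
            if self._stop_event.is_set():
                break
            self.step()
            # wait() returns early if stop() is called during the pause
            self._stop_event.wait(self.interval)

    def stop(self):
        """Ask the loop to exit before its next iteration."""
        self._stop_event.set()

calls = []
thread = DemoStreamThread(thread_id=1, step=lambda: calls.append("tick"))
thread.start()
thread.join(timeout=2)
print(len(calls), thread.is_alive())
```

Using an Event instead of time.sleep lets the loop react to a stop request immediately rather than after the full pause.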

Apart from a function to send notifications, we may also need a function to halt the notification stream. Its implementation can depend heavily on the context of use (e.g., scripts, web services, …). Therefore, we define a pair of functions for starting and stopping a stream thread; the defaults below can be used directly in a Python script:


default_stream_stopper

 default_stream_stopper ()

Default function to stop the notifications stream


default_stream_starter

 default_stream_starter (tools, messages)

Default function to start the notifications stream

As with the store utilities, the function for starting a notification stream depends on several context-dependent variables (tools, stream starter and stopper, notification sender, …). Therefore, we define a _core function that accepts them as arguments rather than a hard-coded stream function.


notification_stream_core

 notification_stream_core (tools:list, messages:list,
                           stream_starter:Optional[Callable]=None,
                           send_notification:Optional[Callable]=None,
                           stream_stopper:Optional[Callable]=None,
                           send_notification_schema:Optional[dict]=None,
                           stream_stopper_schema:Optional[dict]=None)

Core function to start and stop the notifications stream

Parameter                 Type      Default  Details
tools                     list               Tools to use
messages                  list               Previous conversation with the user
stream_starter            Optional  None     Function to start the stream
send_notification         Optional  None     Function to send the notification
stream_stopper            Optional  None     Function to stop the stream
send_notification_schema  Optional  None     Schema for the send_notification function
stream_stopper_schema     Optional  None     Schema for the stream_stopper function
Returns                   str
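
The idea of passing the context-dependent pieces in as arguments can be sketched as follows. This is only a guess at the general shape, not the body of notification_stream_core: demo_stream_core, minimal_schema, and record_starter are hypothetical names, the schema arguments are omitted for brevity, and the returned status strings are invented for the example.

```python
from typing import Callable, Optional

def minimal_schema(fn: Callable) -> dict:
    """Hypothetical schema builder: only records the name and docstring."""
    return {"type": "function",
            "function": {"name": fn.__name__, "description": fn.__doc__ or ""}}

def demo_stream_core(
    tools: list,
    messages: list,
    stream_starter: Optional[Callable] = None,
    send_notification: Optional[Callable] = None,
    stream_stopper: Optional[Callable] = None,
) -> str:
    """Assemble the sub-agent tool list and hand it to the injected starter."""
    if stream_starter is None:
        return "No stream starter provided"
    subtools = list(tools)  # copy so the caller's tool list is not mutated
    for fn in (send_notification, stream_stopper):
        if fn is not None:
            subtools.append(minimal_schema(fn))
    stream_starter(subtools, messages)
    return "Notification stream started"

started = []

def record_starter(subtools, messages):
    """Test double that records which tools the stream would have received."""
    started.append([t["function"]["name"] for t in subtools])

def send_notification(msg: str):
    """Send a notification"""

status = demo_stream_core(
    [],
    [{"role": "user", "content": "Can you notify me if ..."}],
    stream_starter=record_starter,
    send_notification=send_notification,
)
print(status, started)
```

Because the starter is injected, the same core function can be exercised with a recording double in a test and with a thread-based starter in a script.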

A limitation of the current llmcam.core.fn_to_schema is that its type handling does not cover complex nested types. In this case, the list of messages must follow the specific format expected by the OpenAI API:

[
    {
        "role": "user",
        "content": "Can you notify me if ..."
    }
]

Hence, the current workaround for this issue is to manually adjust the schema of the start_notification_stream function:


process_notification_schema

 process_notification_schema (start_notifications_stream:Callable)

Process the notification schema

Type Details
start_notifications_stream Callable Function to start the notifications stream
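
A manual schema adjustment of this kind might look like the sketch below. It assumes the auto-generated schema follows the OpenAI tools format and describes messages only as a plain array. The helper patch_messages_schema and the hand-written auto_schema are hypothetical and do not reproduce what process_notification_schema or function_schema actually produce.

```python
import copy

def patch_messages_schema(schema: dict, param: str = "messages") -> dict:
    """Return a copy of `schema` whose `param` is an array of role/content objects."""
    patched = copy.deepcopy(schema)
    properties = patched["function"]["parameters"]["properties"]
    properties[param] = {
        "type": "array",
        # Keep the original description if one was generated
        "description": properties.get(param, {}).get("description", ""),
        "items": {
            "type": "object",
            "properties": {
                "role": {"type": "string", "enum": ["system", "user", "assistant"]},
                "content": {"type": "string"},
            },
            "required": ["role", "content"],
        },
    }
    return patched

# Hand-written example of a schema with an under-specified `messages` parameter
auto_schema = {
    "type": "function",
    "function": {
        "name": "start_notification_stream",
        "description": "Start a notification stream",
        "parameters": {
            "type": "object",
            "properties": {
                "messages": {"type": "array",
                             "description": "Previous conversation with the user"},
            },
            "required": ["messages"],
        },
    },
}

patched = patch_messages_schema(auto_schema)
print(patched["function"]["parameters"]["properties"]["messages"]["items"]["required"])
```

Spelling out the item structure tells the main GPT exactly what to pass when it delegates the conversation to the notification stream.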

Simulated GPT workflow

Test the integration with our current GPT framework. This process follows the demo above.

import random
from llmcam.core.fc import *
from llmcam.core.fn_to_schema import function_schema

def random_generator():
    """Generate a random number between 1 and 100"""
    return random.randint(1, 100)

tools = [function_schema(random_generator)]
notifications = []

def send_notification(msg: str):
    """Send a notification"""
    notifications.append(msg)
    return notifications

def start_notification_stream(
    messages: list  # Previous conversation with the user
):
    """Start a notification stream"""
    return notification_stream_core(
        tools,
        messages,
        stream_starter=default_stream_starter,
        send_notification=send_notification,
        stream_stopper=default_stream_stopper
    )

tools.append(process_notification_schema(start_notification_stream))
messages = form_msgs([
    ('system', 'You are a helpful system administrator. Use the supplied tools to assist the user. \
If asked to monitor and notify, do not resolve in main thread but defer to sub GPT in notification stream instead.'),
    ('user', 'Can you notify me every time you generate a number higher than 50? Stop after 10 notifications.'),
])
complete(messages, tools=tools)
print_msgs(messages)
>> System:
You are a helpful system administrator. Use the supplied tools to assist the user. If asked to
monitor and notify, do not resolve in main thread but defer to sub GPT in notification stream
instead.
>> User:
Can you notify me every time you generate a number higher than 50? Stop after 10 notifications.
>> Assistant:
Sure, I'll begin generating random numbers and will notify you each time a number higher than 50 is
generated, up to a total of 10 notifications. The process is underway.

Checking the notifications:

for noti in notifications:
    print(noti)
Generated a number higher than 50: 53
Generated a number higher than 50: 63
Generated a number higher than 50: 97
Generated a number higher than 50: 82
Generated a number higher than 50: 65
Generated a number higher than 50: 82
Generated a number higher than 50: 74
Generated a number higher than 50: 98
Generated a number higher than 50: 82
Generated a number higher than 50: 86
len(notifications)
10

Checking the global stream thread:

stream_thread.is_alive()
False