Skip to content

Python SDK Reference

The triggerware Python SDK is available on PyPI:

pip install triggerware

The TriggerwareClient is your entry point. Connect over TCP to a local instance.

from triggerware import TriggerwareClient
client = await TriggerwareClient.connect_tcp('localhost', 5221)

After connecting, you can configure defaults on the client that apply to all subsequent queries:

PropertyTypeDescriptionDefault
default_fetch_sizeint | NoneDefault row limit per batch fetchNone (no limit)
default_timeoutfloat | NoneDefault timeout in seconds for query operationsNone (no limit)
default_fol_schemastr | NoneDefault schema for FOL queriesNone
default_sql_schemastr | NoneDefault schema for SQL queriesNone

The simplest way to query is execute_query, which runs a query and returns an async-iterable ResultSet.

results = await client.execute_query("""
SELECT name, email FROM salesforce_contacts
WHERE region = 'US'
""")
async for row in results:
print(row)

By default, query strings are interpreted as SQL. To use first-order-logic (FOL) or a different schema, pass a query object:

from triggerware import SqlQuery, FolQuery
# Explicit SQL with a custom schema
results = await client.execute_query(
SqlQuery("SELECT * FROM users")
)
# FOL query
results = await client.execute_query(
FolQuery("((x) s.t. (users x))")
)

You can limit rows returned or set a timeout on any query execution by passing a QueryRestriction:

from triggerware import QueryRestriction
results = await client.execute_query(
"SELECT * FROM large_table",
restriction=QueryRestriction(row_limit=100, timeout=5.0)
)

To check whether a query is valid without executing it:

from triggerware import InvalidQueryException
try:
await client.validate_query("SELECT * FROM nonexistent_table")
except InvalidQueryException as e:
print(f"Invalid: {e.message}")

ResultSet is returned by all query execution methods. It is an async iterator — use async for to consume rows one at a time, or call pull(n) to fetch a specific number of rows.

Method / PropertyDescription
async for row in result_setIterate through all rows. Fetches additional batches from the server automatically.
await result_set.pull(n)Fetch up to n rows. Returns fewer if the result set is exhausted.
result_set.signatureColumn metadata from the server: a list of {"attribute": str, "type": str} dicts.
result_set.exhaustedTrue if all rows have been retrieved.
results = await client.execute_query("SELECT name, age FROM people")
# Check the column signature
print(results.signature)
# [{"attribute": "NAME", "type": "CASESENSITIVE"}, {"attribute": "AGE", "type": "INTEGER"}]
# Fetch exactly 5 rows
batch = await results.pull(5)
# Iterate through the rest
async for row in results:
print(row)

For parameterized queries that you want to execute multiple times with different inputs, use PreparedQuery. This sends the query to the server once for parsing, then executes it repeatedly with different bound values.

from triggerware import PreparedQuery
# Register the prepared query on the server
pq = await PreparedQuery(client, "SELECT factor FROM inflation WHERE year1 = :y1 AND year2 = :y2").register()
# Bind parameters by name
pq.set_parameter(":y1", 1980)
pq.set_parameter(":y2", 1990)
# Execute and iterate
results = await pq.execute()
async for row in results:
print(row)
# Re-bind and execute again
pq.set_parameter(":y1", 2000)
pq.set_parameter(":y2", 2020)
results = await pq.execute()

SQL queries support named parameters (:name) or positional parameters (?, ?1, ?2). The two styles cannot be mixed in the same query.

MethodDescription
await pq.register()Registers the query on the server. Must be called before execute. Returns self.
pq.set_parameter(position, value)Bind a value to a named (str) or positional (int) parameter.
pq.get_parameter(position)Retrieve the currently bound value.
pq.clone()Create a copy with the same bound parameters. Shares the same server-side handle.
await pq.execute(restriction?)Execute with current bindings. Returns a ResultSet.

A PolledQuery re-runs a query on a schedule and reports what changed (rows added and deleted) since the last poll. This is useful for monitoring conditions across data sources without writing your own polling loop.

PolledQuery is abstract — subclass it and implement handle_notification:

from triggerware import PolledQuery
class TicketMonitor(PolledQuery):
def handle_notification(self, added, deleted):
for row in added:
print(f"New match: {row}")
for row in deleted:
print(f"No longer matching: {row}")
monitor = await TicketMonitor(
client,
query="SELECT * FROM open_tickets WHERE age_hours > 48",
schedule=60 # poll every 60 seconds
).register()

The schedule parameter accepts three formats:

Periodic — an integer representing the interval in seconds:

schedule=60 # every 60 seconds

Calendar — a PolledQueryCalendarSchedule for cron-like scheduling:

from triggerware import PolledQueryCalendarSchedule
schedule = PolledQueryCalendarSchedule(
hours="9-17",
minutes="0,30",
weekdays="1-5", # Monday through Friday
timezone="America/New_York"
)
FieldTypeDefaultRange
minutesstr"*"0–59
hoursstr"*"0–23
daysstr"*"1–31
monthsstr"*"1–12
weekdaysstr"*"0–6 (0 = Sunday)
timezonestr"UTC"IANA timezone

Field values accept "*" (all), a single integer, "N-M" (range), or comma-separated values.

Combined — a list of schedules:

schedule = [
PolledQueryCalendarSchedule(hours="9", minutes="0"),
PolledQueryCalendarSchedule(hours="17", minutes="0"),
]

Pass a PolledQueryControlParameters to configure reporting behavior:

from triggerware import PolledQueryControlParameters
controls = PolledQueryControlParameters(
report_unchanged=False, # skip notification when nothing changed
report_initial="with delta", # "none", "with delta", or "without delta"
delay=False # if False, poll immediately on registration
)
MethodDescription
await pq.register()Register and start the polling schedule. Returns self.
await pq.poll_now()Trigger an immediate poll outside the schedule. Result arrives via handle_notification.
await pq.close()Stop polling and release server resources.

Manage connectors on the server to expose external data sources as virtual tables.

# Activate a connector
await client.activate_connector("salesforce", json_data={"api_key": "..."})
# List active connectors
connectors = await client.list_active_connectors()
print(connectors) # ["salesforce", "zendesk"]
# Deactivate a connector
await client.deactivate_connector("salesforce")
MethodDescription
await client.activate_connector(name, json_data?)Load and activate a named connector. json_data is passed to the connector on initialization.
await client.deactivate_connector(name)Deactivate a connector and drop its virtual tables.
await client.list_active_connectors()Returns a list of currently active connector names.

await client.close()
# Check connection state
if client.is_closed:
print("Disconnected")

All Triggerware client exceptions inherit from TriggerwareClientException.

ExceptionRaised when
InvalidQueryExceptionA query is syntactically invalid or references nonexistent tables/columns.
PreparedQueryExceptionA parameter name/position is invalid, or a type mismatch occurs when binding.
PolledQueryExceptionAn invalid schedule or control parameter is provided.
SubscriptionExceptionA subscription operation is invalid (e.g., activating an already-active subscription).