
DBOS Methods & Variables

DBOS provides a number of useful context methods and variables. All are accessed through the syntax DBOS.<method> and can only be used once a DBOS instance has been initialized.

Context Methods

start_workflow

DBOS.start_workflow(
    func: Workflow[P, R],
    *args: P.args,
    **kwargs: P.kwargs,
) -> WorkflowHandle[R]

Start a workflow in the background and return a handle to it. The DBOS.start_workflow method returns after the handle is durably created; from that point, the workflow is guaranteed to run to completion even if the app is interrupted.

Example syntax:

@DBOS.workflow()
def example_workflow(var1: str, var2: str):
    DBOS.logger.info("I am a workflow")

# Start example_workflow in the background
handle: WorkflowHandle = DBOS.start_workflow(example_workflow, "var1", "var2")

start_workflow_async

DBOS.start_workflow_async(
    func: Workflow[P, Coroutine[Any, Any, R]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> Coroutine[Any, Any, WorkflowHandleAsync[R]]

Start an asynchronous workflow and return a handle to it. The DBOS.start_workflow_async method resolves after the handle is durably created; at this point the workflow is guaranteed to run to completion even if the app is interrupted. The workflow started with DBOS.start_workflow_async runs in the same event loop as its caller.

Example syntax:

@DBOS.workflow()
async def example_workflow(var1: str, var2: str):
    DBOS.logger.info("I am a workflow")

# Start example_workflow
handle: WorkflowHandleAsync = await DBOS.start_workflow_async(example_workflow, "var1", "var2")

wait_first

DBOS.wait_first(
    handles: List[WorkflowHandle[Any]],
    *,
    polling_interval_sec: float = 1.0,
) -> WorkflowHandle[Any]

Wait for any one of the given workflow handles to complete and return the first completed handle. This is useful when you have multiple concurrent workflows and want to process results as they complete.

Parameters:

  • handles: A non-empty list of workflow handles to wait on. Raises ValueError if the list is empty.
  • polling_interval_sec: The interval (in seconds) at which DBOS polls the database. Defaults to 1.0.

See the queue tutorial for an example.

wait_first_async

DBOS.wait_first_async(
    handles: List[WorkflowHandleAsync[Any]],
    *,
    polling_interval_sec: float = 1.0,
) -> Coroutine[Any, Any, WorkflowHandleAsync[Any]]

Async version of wait_first. Wait for any one of the given async workflow handles to complete and return the first completed handle.

send

DBOS.send(
    destination_id: str,
    message: Any,
    topic: Optional[str] = None,
    *,
    idempotency_key: Optional[str] = None,
    serialization_type: Optional[WorkflowSerializationFormat] = WorkflowSerializationFormat.DEFAULT,
) -> None

Send a message to the workflow identified by destination_id. Messages can optionally be associated with a topic. Do not use send in coroutine workflows; use send_async instead.

Parameters:

  • destination_id: The workflow to which to send the message.
  • message: The message to send. Must be serializable.
  • topic: A topic with which to associate the message. Messages are enqueued per-topic on the receiver.
  • idempotency_key: If an idempotency key is set, the message will only be sent once no matter how many times DBOS.send is called with this key.
  • serialization_type: The serialization format to use for this message. Defaults to WorkflowSerializationFormat.DEFAULT.

send_async

DBOS.send_async(
    destination_id: str,
    message: Any,
    topic: Optional[str] = None,
    *,
    idempotency_key: Optional[str] = None,
    serialization_type: Optional[WorkflowSerializationFormat] = WorkflowSerializationFormat.DEFAULT,
) -> Coroutine[Any, Any, None]

Coroutine version of send.

recv

DBOS.recv(
    topic: Optional[str] = None,
    timeout_seconds: float = 60,
) -> Any

Receive and return a message sent to this workflow. Can only be called from within a workflow. Messages are dequeued first-in, first-out from a queue associated with the topic. Calls to recv wait for the next message in the queue, returning None if the wait times out. If no topic is specified, recv can only access messages sent without a topic. Do not use recv in coroutine workflows; use recv_async instead.

Parameters:

  • topic: A topic queue on which to wait.
  • timeout_seconds: A timeout in seconds. If the wait times out, return None.

Returns:

  • The first message enqueued on the input topic, or None if the wait times out.

recv_async

DBOS.recv_async(
    topic: Optional[str] = None,
    timeout_seconds: float = 60,
) -> Coroutine[Any, Any, Any]

Coroutine version of recv.

set_event

DBOS.set_event(
    key: str,
    value: Any,
    *,
    serialization_type: WorkflowSerializationFormat = WorkflowSerializationFormat.DEFAULT,
) -> None

Create an event with key key and value value and associate it with this workflow. If the event already exists, update its value. Can only be called from within a workflow. Do not use set_event in coroutine workflows; use set_event_async instead.

Parameters:

  • key: The key of the event.
  • value: The value of the event. Must be serializable.
  • serialization_type: The serialization format to use for this event. Defaults to WorkflowSerializationFormat.DEFAULT.

set_event_async

DBOS.set_event_async(
    key: str,
    value: Any,
    *,
    serialization_type: WorkflowSerializationFormat = WorkflowSerializationFormat.DEFAULT,
) -> Coroutine[Any, Any, None]

Coroutine version of set_event.

get_event

DBOS.get_event(
    workflow_id: str,
    key: str,
    timeout_seconds: float = 60,
) -> Any

Retrieve the latest value of an event published by the workflow identified by workflow_id to the key key. If the event does not yet exist, wait for it to be published, returning None if the wait times out. Do not use get_event in coroutine workflows; use get_event_async instead.

Parameters:

  • workflow_id: The identifier of the workflow whose events to retrieve.
  • key: The key of the event to retrieve.
  • timeout_seconds: A timeout in seconds. If the wait times out, return None.

Returns:

  • The value of the event published by workflow_id with name key, or None if the wait times out.

get_event_async

DBOS.get_event_async(
    workflow_id: str,
    key: str,
    timeout_seconds: float = 60,
) -> Coroutine[Any, Any, Any]

Coroutine version of get_event.

get_all_events

DBOS.get_all_events(
    workflow_id: str
) -> Dict[str, Any]

Retrieve the latest values of all events published by workflow_id.

  • workflow_id: The identifier of the workflow whose events to retrieve.

get_all_events_async

DBOS.get_all_events_async(
    workflow_id: str
) -> Coroutine[Any, Any, Dict[str, Any]]

Coroutine version of get_all_events.

sleep

DBOS.sleep(
    seconds: float
) -> None

Sleep for the given number of seconds. May only be called from within a workflow. This sleep is durable: it records its intended wake-up time in the database, so if it is interrupted and recovers, it still wakes up at the intended time. Do not use sleep in coroutine workflows; use sleep_async instead.

Parameters:

  • seconds: The number of seconds to sleep.

sleep_async

DBOS.sleep_async(
    seconds: float
) -> Coroutine[Any, Any, None]

Coroutine version of sleep.

asyncio_wait

DBOS.asyncio_wait(
    fs: List[Awaitable[Any]],
    *,
    timeout: Optional[float] = None,
    return_when: str = asyncio.ALL_COMPLETED,
) -> tuple[set[asyncio.Task[Any]], set[asyncio.Task[Any]]]

A durable wrapper around asyncio.wait with the same interface and semantics. It checkpoints which futures are done vs. pending so the result is deterministic during workflow recovery.

When called outside a workflow, it falls back to regular asyncio.wait.

Parameters:

  • fs: A list of awaitables (coroutines, tasks, or futures) to wait on.
  • timeout: Maximum number of seconds to wait. If None (the default), wait until the return_when condition is met.
  • return_when: Controls when the function returns. Must be one of the following constants:
    • asyncio.FIRST_COMPLETED: The function will return when any future finishes or is cancelled.
    • asyncio.FIRST_EXCEPTION: The function will return when any future finishes by raising an exception. If no future raises an exception then it is equivalent to ALL_COMPLETED.
    • asyncio.ALL_COMPLETED: The function will return when all futures finish or are cancelled. This is the default.

Returns: Two sets of Tasks/Futures: (done, pending). The done set contains futures that completed (finished or were cancelled) before the function returned. The pending set contains futures that are still running.

See the asyncio.wait documentation for full details.

run_step

DBOS.run_step(
    dbos_step_options: Optional[StepOptions],
    func: Callable[P, R],
    *args: P.args,
    **kwargs: P.kwargs,
) -> R

Run the provided function (or lambda) func as a checkpointed DBOS step, passing args and kwargs to it.

The StepOptions object has the following fields. All fields are optional.

class StepOptions(TypedDict, total=False):
    """
    Configuration options for steps.

    Attributes:
        name:
            Optional name for the step.
            If not provided, the function's name will be used.

        retries_allowed:
            Whether the step should be retried on failure.

        interval_seconds:
            Initial delay (in seconds) between retry attempts.

        max_attempts:
            Maximum number of attempts before the step is
            considered failed.

        backoff_rate:
            Multiplier applied to `interval_seconds` after
            each failed attempt (e.g. 2.0 = exponential backoff).
    """

    name: Optional[str]
    retries_allowed: bool
    interval_seconds: float
    max_attempts: int
    backoff_rate: float

run_step_async

Version of run_step to be called from async contexts.

get_result

DBOS.get_result(
    workflow_id: str,
) -> Optional[Any]

Wait for the workflow identified by workflow_id to complete, and return its result. This is similar to calling get_result on a WorkflowHandle, but is a single step that does not require a handle.

Parameters:

  • workflow_id: The identifier of the workflow whose result to return.

Returns:

  • The result of the workflow, or throws an exception if the workflow threw an exception.

get_result_async

DBOS.get_result_async(
    workflow_id: str,
) -> Coroutine[Any, Any, Optional[Any]]

Coroutine version of get_result.

get_workflow_status

DBOS.get_workflow_status(
    workflow_id: str,
) -> Optional[WorkflowStatus]

Retrieve the status of a workflow by its ID. Returns None if no workflow with the given ID exists.

Parameters:

  • workflow_id: The identifier of the workflow whose status to retrieve.

Returns:

  • The WorkflowStatus of the workflow, or None if no workflow with ID workflow_id exists.

get_workflow_status_async

DBOS.get_workflow_status_async(
    workflow_id: str,
) -> Coroutine[Any, Any, Optional[WorkflowStatus]]

Coroutine version of get_workflow_status.

retrieve_workflow

DBOS.retrieve_workflow(
    workflow_id: str,
    existing_workflow: bool = True,
) -> WorkflowHandle[R]

Retrieve the handle of the workflow identified by workflow_id.

Parameters:

  • workflow_id: The identifier of the workflow whose handle to retrieve.
  • existing_workflow: Whether to throw an exception if the workflow does not yet exist (True, the default) or to wait for its creation. If set to False and the workflow does not exist, waits for the workflow to be created, then returns its handle.

Returns:

  • The handle of the workflow whose ID is workflow_id.

retrieve_workflow_async

DBOS.retrieve_workflow_async(
    workflow_id: str,
    existing_workflow: bool = True,
) -> WorkflowHandleAsync[R]

Coroutine version of DBOS.retrieve_workflow, retrieving an async workflow handle.

write_stream

DBOS.write_stream(
    key: str,
    value: Any,
    *,
    serialization_type: WorkflowSerializationFormat = WorkflowSerializationFormat.DEFAULT,
) -> None

Write a value to a stream. Can only be called from within a workflow or its steps. Do not use write_stream in coroutine workflows; use write_stream_async instead.

Parameters:

  • key: The stream key / name within the workflow
  • value: A serializable value to write to the stream
  • serialization_type: The serialization format to use for this value. Defaults to WorkflowSerializationFormat.DEFAULT.

write_stream_async

DBOS.write_stream_async(
    key: str,
    value: Any,
    *,
    serialization_type: WorkflowSerializationFormat = WorkflowSerializationFormat.DEFAULT,
) -> Coroutine[Any, Any, None]

Coroutine version of write_stream.

close_stream

DBOS.close_stream(
    key: str
) -> None

Close a stream identified by a key. After this is called, no more values can be written to the stream. Can only be called from within a workflow. Do not use close_stream in coroutine workflows; use close_stream_async instead.

Parameters:

  • key: The stream key / name within the workflow

close_stream_async

DBOS.close_stream_async(
    key: str
) -> Coroutine[Any, Any, None]

Coroutine version of close_stream.

read_stream

DBOS.read_stream(
    workflow_id: str,
    key: str
) -> Generator[Any, Any, None]

Read values from a stream as a generator.

This function reads values from a stream identified by the workflow_id and key, yielding each value in order until the stream is closed or the workflow terminates.

Parameters:

  • workflow_id: The workflow instance ID that owns the stream
  • key: The stream key / name within the workflow

Yields:

  • Each value in the stream until the stream is closed

Example syntax:

for value in DBOS.read_stream(workflow_id, example_key):
    print(f"Received: {value}")

read_stream_async

DBOS.read_stream_async(
    workflow_id: str,
    key: str
) -> AsyncGenerator[Any, None]

Read values from a stream as an async generator.

This function reads values from a stream identified by the workflow_id and key, yielding each value in order until the stream is closed or the workflow terminates.

Parameters:

  • workflow_id: The workflow instance ID that owns the stream
  • key: The stream key / name within the workflow

Yields:

  • Each value in the stream until the stream is closed

Example syntax:

async for value in DBOS.read_stream_async(workflow_id, example_key):
    print(f"Received: {value}")

patch

DBOS.patch(
    patch_name: str
) -> bool

Insert a patch marker at the current point in workflow history, returning True if it was successfully inserted and False if a checkpoint is already present at this point in history. Used to safely upgrade workflow code; see the patching tutorial for more detail.

Parameters:

  • patch_name: The name to give the patch marker that will be inserted into workflow history.

patch_async

DBOS.patch_async(
    patch_name: str
) -> Coroutine[Any, Any, bool]

Coroutine version of DBOS.patch().

deprecate_patch

DBOS.deprecate_patch(
    patch_name: str
) -> bool

Safely bypass a patch marker at the current point in workflow history, if present. Always returns True. Used to safely deprecate patches; see the patching tutorial for more detail.

Parameters:

  • patch_name: The name of the patch marker to be bypassed.

deprecate_patch_async

DBOS.deprecate_patch_async(
    patch_name: str
) -> Coroutine[Any, Any, bool]

Coroutine version of DBOS.deprecate_patch().

Workflow Management Methods

list_workflows

def list_workflows(
    *,
    workflow_ids: Optional[List[str]] = None,
    status: Optional[Union[str, List[str]]] = None,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
    name: Optional[Union[str, List[str]]] = None,
    app_version: Optional[Union[str, List[str]]] = None,
    forked_from: Optional[Union[str, List[str]]] = None,
    parent_workflow_id: Optional[Union[str, List[str]]] = None,
    user: Optional[Union[str, List[str]]] = None,
    queue_name: Optional[Union[str, List[str]]] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    sort_desc: bool = False,
    workflow_id_prefix: Optional[Union[str, List[str]]] = None,
    load_input: bool = True,
    load_output: bool = True,
    executor_id: Optional[Union[str, List[str]]] = None,
    queues_only: bool = False,
    was_forked_from: Optional[bool] = None,
    has_parent: Optional[bool] = None,
) -> List[WorkflowStatus]:

Retrieve a list of WorkflowStatus of all workflows matching specified criteria.

Parameters:

  • workflow_ids: Retrieve workflows with these IDs.
  • status: Retrieve workflows with this status (or one of these statuses). Must be one of ENQUEUED, DELAYED, PENDING, SUCCESS, ERROR, CANCELLED, or MAX_RECOVERY_ATTEMPTS_EXCEEDED.
  • start_time: Retrieve workflows started after this (RFC 3339-compliant) timestamp.
  • end_time: Retrieve workflows started before this (RFC 3339-compliant) timestamp.
  • name: Retrieve workflows with this fully-qualified name (or one of these names).
  • app_version: Retrieve workflows tagged with this application version (or one of these versions).
  • forked_from: Retrieve workflows forked from this workflow ID (or one of these IDs).
  • parent_workflow_id: Retrieve workflows that were started as children of this workflow (or one of these workflows).
  • user: Retrieve workflows run by this authenticated user (or one of these users).
  • queue_name: Retrieve workflows that were enqueued on this queue (or one of these queues).
  • limit: Retrieve up to this many workflows.
  • offset: Skip this many workflows from the results returned (for pagination).
  • sort_desc: Whether to sort the results in descending (True) or ascending (False) order by workflow start time.
  • workflow_id_prefix: Retrieve workflows whose IDs start with the specified string (or one of these strings).
  • load_input: Whether to load and deserialize workflow inputs. Set to False to improve performance when inputs are not needed.
  • load_output: Whether to load and deserialize workflow outputs. Set to False to improve performance when outputs are not needed.
  • executor_id: Retrieve workflows with this executor ID (or one of these IDs).
  • queues_only: If True, only retrieve workflows that are currently queued (status ENQUEUED or PENDING and queue_name not null). Equivalent to using list_queued_workflows.
  • was_forked_from: If True, only retrieve workflows that have been forked from. If False, only retrieve workflows that have not been forked from.
  • has_parent: If True, only retrieve workflows that have a parent workflow. If False, only retrieve workflows without a parent.

list_workflows_async

Coroutine version of list_workflows.

list_queued_workflows

def list_queued_workflows(
    *,
    workflow_ids: Optional[List[str]] = None,
    status: Optional[Union[str, List[str]]] = None,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
    name: Optional[Union[str, List[str]]] = None,
    app_version: Optional[Union[str, List[str]]] = None,
    forked_from: Optional[Union[str, List[str]]] = None,
    parent_workflow_id: Optional[Union[str, List[str]]] = None,
    user: Optional[Union[str, List[str]]] = None,
    queue_name: Optional[Union[str, List[str]]] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    sort_desc: bool = False,
    workflow_id_prefix: Optional[Union[str, List[str]]] = None,
    load_input: bool = True,
    load_output: bool = True,
    executor_id: Optional[Union[str, List[str]]] = None,
    has_parent: Optional[bool] = None,
) -> List[WorkflowStatus]:

Retrieve a list of WorkflowStatus of all queued workflows (status ENQUEUED or PENDING and queue_name not null) matching specified criteria.

Parameters:

  • workflow_ids: Retrieve workflows with these IDs.
  • status: Retrieve workflows with this status (or one of these statuses). Must be ENQUEUED or PENDING.
  • start_time: Retrieve workflows enqueued after this (RFC 3339-compliant) timestamp.
  • end_time: Retrieve workflows enqueued before this (RFC 3339-compliant) timestamp.
  • name: Retrieve workflows with this fully-qualified name (or one of these names).
  • app_version: Retrieve workflows tagged with this application version (or one of these versions).
  • forked_from: Retrieve workflows forked from this workflow ID (or one of these IDs).
  • parent_workflow_id: Retrieve workflows that were started as children of this workflow (or one of these workflows).
  • user: Retrieve workflows run by this authenticated user (or one of these users).
  • queue_name: Retrieve workflows running on this queue (or one of these queues).
  • limit: Retrieve up to this many workflows.
  • offset: Skip this many workflows from the results returned (for pagination).
  • sort_desc: Whether to sort the results in descending (True) or ascending (False) order by workflow start time.
  • workflow_id_prefix: Retrieve workflows whose IDs start with the specified string (or one of these strings).
  • load_input: Whether to load and deserialize workflow inputs. Set to False to improve performance when inputs are not needed.
  • load_output: Whether to load and deserialize workflow outputs. Set to False to improve performance when outputs are not needed.
  • executor_id: Retrieve workflows with this executor ID (or one of these IDs).
  • has_parent: If True, only retrieve workflows that have a parent workflow. If False, only retrieve workflows without a parent.

list_queued_workflows_async

Coroutine version of list_queued_workflows.

list_workflow_steps

def list_workflow_steps(
    workflow_id: str,
    *,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> List[StepInfo]

Retrieve the steps of a workflow, ordered by function_id. Use limit and offset to paginate results. Returns a list of StepInfo objects with the following structure:

class StepInfo(TypedDict):
    # The unique ID of the step in the workflow. One-indexed.
    function_id: int
    # The (fully qualified) name of the step
    function_name: str
    # The step's output, if any
    output: Optional[Any]
    # The error the step threw, if any
    error: Optional[Exception]
    # If the step starts or retrieves the result of a workflow, its ID
    child_workflow_id: Optional[str]
    # The Unix epoch timestamp at which this step started
    started_at_epoch_ms: Optional[int]
    # The Unix epoch timestamp at which this step completed
    completed_at_epoch_ms: Optional[int]

list_workflow_steps_async

Coroutine version of list_workflow_steps.

set_workflow_delay

DBOS.set_workflow_delay(
    workflow_id: str,
    *,
    delay_seconds: Optional[float] = None,
    delay_until_epoch_ms: Optional[int] = None,
) -> None

Set or update the delay on a workflow. Only affects workflows with DELAYED status. Provide exactly one of delay_seconds (relative) or delay_until_epoch_ms (absolute).

Parameters:

  • workflow_id: The ID of the workflow whose delay to set.
  • delay_seconds: Delay the workflow by this many seconds from now. Must be non-negative.
  • delay_until_epoch_ms: Delay the workflow until this absolute time, specified as a Unix epoch timestamp in milliseconds. Must be non-negative.

set_workflow_delay_async

Coroutine version of set_workflow_delay.

cancel_workflow

DBOS.cancel_workflow(
    workflow_id: str,
) -> None

Cancel a workflow. This sets its status to CANCELLED, removes it from its queue (if it is enqueued), and preempts its execution (interrupting it at the beginning of its next step).

cancel_workflow_async

Coroutine version of cancel_workflow.

cancel_workflows

DBOS.cancel_workflows(
    workflow_ids: List[str],
) -> None

Cancel multiple workflows. Behaves like cancel_workflow but operates on a list of workflow IDs.

cancel_workflows_async

Coroutine version of cancel_workflows.

resume_workflow

DBOS.resume_workflow(
    workflow_id: str,
    *,
    queue_name: Optional[str] = None,
) -> WorkflowHandle[R]

Resume a workflow. This immediately starts it from its last completed step. You can use this to resume workflows that are cancelled or have exceeded their maximum recovery attempts. You can also use this to start an enqueued workflow immediately, bypassing its queue.

If queue_name is provided, the resumed workflow is enqueued on the specified queue instead of starting immediately.

resume_workflow_async

Coroutine version of resume_workflow.

resume_workflows

DBOS.resume_workflows(
    workflow_ids: List[str],
    *,
    queue_name: Optional[str] = None,
) -> List[WorkflowHandle[Any]]

Resume multiple workflows. Behaves like resume_workflow but operates on a list of workflow IDs and returns a list of handles.

resume_workflows_async

Coroutine version of resume_workflows. Returns List[WorkflowHandleAsync[Any]].