class DbtCloudJob(JobBlock):
    """
    Block that holds the information and methods to interact with a dbt Cloud job.

    Attributes:
        dbt_cloud_credentials: The credentials to use to authenticate with dbt Cloud.
        job_id: The id of the dbt Cloud job.
        timeout_seconds: The number of seconds to wait for the job to complete.
        interval_seconds:
            The number of seconds to wait between polling for job completion.
        trigger_job_run_options: The options to use when triggering a job run.

    Examples:
        Load a configured dbt Cloud job block.
        ```python
        from prefect_dbt.cloud import DbtCloudJob

        dbt_cloud_job = DbtCloudJob.load("BLOCK_NAME")
        ```

        Triggers a dbt Cloud job, waits for completion, and fetches the results.
        ```python
        from prefect import flow
        from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob

        @flow
        def dbt_cloud_job_flow():
            dbt_cloud_credentials = DbtCloudCredentials.load("dbt-token")
            dbt_cloud_job = DbtCloudJob(
                dbt_cloud_credentials=dbt_cloud_credentials, job_id=154217
            )
            dbt_cloud_job_run = dbt_cloud_job.trigger()
            dbt_cloud_job_run.wait_for_completion()
            dbt_cloud_job_run.fetch_result()
            return dbt_cloud_job_run

        dbt_cloud_job_flow()
        ```
    """

    _block_type_name = "dbt Cloud Job"
    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/5zE9lxfzBHjw3tnEup4wWL/9a001902ed43a84c6c96d23b24622e19/dbt-bit_tm.png?h=250"  # noqa
    _documentation_url = "https://prefecthq.github.io/prefect-dbt/cloud/jobs/#prefect_dbt.cloud.jobs.DbtCloudJob"  # noqa

    dbt_cloud_credentials: DbtCloudCredentials = Field(
        default=...,
        description="The dbt Cloud credentials to use to authenticate with dbt Cloud.",
    )  # noqa: E501
    job_id: int = Field(
        default=..., description="The id of the dbt Cloud job.", title="Job ID"
    )
    timeout_seconds: int = Field(
        default=900,
        description="The number of seconds to wait for the job to complete.",
    )
    interval_seconds: int = Field(
        default=10,
        description="The number of seconds to wait between polling for job completion.",
    )
    trigger_job_run_options: TriggerJobRunOptions = Field(
        default_factory=TriggerJobRunOptions,
        description="The options to use when triggering a job run.",
    )

    @sync_compatible
    async def get_job(self, order_by: Optional[str] = None) -> Dict[str, Any]:
        """
        Retrieve information about a dbt Cloud job.

        Args:
            order_by: The field to order the results by.

        Returns:
            The job data.

        Raises:
            DbtCloudGetJobFailed: If the dbt Cloud API responds with an HTTP error.
        """
        try:
            async with self.dbt_cloud_credentials.get_administrative_client() as client:
                response = await client.get_job(
                    job_id=self.job_id,
                    order_by=order_by,
                )
        except HTTPStatusError as ex:
            raise DbtCloudGetJobFailed(extract_user_message(ex)) from ex
        return response.json()["data"]

    @sync_compatible
    async def trigger(
        self, trigger_job_run_options: Optional[TriggerJobRunOptions] = None
    ) -> DbtCloudJobRun:
        """
        Triggers a dbt Cloud job.

        Args:
            trigger_job_run_options: Options to use for this run only. When
                omitted, the block's `trigger_job_run_options` are used.

        Returns:
            A representation of the dbt Cloud job run.

        Raises:
            DbtCloudJobRunTriggerFailed: If the dbt Cloud API responds with an
                HTTP error.
        """
        # An explicit argument takes precedence over the block-level options.
        trigger_job_run_options = (
            trigger_job_run_options or self.trigger_job_run_options
        )
        try:
            async with self.dbt_cloud_credentials.get_administrative_client() as client:
                response = await client.trigger_job_run(
                    job_id=self.job_id, options=trigger_job_run_options
                )
        except HTTPStatusError as ex:
            raise DbtCloudJobRunTriggerFailed(extract_user_message(ex)) from ex

        run_data = response.json()["data"]
        run_id = run_data.get("id")
        run = DbtCloudJobRun(
            dbt_cloud_job=self,
            run_id=run_id,
        )
        if "project_id" in run_data:
            self.logger.info(
                f"dbt Cloud job {self.job_id} run {run_id} successfully triggered. "
                f"You can view the status of this run at "
                f"https://{self.dbt_cloud_credentials.domain}/#/accounts/"
                f"{self.dbt_cloud_credentials.account_id}/projects/"
                f"{run_data['project_id']}/runs/{run_id}/"
            )
        else:
            # Without a project ID no UI link can be built; don't fail the
            # already-triggered run over a log message.
            self.logger.info(
                f"dbt Cloud job {self.job_id} run {run_id} successfully triggered."
            )
        return run
@sync_compatible
async def get_job(self, order_by: Optional[str] = None) -> Dict[str, Any]:
    """
    Fetch the details of this block's dbt Cloud job.

    Args:
        order_by: Optional field that dbt Cloud should order the results by.

    Returns:
        The `data` payload of the dbt Cloud job response.

    Raises:
        DbtCloudGetJobFailed: If the dbt Cloud API responds with an HTTP error.
    """
    credentials = self.dbt_cloud_credentials
    try:
        async with credentials.get_administrative_client() as admin_client:
            job_response = await admin_client.get_job(
                job_id=self.job_id, order_by=order_by
            )
    except HTTPStatusError as http_error:
        raise DbtCloudGetJobFailed(extract_user_message(http_error)) from http_error
    payload = job_response.json()
    return payload["data"]
@sync_compatible
async def trigger(
    self, trigger_job_run_options: Optional[TriggerJobRunOptions] = None
) -> DbtCloudJobRun:
    """
    Triggers a dbt Cloud job.

    Args:
        trigger_job_run_options: Options to use for this run only. When
            omitted, the block's `trigger_job_run_options` are used.

    Returns:
        A representation of the dbt Cloud job run.

    Raises:
        DbtCloudJobRunTriggerFailed: If the dbt Cloud API responds with an
            HTTP error.
    """
    # An explicit argument takes precedence over the block-level options.
    trigger_job_run_options = trigger_job_run_options or self.trigger_job_run_options
    try:
        async with self.dbt_cloud_credentials.get_administrative_client() as client:
            response = await client.trigger_job_run(
                job_id=self.job_id, options=trigger_job_run_options
            )
    except HTTPStatusError as ex:
        raise DbtCloudJobRunTriggerFailed(extract_user_message(ex)) from ex

    run_data = response.json()["data"]
    run_id = run_data.get("id")
    run = DbtCloudJobRun(
        dbt_cloud_job=self,
        run_id=run_id,
    )
    if "project_id" in run_data:
        self.logger.info(
            f"dbt Cloud job {self.job_id} run {run_id} successfully triggered. "
            f"You can view the status of this run at "
            f"https://{self.dbt_cloud_credentials.domain}/#/accounts/"
            f"{self.dbt_cloud_credentials.account_id}/projects/"
            f"{run_data['project_id']}/runs/{run_id}/"
        )
    else:
        # Without a project ID no UI link can be built; don't fail the
        # already-triggered run over a log message.
        self.logger.info(
            f"dbt Cloud job {self.job_id} run {run_id} successfully triggered."
        )
    return run
class DbtCloudJobRun(JobRun):  # NOT A BLOCK
    """
    Class that holds the information and methods to interact
    with the resulting run of a dbt Cloud job.
    """

    def __init__(self, run_id: int, dbt_cloud_job: "DbtCloudJob"):
        self.run_id = run_id
        self._dbt_cloud_job = dbt_cloud_job
        self._dbt_cloud_credentials = dbt_cloud_job.dbt_cloud_credentials

    @property
    def _log_prefix(self):
        return f"dbt Cloud job {self._dbt_cloud_job.job_id} run {self.run_id}."

    async def _wait_until_state(
        self,
        in_final_state_fn: Callable[[Any], bool],
        get_state_fn: Callable[[], Awaitable[Any]],
        log_state_fn: Optional[Callable[[Any], Any]] = None,
        timeout_seconds: int = 60,
        interval_seconds: int = 1,
    ):
        """
        Wait until the job run reaches a specific state.

        Args:
            in_final_state_fn: A callable that accepts a run state and returns
                a boolean indicating whether the job run is in a final state.
            get_state_fn: An async function that returns
                the current state of the job run.
            log_state_fn: An optional callable that accepts a run state and makes
                it human readable; when omitted, the raw state is logged.
            timeout_seconds: The maximum amount of time, in seconds, to wait
                for the job run to reach the final state.
            interval_seconds: The number of seconds to wait between checks of
                the job run's state.

        Raises:
            DbtCloudJobRunTimedOut: If no final state is observed within
                `timeout_seconds`.
        """
        start_time = time.time()
        last_state = None
        while True:
            run_state = await get_state_fn()
            if run_state != last_state:
                if self.logger is not None:
                    readable_state = (
                        log_state_fn(run_state)
                        if log_state_fn is not None
                        else run_state
                    )
                    self.logger.info(
                        "%s has new state: %s", self._log_prefix, readable_state
                    )
                last_state = run_state

            # Stop as soon as a final state is observed, rather than sleeping
            # one more interval (and possibly timing out) after reaching it.
            if in_final_state_fn(run_state):
                return

            elapsed_time_seconds = time.time() - start_time
            if elapsed_time_seconds > timeout_seconds:
                raise DbtCloudJobRunTimedOut(
                    f"Max wait time of {timeout_seconds} "
                    "seconds exceeded while waiting"
                )
            await asyncio.sleep(interval_seconds)

    @sync_compatible
    async def get_run(self) -> Dict[str, Any]:
        """
        Makes a request to the dbt Cloud API to get the run data.

        Returns:
            The run data.

        Raises:
            DbtCloudGetRunFailed: If the dbt Cloud API responds with an HTTP error.
        """
        try:
            dbt_cloud_credentials = self._dbt_cloud_credentials
            async with dbt_cloud_credentials.get_administrative_client() as client:
                response = await client.get_run(self.run_id)
        except HTTPStatusError as ex:
            raise DbtCloudGetRunFailed(extract_user_message(ex)) from ex
        run_data = response.json()["data"]
        return run_data

    @sync_compatible
    async def get_status_code(self) -> int:
        """
        Makes a request to the dbt Cloud API to get the run status.

        Returns:
            The run status code.
        """
        run_data = await self.get_run()
        run_status_code = run_data.get("status")
        return run_status_code

    @sync_compatible
    async def wait_for_completion(self) -> None:
        """
        Waits for the job run to reach a terminal state.
        """
        await self._wait_until_state(
            in_final_state_fn=DbtCloudJobRunStatus.is_terminal_status_code,
            get_state_fn=self.get_status_code,
            log_state_fn=DbtCloudJobRunStatus,
            timeout_seconds=self._dbt_cloud_job.timeout_seconds,
            interval_seconds=self._dbt_cloud_job.interval_seconds,
        )

    @sync_compatible
    async def fetch_result(self, step: Optional[int] = None) -> Dict[str, Any]:
        """
        Gets the results from the job run. Since the results
        may not be ready, use wait_for_completion before calling this method.

        Args:
            step: The index of the step in the run to query for artifacts. The
                first step in the run has the index 1. If the step parameter is
                omitted, then this method will return the artifacts compiled
                for the last step in the run.

        Returns:
            The run data, with the run's artifact paths under `artifact_paths`.

        Raises:
            DbtCloudListRunArtifactsFailed: If listing the artifacts fails.
            DbtCloudJobRunCancelled: If the run was cancelled.
            DbtCloudJobRunFailed: If the run failed.
            DbtCloudJobRunIncomplete: If the run has not finished yet.
        """
        run_data = await self.get_run()
        run_status = DbtCloudJobRunStatus(run_data.get("status"))
        if run_status == DbtCloudJobRunStatus.SUCCESS:
            try:
                async with self._dbt_cloud_credentials.get_administrative_client() as client:  # noqa
                    response = await client.list_run_artifacts(
                        run_id=self.run_id, step=step
                    )
                run_data["artifact_paths"] = response.json()["data"]
                self.logger.info("%s completed successfully!", self._log_prefix)
            except HTTPStatusError as ex:
                raise DbtCloudListRunArtifactsFailed(extract_user_message(ex)) from ex
            return run_data
        elif run_status == DbtCloudJobRunStatus.CANCELLED:
            raise DbtCloudJobRunCancelled(f"{self._log_prefix} was cancelled.")
        elif run_status == DbtCloudJobRunStatus.FAILED:
            raise DbtCloudJobRunFailed(f"{self._log_prefix} has failed.")
        else:
            raise DbtCloudJobRunIncomplete(
                f"{self._log_prefix} is still running; "
                "use wait_for_completion() to wait until results are ready."
            )

    @sync_compatible
    async def get_run_artifacts(
        self,
        path: Literal["manifest.json", "catalog.json", "run_results.json"],
        step: Optional[int] = None,
    ) -> Union[Dict[str, Any], str]:
        """
        Get an artifact generated for a completed run.

        Args:
            path: The relative path to the run artifact.
            step: The index of the step in the run to query for artifacts. The
                first step in the run has the index 1. If the step parameter is
                omitted, then this method will return the artifacts compiled
                for the last step in the run.

        Returns:
            The contents of the requested artifact. Returns a `Dict` if the
                requested artifact is a JSON file and a `str` otherwise.

        Raises:
            DbtCloudGetRunArtifactFailed: If the dbt Cloud API responds with an
                HTTP error.
        """
        try:
            dbt_cloud_credentials = self._dbt_cloud_credentials
            async with dbt_cloud_credentials.get_administrative_client() as client:
                response = await client.get_run_artifact(
                    run_id=self.run_id, path=path, step=step
                )
        except HTTPStatusError as ex:
            raise DbtCloudGetRunArtifactFailed(extract_user_message(ex)) from ex

        if path.endswith(".json"):
            artifact_contents = response.json()
        else:
            artifact_contents = response.text
        return artifact_contents

    def _select_unsuccessful_commands(
        self,
        run_results: List[Dict[str, Any]],
        command_components: List[str],
        command: str,
        exe_command: str,
    ) -> str:
        """
        Select nodes that were not successful and rebuild a command.

        Args:
            run_results: The `results` entries of a step's `run_results.json`.
            command_components: `command` split into shell tokens.
            command: The original dbt command of the step.
            exe_command: The dbt executable command found in `command`
                (for example `build` or `run`).

        Returns:
            The command rewritten to also select the unsuccessful nodes, or the
            original command when there are no unsuccessful nodes to select.
        """
        import re

        def _partition_on_token(text: str, token: str):
            # Match `token` only as a whole whitespace-delimited word, so "-s"
            # is not found inside "--state" and "run" is not found inside a
            # path such as "/runs/". Fall back to a plain substring partition
            # when there is no standalone occurrence.
            match = (
                re.search(rf"(?<!\S){re.escape(token)}(?!\S)", text) if token else None
            )
            if match is None:
                return text.partition(token)
            return text[: match.start()], token, text[match.end() :]

        # note "fail" here instead of "cancelled" because
        # nodes do not have a cancelled state
        run_nodes = " ".join(
            run_result["unique_id"].split(".")[2]
            for run_result in run_results
            if run_result["status"] in ("error", "skipped", "fail")
        )
        if not run_nodes:
            # Nothing specific to select; an empty `-s` would be an invalid
            # dbt invocation, so rerun the step unchanged.
            return command

        select_arg = None
        if "-s" in command_components:
            select_arg = "-s"
        elif "--select" in command_components:
            select_arg = "--select"

        # prevent duplicate --select/-s statements
        if select_arg is not None:
            # dbt --fail-fast run, -s, bad_mod --vars '{"env": "prod"}' to:
            # dbt --fail-fast run -s other_mod bad_mod --vars '{"env": "prod"}'
            command_start, select_arg, command_end = _partition_on_token(
                command, select_arg
            )
            parts = (command_start.strip(), select_arg, run_nodes, command_end.strip())
        else:
            # dbt --fail-fast, build, --vars '{"env": "prod"}' to:
            # dbt --fail-fast build -s bad_model --vars '{"env": "prod"}'
            dbt_global_args, exe_command, exe_args = _partition_on_token(
                command, exe_command
            )
            parts = (
                dbt_global_args.strip(),
                exe_command,
                "-s",
                run_nodes,
                exe_args.strip(),
            )
        return " ".join(part for part in parts if part)

    async def _build_trigger_job_run_options(
        self,
        job: Dict[str, Any],
        run: Dict[str, Any],
    ) -> TriggerJobRunOptions:
        """
        Compiles a list of steps (commands) to retry, then either build trigger job
        run options from scratch if it does not exist, else overrides the existing.

        Args:
            job: The job data, as returned by `DbtCloudJob.get_job`.
            run: The run data, as returned by `get_run`; `run_steps` is read.

        Returns:
            Trigger options whose `steps_override` holds the steps to retry.
        """
        generate_docs = job.get("generate_docs", False)
        generate_sources = job.get("generate_sources", False)

        steps_override = []
        for run_step in run["run_steps"]:
            status = run_step["status_humanized"].lower()
            # Skipping cloning, profile setup, and dbt deps - always the first three
            # steps in any run, and note, index starts at 1 instead of 0
            if run_step["index"] <= 3 or status == "success":
                continue
            # get dbt build from "Invoke dbt with `dbt build`"
            command = run_step["name"].partition("`")[2].partition("`")[0]

            # These steps will be re-run regardless if
            # generate_docs or generate_sources are enabled for a given job
            # so if we don't skip, it'll run twice
            freshness_in_command = (
                "dbt source snapshot-freshness" in command
                or "dbt source freshness" in command
            )
            if "dbt docs generate" in command and generate_docs:
                continue
            elif freshness_in_command and generate_sources:
                continue

            # find an executable command like `build` or `run`
            # search in a list so that there aren't false positives, like
            # `"run" in "dbt run-operation"`, which is True; we actually want
            # `"run" in ["dbt", "run-operation"]` which is False
            command_components = shlex.split(command)
            for exe_command in EXE_COMMANDS:
                if exe_command in command_components:
                    break
            else:
                exe_command = ""

            is_exe_command = exe_command in EXE_COMMANDS
            is_not_success = status in ("error", "skipped", "cancelled")
            is_skipped = status == "skipped"
            if (not is_exe_command and is_not_success) or (
                is_exe_command and is_skipped
            ):
                # if no matches like `run-operation`, we will be rerunning entirely
                # or if it's one of the expected commands and is skipped
                steps_override.append(command)
            else:
                # errors and failures are when we need to inspect to figure
                # out the point of failure
                try:
                    run_artifact = await self.get_run_artifacts(
                        "run_results.json", run_step["index"]
                    )
                except JSONDecodeError:
                    # get the run results scoped to the step which had an error
                    # an error here indicates that either:
                    # 1) the fail-fast flag was set, in which case
                    #    the run_results.json file was never created; or
                    # 2) there was a problem on dbt Cloud's side saving
                    #    this artifact
                    steps_override.append(command)
                else:
                    # we only need to find the individual nodes
                    # for those run commands
                    run_results = run_artifact["results"]
                    modified_command = self._select_unsuccessful_commands(
                        run_results=run_results,
                        command_components=command_components,
                        command=command,
                        exe_command=exe_command,
                    )
                    steps_override.append(modified_command)

        if self._dbt_cloud_job.trigger_job_run_options is None:
            trigger_job_run_options_override = TriggerJobRunOptions(
                steps_override=steps_override
            )
        else:
            trigger_job_run_options_override = (
                self._dbt_cloud_job.trigger_job_run_options.copy()
            )
            trigger_job_run_options_override.steps_override = steps_override
        return trigger_job_run_options_override

    @sync_compatible
    async def retry_failed_steps(self) -> "DbtCloudJobRun":  # noqa: F821
        """
        Retries steps that did not complete successfully in a run.

        Returns:
            A representation of the dbt Cloud job run.
        """
        job = await self._dbt_cloud_job.get_job()
        run = await self.get_run()

        trigger_job_run_options_override = await self._build_trigger_job_run_options(
            job=job, run=run
        )

        num_steps = len(trigger_job_run_options_override.steps_override)
        if num_steps == 0:
            self.logger.info(f"{self._log_prefix} does not have any steps to retry.")
        else:
            self.logger.info(f"{self._log_prefix} has {num_steps} steps to retry.")
        run = await self._dbt_cloud_job.trigger(
            trigger_job_run_options=trigger_job_run_options_override,
        )
        return run
Gets the results from the job run. Since the results
may not be ready, use wait_for_completion before calling this method.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `step` | `Optional[int]` | The index of the step in the run to query for artifacts. The first step in the run has the index 1. If the step parameter is omitted, then this method will return the artifacts compiled for the last step in the run. | `None` |
@sync_compatible
async def fetch_result(self, step: Optional[int] = None) -> Dict[str, Any]:
    """
    Return the data of a finished run, together with its artifact paths.

    Call `wait_for_completion` first, because the results are only
    available once the run has succeeded.

    Args:
        step: 1-based index of the run step whose artifacts should be listed.
            When omitted, the artifacts of the run's last step are listed.

    Returns:
        The run data with an added `artifact_paths` entry.

    Raises:
        DbtCloudJobRunCancelled: The run was cancelled.
        DbtCloudJobRunFailed: The run failed.
        DbtCloudJobRunIncomplete: The run has not reached a final state yet.
        DbtCloudListRunArtifactsFailed: Listing the artifacts failed.
    """
    run_data = await self.get_run()
    status = DbtCloudJobRunStatus(run_data.get("status"))

    # Handle every non-success outcome up front with guard clauses.
    if status == DbtCloudJobRunStatus.CANCELLED:
        raise DbtCloudJobRunCancelled(f"{self._log_prefix} was cancelled.")
    if status == DbtCloudJobRunStatus.FAILED:
        raise DbtCloudJobRunFailed(f"{self._log_prefix} has failed.")
    if status != DbtCloudJobRunStatus.SUCCESS:
        raise DbtCloudJobRunIncomplete(
            f"{self._log_prefix} is still running; "
            "use wait_for_completion() to wait until results are ready."
        )

    credentials = self._dbt_cloud_credentials
    try:
        async with credentials.get_administrative_client() as admin_client:
            artifacts_response = await admin_client.list_run_artifacts(
                run_id=self.run_id, step=step
            )
        run_data["artifact_paths"] = artifacts_response.json()["data"]
    except HTTPStatusError as http_error:
        raise DbtCloudListRunArtifactsFailed(
            extract_user_message(http_error)
        ) from http_error
    self.logger.info("%s completed successfully!", self._log_prefix)
    return run_data
Makes a request to the dbt Cloud API to get the run data.
Returns:

| Type | Description |
|------|-------------|
| `Dict[str, Any]` | The run data. |
Source code in `prefect_dbt/cloud/jobs.py` (lines 702–717)
@sync_compatible
async def get_run(self) -> Dict[str, Any]:
    """
    Request this run's data from the dbt Cloud administrative API.

    Returns:
        The `data` payload of the run response.

    Raises:
        DbtCloudGetRunFailed: If the API responds with an HTTP error status.
    """
    try:
        async with self._dbt_cloud_credentials.get_administrative_client() as admin_client:
            run_response = await admin_client.get_run(self.run_id)
    except HTTPStatusError as http_error:
        raise DbtCloudGetRunFailed(extract_user_message(http_error)) from http_error
    return run_response.json()["data"]
`step` (`Optional[int]`, default `None`): The index of the step in the run to query for artifacts. The first step in the run has the index 1. If the step parameter is omitted, then this method will return the artifacts compiled for the last step in the run.
Returns:

| Type | Description |
|------|-------------|
| `Union[Dict[str, Any], str]` | The contents of the requested artifact. Returns a `Dict` if the requested artifact is a JSON file and a `str` otherwise. |
@sync_compatible
async def get_run_artifacts(
    self,
    path: Literal["manifest.json", "catalog.json", "run_results.json"],
    step: Optional[int] = None,
) -> Union[Dict[str, Any], str]:
    """
    Download one artifact produced by this run.

    Args:
        path: Relative path of the artifact inside the run.
        step: 1-based index of the run step to read the artifact from. When
            omitted, the artifact of the run's last step is returned.

    Returns:
        The parsed JSON (a `Dict`) when `path` ends in `.json`, otherwise the
        raw response text (a `str`).

    Raises:
        DbtCloudGetRunArtifactFailed: If the API responds with an HTTP error.
    """
    try:
        async with self._dbt_cloud_credentials.get_administrative_client() as admin_client:
            artifact_response = await admin_client.get_run_artifact(
                run_id=self.run_id, path=path, step=step
            )
    except HTTPStatusError as http_error:
        raise DbtCloudGetRunArtifactFailed(
            extract_user_message(http_error)
        ) from http_error

    is_json_artifact = path.endswith(".json")
    return artifact_response.json() if is_json_artifact else artifact_response.text
Makes a request to the dbt Cloud API to get the run status.
Returns:

| Type | Description |
|------|-------------|
| `int` | The run status code. |
Source code in `prefect_dbt/cloud/jobs.py` (lines 719–729)
@sync_compatible
async def get_status_code(self) -> int:
    """
    Look up the current status code of this run in dbt Cloud.

    Returns:
        The `status` value from the run data (`None` if the key is absent).
    """
    return (await self.get_run()).get("status")
@sync_compatible
async def retry_failed_steps(self) -> "DbtCloudJobRun":  # noqa: F821
    """
    Trigger a new run of the parent job that re-executes only the steps of
    this run that did not succeed.

    Returns:
        The newly triggered dbt Cloud job run.
    """
    job_details = await self._dbt_cloud_job.get_job()
    run_details = await self.get_run()
    override_options = await self._build_trigger_job_run_options(
        job=job_details, run=run_details
    )

    step_count = len(override_options.steps_override)
    if step_count:
        self.logger.info(f"{self._log_prefix} has {step_count} steps to retry.")
    else:
        self.logger.info(f"{self._log_prefix} does not have any steps to retry.")

    return await self._dbt_cloud_job.trigger(
        trigger_job_run_options=override_options,
    )
@sync_compatible
async def wait_for_completion(self) -> None:
    """
    Poll dbt Cloud until this run reaches a terminal status, using the
    parent job's `timeout_seconds` and `interval_seconds`.
    """
    parent_job = self._dbt_cloud_job
    await self._wait_until_state(
        in_final_state_fn=DbtCloudJobRunStatus.is_terminal_status_code,
        get_state_fn=self.get_status_code,
        log_state_fn=DbtCloudJobRunStatus,
        timeout_seconds=parent_job.timeout_seconds,
        interval_seconds=parent_job.interval_seconds,
    )
@task(
    name="Get dbt Cloud job details",
    description="Retrieves details of a dbt Cloud job "
    "for the job with the given job_id.",
    retries=3,
    retry_delay_seconds=10,
)
async def get_dbt_cloud_job_info(
    dbt_cloud_credentials: DbtCloudCredentials,
    job_id: int,
    order_by: Optional[str] = None,
) -> Dict:
    """
    A task to retrieve information about a dbt Cloud job.

    Args:
        dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.
        job_id: The ID of the job to get.
        order_by: Optional field to order the results by.

    Returns:
        The job data returned by the dbt Cloud administrative API.

    Raises:
        DbtCloudGetJobFailed: If the dbt Cloud API responds with an HTTP error.

    Example:
        Get the details of a dbt Cloud job:
        ```python
        from prefect import flow

        from prefect_dbt.cloud import DbtCloudCredentials
        from prefect_dbt.cloud.jobs import get_dbt_cloud_job_info

        @flow
        def get_job_flow():
            credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

            return get_dbt_cloud_job_info(
                dbt_cloud_credentials=credentials,
                job_id=42
            )

        get_job_flow()
        ```
    """  # noqa
    try:
        async with dbt_cloud_credentials.get_administrative_client() as client:
            response = await client.get_job(
                job_id=job_id,
                order_by=order_by,
            )
    except HTTPStatusError as ex:
        raise DbtCloudGetJobFailed(extract_user_message(ex)) from ex
    return response.json()["data"]
@task(
    name="Get dbt Cloud job run ID",
    description="Extracts the run ID from a trigger job run API response",
)
def get_run_id(obj: Dict):
    """
    Task that extracts the run ID from a trigger job run API response.

    This task is mainly used to maintain dependency tracking between the
    `trigger_dbt_cloud_job_run` task and downstream tasks/flows that use the run ID.

    Args:
        obj: The JSON body from the trigger job run response.

    Returns:
        The value stored under the `id` key of `obj`.

    Raises:
        RuntimeError: If `obj` has no `id` key, or its value is `None`.

    Example:
        ```python
        from prefect import flow
        from prefect_dbt.cloud import DbtCloudCredentials
        from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run, get_run_id


        @flow
        def trigger_run_and_get_id():
            dbt_cloud_credentials=DbtCloudCredentials(
                    api_key="my_api_key",
                    account_id=123456789
                )

            triggered_run_data = trigger_dbt_cloud_job_run(
                dbt_cloud_credentials=dbt_cloud_credentials,
                job_id=1,
            )
            run_id = get_run_id.submit(triggered_run_data)
            return run_id

        trigger_run_and_get_id()
        ```
    """
    # Named `run_id` rather than `id` to avoid shadowing the builtin.
    run_id = obj.get("id")
    if run_id is None:
        raise RuntimeError("Unable to determine run ID for triggered job.")
    return run_id
@flow(
    name="Retry subset of dbt Cloud job run and wait for completion",
    description=(
        "Retries a subset of dbt Cloud job run, filtered by select statuses, "
        "and waits for the triggered retry to complete."
    ),
)
async def retry_dbt_cloud_job_run_subset_and_wait_for_completion(
    dbt_cloud_credentials: DbtCloudCredentials,
    run_id: int,
    trigger_job_run_options: Optional[TriggerJobRunOptions] = None,
    max_wait_seconds: int = 900,
    poll_frequency_seconds: int = 10,
) -> Dict:
    """
    Flow that retries the unsuccessful part of a dbt Cloud job run and waits
    for the triggered retry to complete.

    Args:
        dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.
        run_id: The ID of the job run to retry.
        trigger_job_run_options: An optional TriggerJobRunOptions instance to
            specify overrides for the triggered job run. Its `steps_override`
            must be left unset, because this flow computes it.
        max_wait_seconds: Maximum number of seconds to wait for job to complete.
        poll_frequency_seconds: Number of seconds to wait in between checks for
            run completion.

    Raises:
        ValueError: If `trigger_job_run_options.steps_override` is set by the user.

    Returns:
        The run data returned by the dbt Cloud administrative API.

    Examples:
        Retry a subset of models in a dbt Cloud job run and wait for completion:
        ```python
        from prefect import flow

        from prefect_dbt.cloud import DbtCloudCredentials
        from prefect_dbt.cloud.jobs import retry_dbt_cloud_job_run_subset_and_wait_for_completion

        @flow
        def retry_dbt_cloud_job_run_subset_and_wait_for_completion_flow():
            credentials = DbtCloudCredentials.load("MY_BLOCK_NAME")
            retry_dbt_cloud_job_run_subset_and_wait_for_completion(
                dbt_cloud_credentials=credentials,
                run_id=88640123,
            )

        retry_dbt_cloud_job_run_subset_and_wait_for_completion_flow()
        ```
    """  # noqa
    steps_supplied_by_user = (
        trigger_job_run_options
        and trigger_job_run_options.steps_override is not None
    )
    if steps_supplied_by_user:
        raise ValueError(
            "Do not set `steps_override` in `trigger_job_run_options` "
            "because this flow will automatically set it"
        )

    run_future = await get_dbt_cloud_run_info.submit(
        dbt_cloud_credentials=dbt_cloud_credentials,
        run_id=run_id,
        include_related=["run_steps"],
    )
    run_info = await run_future.result()

    job_id = run_info["job_id"]
    job_future = await get_dbt_cloud_job_info.submit(
        dbt_cloud_credentials=dbt_cloud_credentials,
        job_id=job_id,
    )
    job_info = await job_future.result()

    options_override = await _build_trigger_job_run_options(
        dbt_cloud_credentials=dbt_cloud_credentials,
        trigger_job_run_options=trigger_job_run_options,
        run_id=run_id,
        run_info=run_info,
        job_info=job_info,
    )

    # Give the nested flow a fresh runner of the current runner's type,
    # to circumvent `RuntimeError: The task runner is already started!`
    current_runner_type = type(FlowRunContext.get().task_runner)
    nested_flow = trigger_dbt_cloud_job_run_and_wait_for_completion.with_options(
        task_runner=current_runner_type()
    )
    return await nested_flow(
        dbt_cloud_credentials=dbt_cloud_credentials,
        job_id=job_id,
        retry_filtered_models_attempts=0,
        trigger_job_run_options=options_override,
        max_wait_seconds=max_wait_seconds,
        poll_frequency_seconds=poll_frequency_seconds,
    )
@flow
async def run_dbt_cloud_job(
    dbt_cloud_job: DbtCloudJob,
    targeted_retries: int = 3,
) -> Dict[str, Any]:
    """
    Flow that triggers and waits for a dbt Cloud job run, retrying a
    subset of failed nodes if necessary.

    Args:
        dbt_cloud_job: Block that holds the information and
            methods to interact with a dbt Cloud job.
        targeted_retries: The number of times to retry failed steps.

    Returns:
        The result of `fetch_result` for the first run that succeeds.

    Raises:
        DbtCloudJobRunFailed: If the run still fails after `targeted_retries`
            retries of its failed steps.

    Examples:
        ```python
        from prefect import flow
        from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob
        from prefect_dbt.cloud.jobs import run_dbt_cloud_job

        @flow
        def run_dbt_cloud_job_flow():
            dbt_cloud_credentials = DbtCloudCredentials.load("dbt-token")
            dbt_cloud_job = DbtCloudJob(
                dbt_cloud_credentials=dbt_cloud_credentials, job_id=154217
            )
            return run_dbt_cloud_job(dbt_cloud_job=dbt_cloud_job)

        run_dbt_cloud_job_flow()
        ```
    """
    logger = get_run_logger()

    run = await task(dbt_cloud_job.trigger.aio)(dbt_cloud_job)
    # Always wait on the latest run (including the last retry); previously the
    # loop exited after triggering the final retry and returned None.
    while True:
        try:
            await task(run.wait_for_completion.aio)(run)
            result = await task(run.fetch_result.aio)(run)
            return result
        except DbtCloudJobRunFailed:
            if targeted_retries <= 0:
                # Out of retries: surface the failure instead of returning None.
                raise
            logger.info(
                f"Retrying job run with ID: {run.run_id} "
                f"{targeted_retries} more times"
            )
            run = await task(run.retry_failed_steps.aio)(run)
            targeted_retries -= 1
from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run
from prefect_dbt.cloud.models import TriggerJobRunOptions


@flow
def trigger_dbt_cloud_job_run_flow():
    """Trigger dbt Cloud job 1 with a set of run overrides."""
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    trigger_dbt_cloud_job_run(
        dbt_cloud_credentials=credentials,
        job_id=1,
        options=TriggerJobRunOptions(
            git_branch="staging",
            schema_override="dbt_cloud_pr_123",
            dbt_version_override="0.18.0",
            target_name_override="staging",
            timeout_seconds_override=3000,
            generate_docs_override=True,
            threads_override=8,
            steps_override=[
                "dbt seed",
                "dbt run --fail-fast",
                "dbt test --fail-fast",
            ],
        ),
    )


if __name__ == "__main__":
    # Run the flow defined above; calling the bare task here would bypass it.
    trigger_dbt_cloud_job_run_flow()
@task(
    name="Trigger dbt Cloud job run",
    description="Triggers a dbt Cloud job run for the job "
    "with the given job_id and optional overrides.",
    retries=3,
    retry_delay_seconds=10,
)
async def trigger_dbt_cloud_job_run(
    dbt_cloud_credentials: DbtCloudCredentials,
    job_id: int,
    options: Optional[TriggerJobRunOptions] = None,
) -> Dict:
    """
    A task to trigger a dbt Cloud job run.

    Args:
        dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.
        job_id: The ID of the job to trigger.
        options: An optional TriggerJobRunOptions instance to specify overrides
            for the triggered job run.

    Returns:
        The run data returned from the dbt Cloud administrative API.

    Raises:
        DbtCloudJobRunTriggerFailed: If the dbt Cloud API responds with an
            HTTP error.

    Examples:
        Trigger a dbt Cloud job run:
        ```python
        from prefect import flow

        from prefect_dbt.cloud import DbtCloudCredentials
        from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run


        @flow
        def trigger_dbt_cloud_job_run_flow():
            credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

            trigger_dbt_cloud_job_run(dbt_cloud_credentials=credentials, job_id=1)


        trigger_dbt_cloud_job_run_flow()
        ```

        Trigger a dbt Cloud job run with overrides:
        ```python
        from prefect import flow

        from prefect_dbt.cloud import DbtCloudCredentials
        from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run
        from prefect_dbt.cloud.models import TriggerJobRunOptions


        @flow
        def trigger_dbt_cloud_job_run_flow():
            credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

            trigger_dbt_cloud_job_run(
                dbt_cloud_credentials=credentials,
                job_id=1,
                options=TriggerJobRunOptions(
                    git_branch="staging",
                    schema_override="dbt_cloud_pr_123",
                    dbt_version_override="0.18.0",
                    target_name_override="staging",
                    timeout_seconds_override=3000,
                    generate_docs_override=True,
                    threads_override=8,
                    steps_override=[
                        "dbt seed",
                        "dbt run --fail-fast",
                        "dbt test --fail-fast",
                    ],
                ),
            )


        trigger_dbt_cloud_job_run_flow()
        ```
    """  # noqa
    logger = get_run_logger()

    logger.info(f"Triggering run for job with ID {job_id}")

    try:
        async with dbt_cloud_credentials.get_administrative_client() as client:
            response = await client.trigger_job_run(job_id=job_id, options=options)
    except HTTPStatusError as ex:
        raise DbtCloudJobRunTriggerFailed(extract_user_message(ex)) from ex

    run_data = response.json()["data"]

    # Only log a UI link when the response carries enough data to build it.
    if "project_id" in run_data and "id" in run_data:
        logger.info(
            f"Run successfully triggered for job with ID {job_id}. "
            "You can view the status of this run at "
            f"https://{dbt_cloud_credentials.domain}/#/accounts/"
            f"{dbt_cloud_credentials.account_id}/projects/{run_data['project_id']}/"
            f"runs/{run_data['id']}/"
        )

    return run_data
import asyncio

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion
from prefect_dbt.cloud.models import TriggerJobRunOptions

if __name__ == "__main__":
    # Trigger dbt Cloud job 1 with overrides and block until it completes.
    asyncio.run(
        trigger_dbt_cloud_job_run_and_wait_for_completion(
            dbt_cloud_credentials=DbtCloudCredentials(
                api_key="my_api_key",
                account_id=123456789
            ),
            job_id=1,
            trigger_job_run_options=TriggerJobRunOptions(
                git_branch="staging",
                schema_override="dbt_cloud_pr_123",
                dbt_version_override="0.18.0",
                target_name_override="staging",
                timeout_seconds_override=3000,
                generate_docs_override=True,
                threads_override=8,
                steps_override=[
                    "dbt seed",
                    "dbt run --fail-fast",
                    "dbt test --fail-fast",
                ],
            ),
        )
    )
@flow(
    name="Trigger dbt Cloud job run and wait for completion",
    description="Triggers a dbt Cloud job run and waits for the "
    "triggered run to complete.",
)
async def trigger_dbt_cloud_job_run_and_wait_for_completion(
    dbt_cloud_credentials: DbtCloudCredentials,
    job_id: int,
    trigger_job_run_options: Optional[TriggerJobRunOptions] = None,
    max_wait_seconds: int = 900,
    poll_frequency_seconds: int = 10,
    retry_filtered_models_attempts: int = 3,
) -> Dict:
    """
    Flow that triggers a job run and waits for the triggered run to complete.

    Args:
        dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.
        job_id: The ID of the job to trigger.
        trigger_job_run_options: An optional TriggerJobRunOptions instance to
            specify overrides for the triggered job run.
        max_wait_seconds: Maximum number of seconds to wait for job to complete
        poll_frequency_seconds: Number of seconds to wait in between checks for
            run completion.
        retry_filtered_models_attempts: Number of times to retry the unsuccessful
            part of a failed run via
            `retry_dbt_cloud_job_run_subset_and_wait_for_completion`.

    Raises:
        DbtCloudJobRunCancelled: The triggered dbt Cloud job run was cancelled.
        DbtCloudJobRunFailed: The triggered dbt Cloud job run failed and no
            retry attempt succeeded.
        RuntimeError: The run ID could not be determined, or the triggered
            dbt Cloud job run ended in an unexpected state.

    Returns:
        The run data returned by the dbt Cloud administrative API.

    Examples:
        Trigger a dbt Cloud job and wait for completion as a stand alone flow:
        ```python
        import asyncio
        from prefect_dbt.cloud import DbtCloudCredentials
        from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion

        asyncio.run(
            trigger_dbt_cloud_job_run_and_wait_for_completion(
                dbt_cloud_credentials=DbtCloudCredentials(
                    api_key="my_api_key",
                    account_id=123456789
                ),
                job_id=1
            )
        )
        ```

        Trigger a dbt Cloud job and wait for completion as a sub-flow:
        ```python
        from prefect import flow
        from prefect_dbt.cloud import DbtCloudCredentials
        from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion

        @flow
        def my_flow():
            ...
            run_result = trigger_dbt_cloud_job_run_and_wait_for_completion(
                dbt_cloud_credentials=DbtCloudCredentials(
                    api_key="my_api_key",
                    account_id=123456789
                ),
                job_id=1
            )
            ...

        my_flow()
        ```

        Trigger a dbt Cloud job with overrides:
        ```python
        import asyncio
        from prefect_dbt.cloud import DbtCloudCredentials
        from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion
        from prefect_dbt.cloud.models import TriggerJobRunOptions

        asyncio.run(
            trigger_dbt_cloud_job_run_and_wait_for_completion(
                dbt_cloud_credentials=DbtCloudCredentials(
                    api_key="my_api_key",
                    account_id=123456789
                ),
                job_id=1,
                trigger_job_run_options=TriggerJobRunOptions(
                    git_branch="staging",
                    schema_override="dbt_cloud_pr_123",
                    dbt_version_override="0.18.0",
                    target_name_override="staging",
                    timeout_seconds_override=3000,
                    generate_docs_override=True,
                    threads_override=8,
                    steps_override=[
                        "dbt seed",
                        "dbt run --fail-fast",
                        "dbt test --fail-fast",
                    ],
                ),
            )
        )
        ```
    """  # noqa
    logger = get_run_logger()

    triggered_run_data_future = await trigger_dbt_cloud_job_run.submit(
        dbt_cloud_credentials=dbt_cloud_credentials,
        job_id=job_id,
        options=trigger_job_run_options,
    )
    run_id = (await triggered_run_data_future.result()).get("id")
    if run_id is None:
        raise RuntimeError("Unable to determine run ID for triggered job.")

    final_run_status, run_data = await wait_for_dbt_cloud_job_run(
        run_id=run_id,
        dbt_cloud_credentials=dbt_cloud_credentials,
        max_wait_seconds=max_wait_seconds,
        poll_frequency_seconds=poll_frequency_seconds,
    )

    if final_run_status == DbtCloudJobRunStatus.SUCCESS:
        try:
            list_run_artifacts_future = await list_dbt_cloud_run_artifacts.submit(
                dbt_cloud_credentials=dbt_cloud_credentials,
                run_id=run_id,
            )
            run_data["artifact_paths"] = await list_run_artifacts_future.result()
        except DbtCloudListRunArtifactsFailed as ex:
            logger.warning(
                "Unable to retrieve artifacts for job run with ID %s. Reason: %s",
                run_id,
                ex,
            )
        logger.info(
            "dbt Cloud job run with ID %s completed successfully!",
            run_id,
        )
        return run_data
    elif final_run_status == DbtCloudJobRunStatus.CANCELLED:
        raise DbtCloudJobRunCancelled(
            f"Triggered job run with ID {run_id} was cancelled."
        )
    elif final_run_status == DbtCloudJobRunStatus.FAILED:
        while retry_filtered_models_attempts > 0:
            logger.info(
                f"Retrying job run with ID: {run_id} "
                f"{retry_filtered_models_attempts} more times"
            )
            retry_filtered_models_attempts -= 1
            try:
                run_data = await retry_dbt_cloud_job_run_subset_and_wait_for_completion(
                    dbt_cloud_credentials=dbt_cloud_credentials,
                    run_id=run_id,
                    trigger_job_run_options=trigger_job_run_options,
                    max_wait_seconds=max_wait_seconds,
                    poll_frequency_seconds=poll_frequency_seconds,
                )
                return run_data
            except Exception as ex:
                # Best effort: a failed retry is not fatal while attempts remain,
                # but record why it failed instead of silently discarding it.
                logger.warning(
                    "Retry of job run with ID %s failed: %s",
                    run_id,
                    ex,
                )
        else:
            raise DbtCloudJobRunFailed(f"Triggered job run with ID: {run_id} failed.")
    else:
        raise RuntimeError(
            f"Triggered job run with ID: {run_id} ended with unexpected "
            f"status {final_run_status.value}."
        )