Tasks for AWS Step Functions
AWS Step Functions is a web service that enables you to coordinate the
components of distributed applications and microservices using visual workflows.
You build applications from individual components that each perform a discrete
function, or task, allowing you to scale and change applications quickly.
A Task state represents a single unit of work performed by a state machine.
All work in your state machine is performed by tasks.
This module is part of the AWS Cloud Development Kit project.
Table Of Contents
Task
A Task state represents a single unit of work performed by a state machine. In the
CDK, the exact work to be In the CDK, the exact work to be done is determined by
a class that implements IStepFunctionsTask
.
AWS Step Functions integrates with some AWS services so that you can call API
actions, and coordinate executions directly from the Amazon States Language in
Step Functions. You can directly call and pass parameters to the APIs of those
services.
Paths
In the Amazon States Language, a path is a string beginning with $
that you
can use to identify components within JSON text.
Learn more about input and output processing in Step Functions here
InputPath
Both InputPath
and Parameters
fields provide a way to manipulate JSON as it
moves through your workflow. AWS Step Functions applies the InputPath
field first,
and then the Parameters
field. You can first filter your raw input to a selection
you want using InputPath, and then apply Parameters to manipulate that input
further, or add new values. If you don't specify an InputPath
, a default value
of $
will be used.
The following example provides the field named input
as the input to the Task
state that runs a Lambda function.
const submitJob = new tasks.LambdaInvoke(stack, 'Invoke Handler', {
lambdaFunction: submitJobLambda,
inputPath: '$.input'
});
OutputPath
Tasks also allow you to select a portion of the state output to pass to the next
state. This enables you to filter out unwanted information, and pass only the
portion of the JSON that you care about. If you don't specify an OutputPath
,
a default value of $
will be used. This passes the entire JSON node to the next
state.
The response from a Lambda function includes the response from the function
as well as other metadata.
The following example assigns the output from the Task to a field named result
const submitJob = new tasks.LambdaInvoke(stack, 'Invoke Handler', {
lambdaFunction: submitJobLambda,
outputPath: '$.Payload.result'
});
ResultPath
The output of a state can be a copy of its input, the result it produces (for
example, output from a Task state’s Lambda function), or a combination of its
input and result. Use ResultPath
to control which combination of these is
passed to the state output. If you don't specify an ResultPath
, a default
value of $
will be used.
The following example adds the item from calling DynamoDB's getItem
API to the state
input and passes it to the next state.
new tasks.DynamoGetItem(this, 'PutItem', {
item: { MessageId: { s: '12345'} },
tableName: 'my-table',
resultPath: `$.Item`,
});
⚠️ The OutputPath
is computed after applying ResultPath
. All service integrations
return metadata as part of their response. When using ResultPath
, it's not possible to
merge a subset of the task output to the input.
Task parameters from the state JSON
Most tasks take parameters. Parameter values can either be static, supplied directly
in the workflow definition (by specifying their values), or a value available at runtime
in the state machine's execution (either as its input or an output of a prior state).
Parameter values available at runtime can be specified via the Data
class,
using methods such as JsonPath.stringAt()
.
The following example provides the field named input
as the input to the Lambda function
and invokes it asynchronously.
const submitJob = new tasks.LambdaInvoke(stack, 'Invoke Handler', {
lambdaFunction: submitJobLambda,
payload: sfn.JsonPath.StringAt('$.input'),
invocationType: tasks.InvocationType.EVENT,
});
Each service integration has its own set of parameters that can be supplied.
Evaluate Expression
Use the EvaluateExpression
to perform simple operations referencing state paths. The
expression
referenced in the task will be evaluated in a Lambda function
(eval()
). This allows you to not have to write Lambda code for simple operations.
Example: convert a wait time from milliseconds to seconds, concat this in a message and wait:
const convertToSeconds = new tasks.EvaluateExpression(this, 'Convert to seconds', {
expression: '$.waitMilliseconds / 1000',
resultPath: '$.waitSeconds',
});
const createMessage = new tasks.EvaluateExpression(this, 'Create message', {
expression: '`Now waiting ${$.waitSeconds} seconds...`',
runtime: lambda.Runtime.NODEJS_10_X,
resultPath: '$.message',
});
const publishMessage = new tasks.SnsPublish(this, 'Publish message', {
topic,
message: sfn.TaskInput.fromDataAt('$.message'),
resultPath: '$.sns',
});
const wait = new sfn.Wait(this, 'Wait', {
time: sfn.WaitTime.secondsPath('$.waitSeconds')
});
new sfn.StateMachine(this, 'StateMachine', {
definition: convertToSeconds
.next(createMessage)
.next(publishMessage)
.next(wait)
});
The EvaluateExpression
supports a runtime
prop to specify the Lambda
runtime to use to evaluate the expression. Currently, the only runtime
supported is lambda.Runtime.NODEJS_10_X
.
Batch
Step Functions supports Batch through the service integration pattern.
SubmitJob
The SubmitJob API submits an AWS Batch job from a job definition.
import * as batch from '@aws-cdk/aws-batch';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';
const batchQueue = new batch.JobQueue(this, 'JobQueue', {
computeEnvironments: [
{
order: 1,
computeEnvironment: new batch.ComputeEnvironment(this, 'ComputeEnv', {
computeResources: { vpc },
}),
},
],
});
const batchJobDefinition = new batch.JobDefinition(this, 'JobDefinition', {
container: {
image: ecs.ContainerImage.fromAsset(path.resolve(__dirname, 'batchjob-image')),
},
});
const task = new tasks.BatchSubmitJob(this, 'Submit Job', {
jobDefinition: batchJobDefinition,
jobName: 'MyJob',
jobQueue: batchQueue,
});
DynamoDB
You can call DynamoDB APIs from a Task
state.
Read more about calling DynamoDB APIs here
GetItem
The GetItem operation returns a set of attributes for the item with the given primary key.
new tasks.DynamoGetItem(this, 'Get Item', {
key: { messageId: tasks.DynamoAttributeValue.fromString('message-007') },
table,
});
PutItem
The PutItem operation creates a new item, or replaces an old item with a new item.
new tasks.DynamoPutItem(this, 'PutItem', {
item: {
MessageId: tasks.DynamoAttributeValue.fromString('message-007'),
Text: tasks.DynamoAttributeValue.fromString(sfn.JsonPath.stringAt('$.bar')),
TotalCount: tasks.DynamoAttributeValue.fromNumber(10),
},
table,
});
DeleteItem
The DeleteItem operation deletes a single item in a table by primary key.
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';
new tasks.DynamoDeleteItem(this, 'DeleteItem', {
key: { MessageId: tasks.DynamoAttributeValue.fromString('message-007') },
table,
resultPath: sfn.JsonPath.DISCARD,
});
UpdateItem
The UpdateItem operation edits an existing item's attributes, or adds a new item
to the table if it does not already exist.
new tasks.DynamoUpdateItem(this, 'UpdateItem', {
key: { MessageId: tasks.DynamoAttributeValue.fromString('message-007') },
table,
expressionAttributeValues: {
':val': tasks.DynamoAttributeValue.numberFromString(sfn.JsonPath.stringAt('$.Item.TotalCount.N')),
':rand': tasks.DynamoAttributeValue.fromNumber(20),
},
updateExpression: 'SET TotalCount = :val + :rand',
});
ECS
Step Functions supports ECS/Fargate through the service integration pattern.
RunTask
RunTask starts a new task using the specified task definition.
EC2
The EC2 launch type allows you to run your containerized applications on a cluster
of Amazon EC2 instances that you manage.
When a task that uses the EC2 launch type is launched, Amazon ECS must determine where
to place the task based on the requirements specified in the task definition, such as
CPU and memory. Similarly, when you scale down the task count, Amazon ECS must determine
which tasks to terminate. You can apply task placement strategies and constraints to
customize how Amazon ECS places and terminates tasks. Learn more about task placement
The following example runs a job from a task definition on EC2
import * as ecs from '@aws-cdk/aws-ecs';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';
import * as sfn from '@aws-cdk/aws-stepfunctions';
const vpc = ec2.Vpc.fromLookup(stack, 'Vpc', {
isDefault: true,
});
const cluster = new ecs.Cluster(stack, 'Ec2Cluster', { vpc });
cluster.addCapacity('DefaultAutoScalingGroup', {
instanceType: new ec2.InstanceType('t2.micro'),
vpcSubnets: { subnetType: ec2.SubnetType.PUBLIC },
});
const taskDefinition = new ecs.TaskDefinition(stack, 'TD', {
compatibility: ecs.Compatibility.EC2,
});
taskDefinition.addContainer('TheContainer', {
image: ecs.ContainerImage.fromRegistry('foo/bar'),
memoryLimitMiB: 256,
});
const runTask = new tasks.EcsRunTask(stack, 'Run', {
integrationPattern: sfn.IntegrationPattern.RUN_JOB,
cluster,
taskDefinition,
launchTarget: new tasks.EcsEc2LaunchTarget({
placementStrategies: [
ecs.PlacementStrategy.spreadAcrossInstances(),
ecs.PlacementStrategy.packedByCpu(),
ecs.PlacementStrategy.randomly(),
],
placementConstraints: [
ecs.PlacementConstraint.memberOf('blieptuut')
],
}),
});
Fargate
AWS Fargate is a serverless compute engine for containers that works with Amazon
Elastic Container Service (ECS). Fargate makes it easy for you to focus on building
your applications. Fargate removes the need to provision and manage servers, lets you
specify and pay for resources per application, and improves security through application
isolation by design. Learn more about Fargate
The Fargate launch type allows you to run your containerized applications without the need
to provision and manage the backend infrastructure. Just register your task definition and
Fargate launches the container for you.
The following example runs a job from a task definition on Fargate
import * as ecs from '@aws-cdk/aws-ecs';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';
import * as sfn from '@aws-cdk/aws-stepfunctions';
const vpc = ec2.Vpc.fromLookup(stack, 'Vpc', {
isDefault: true,
});
const cluster = new ecs.Cluster(stack, 'FargateCluster', { vpc });
const taskDefinition = new ecs.TaskDefinition(stack, 'TD', {
memoryMiB: '512',
cpu: '256',
compatibility: ecs.Compatibility.FARGATE,
});
const containerDefinition = taskDefinition.addContainer('TheContainer', {
image: ecs.ContainerImage.fromRegistry('foo/bar'),
memoryLimitMiB: 256,
});
const runTask = new tasks.EcsRunTask(stack, 'RunFargate', {
integrationPattern: sfn.IntegrationPattern.RUN_JOB,
cluster,
taskDefinition,
containerOverrides: [{
containerDefinition,
environment: [{ name: 'SOME_KEY', value: sfn.JsonPath.stringAt('$.SomeKey') }],
}],
launchTarget: new tasks.EcsFargateLaunchTarget(),
});
EMR
Step Functions supports Amazon EMR through the service integration pattern.
The service integration APIs correspond to Amazon EMR APIs but differ in the
parameters that are used.
Read more about the differences when using these service integrations.
Create Cluster
Creates and starts running a cluster (job flow).
Corresponds to the runJobFlow
API in EMR.
const clusterRole = new iam.Role(stack, 'ClusterRole', {
assumedBy: new iam.ServicePrincipal('ec2.amazonaws.com'),
});
const serviceRole = new iam.Role(stack, 'ServiceRole', {
assumedBy: new iam.ServicePrincipal('elasticmapreduce.amazonaws.com'),
});
const autoScalingRole = new iam.Role(stack, 'AutoScalingRole', {
assumedBy: new iam.ServicePrincipal('elasticmapreduce.amazonaws.com'),
});
autoScalingRole.assumeRolePolicy?.addStatements(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
principals: [
new iam.ServicePrincipal('application-autoscaling.amazonaws.com'),
],
actions: [
'sts:AssumeRole',
],
});
)
new tasks.EmrCreateCluster(stack, 'Create Cluster', {
instances: {},
clusterRole,
name: sfn.TaskInput.fromDataAt('$.ClusterName').value,
serviceRole,
autoScalingRole,
integrationPattern: sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
});
Termination Protection
Locks a cluster (job flow) so the EC2 instances in the cluster cannot be
terminated by user intervention, an API call, or a job-flow error.
Corresponds to the setTerminationProtection
API in EMR.
new tasks.EmrSetClusterTerminationProtection(stack, 'Task', {
clusterId: 'ClusterId',
terminationProtected: false,
});
Terminate Cluster
Shuts down a cluster (job flow).
Corresponds to the terminateJobFlows
API in EMR.
new tasks.EmrTerminateCluster(stack, 'Task', {
clusterId: 'ClusterId'
});
Add Step
Adds a new step to a running cluster.
Corresponds to the addJobFlowSteps
API in EMR.
new tasks.EmrAddStep(stack, 'Task', {
clusterId: 'ClusterId',
name: 'StepName',
jar: 'Jar',
actionOnFailure: tasks.ActionOnFailure.CONTINUE,
});
Cancel Step
Cancels a pending step in a running cluster.
Corresponds to the cancelSteps
API in EMR.
new tasks.EmrCancelStep(stack, 'Task', {
clusterId: 'ClusterId',
stepId: 'StepId',
});
Modify Instance Fleet
Modifies the target On-Demand and target Spot capacities for the instance
fleet with the specified InstanceFleetName.
Corresponds to the modifyInstanceFleet
API in EMR.
new sfn.EmrModifyInstanceFleetByName(stack, 'Task', {
clusterId: 'ClusterId',
instanceFleetName: 'InstanceFleetName',
targetOnDemandCapacity: 2,
targetSpotCapacity: 0,
});
Modify Instance Group
Modifies the number of nodes and configuration settings of an instance group.
Corresponds to the modifyInstanceGroups
API in EMR.
new tasks.EmrModifyInstanceGroupByName(stack, 'Task', {
clusterId: 'ClusterId',
instanceGroupName: sfn.JsonPath.stringAt('$.InstanceGroupName'),
instanceGroup: {
instanceCount: 1,
},
});
Glue
Step Functions supports AWS Glue through the service integration pattern.
You can call the StartJobRun
API from a Task
state.
new GlueStartJobRun(stack, 'Task', {
jobName: 'my-glue-job',
arguments: {
key: 'value',
},
timeout: cdk.Duration.minutes(30),
notifyDelayAfter: cdk.Duration.minutes(5),
});
Lambda
Invoke a Lambda function.
You can specify the input to your Lambda function through the payload
attribute.
By default, Step Functions invokes Lambda function with the state input (JSON path '$')
as the input.
The following snippet invokes a Lambda Function with the state input as the payload
by referencing the $
path.
import * as lambda from '@aws-cdk/aws-lambda';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';
const myLambda = new lambda.Function(this, 'my sample lambda', {
code: Code.fromInline(`exports.handler = async () => {
return {
statusCode: '200',
body: 'hello, world!'
};
};`),
runtime: Runtime.NODEJS_12_X,
handler: 'index.handler',
});
new tasks.LambdaInvoke(this, 'Invoke with state input', {
lambdaFunction: myLambda,
});
When a function is invoked, the Lambda service sends these response
elements
back.
⚠️ The response from the Lambda function is in an attribute called Payload
The following snippet invokes a Lambda Function by referencing the $.Payload
path
to reference the output of a Lambda executed before it.
new tasks.LambdaInvoke(this, 'Invoke with empty object as payload', {
lambdaFunction: myLambda,
payload: sfn.TaskInput.fromObject({}),
});
new tasks.LambdaInvoke(this, 'Invoke with payload field in the state input', {
lambdaFunction: myOtherLambda,
payload: sfn.TaskInput.fromDataAt('$.Payload'),
});
The following snippet invokes a Lambda and sets the task output to only include
the Lambda function response.
new tasks.LambdaInvoke(this, 'Invoke and set function response as task output', {
lambdaFunction: myLambda,
payload: sfn.TaskInput.fromDataAt('$'),
outputPath: '$.Payload',
});
You can have Step Functions pause a task, and wait for an external process to
return a task token. Read more about the callback pattern
To use the callback pattern, set the token
property on the task. Call the Step
Functions SendTaskSuccess
or SendTaskFailure
APIs with the token to
indicate that the task has completed and the state machine should resume execution.
The following snippet invokes a Lambda with the task token as part of the input
to the Lambda.
new tasks.LambdaInvoke(stack, 'Invoke with callback', {
lambdaFunction: myLambda,
integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
payload: sfn.TaskInput.fromObject({
token: sfn.JsonPath.taskToken,
input: sfn.JsonPath.stringAt('$.someField'),
}),
});
⚠️ The task will pause until it receives that task token back with a SendTaskSuccess
or SendTaskFailure
call. Learn more about Callback with the Task
Token.
SageMaker
Step Functions supports AWS SageMaker through the service integration pattern.
Create Training Job
You can call the CreateTrainingJob
API from a Task
state.
new sfn.SagemakerTrainTask(this, 'TrainSagemaker', {
trainingJobName: sfn.JsonPath.stringAt('$.JobName'),
role,
algorithmSpecification: {
algorithmName: 'BlazingText',
trainingInputMode: tasks.InputMode.FILE,
},
inputDataConfig: [{
channelName: 'train',
dataSource: {
s3DataSource: {
s3DataType: tasks.S3DataType.S3_PREFIX,
s3Location: tasks.S3Location.fromJsonExpression('$.S3Bucket'),
},
},
}],
outputDataConfig: {
s3OutputLocation: tasks.S3Location.fromBucket(s3.Bucket.fromBucketName(stack, 'Bucket', 'mybucket'), 'myoutputpath'),
},
resourceConfig: {
instanceCount: 1,
instanceType: ec2.InstanceType.of(ec2.InstanceClass.P3, ec2.InstanceSize.XLARGE2),
volumeSize: cdk.Size.gibibytes(50),
},
stoppingCondition: {
maxRuntime: cdk.Duration.hours(1),
},
});
Create Transform Job
You can call the CreateTransformJob
API from a Task
state.
new sfn.SagemakerTransformTask(this, 'Batch Inference', {
transformJobName: 'MyTransformJob',
modelName: 'MyModelName',
role,
transformInput: {
transformDataSource: {
s3DataSource: {
s3Uri: 's3://inputbucket/train',
s3DataType: S3DataType.S3Prefix,
}
}
},
transformOutput: {
s3OutputPath: 's3://outputbucket/TransformJobOutputPath',
},
transformResources: {
instanceCount: 1,
instanceType: ec2.InstanceType.of(ec2.InstanceClass.M4, ec2.InstanceSize.XLarge),
}
});
SNS
Step Functions supports Amazon SNS through the service integration pattern.
You can call the Publish
API from a Task
state to publish to an SNS topic.
import * as sns from '@aws-cdk/aws-sns';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';
const topic = new sns.Topic(this, 'Topic');
const task1 = new tasks.SnsPublish(this, 'Publish1', {
topic,
integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE,
message: sfn.TaskInput.fromDataAt('$.state.message'),
});
const task2 = new tasks.SnsPublish(this, 'Publish2', {
topic,
message: sfn.TaskInput.fromObject({
field1: 'somedata',
field2: sfn.JsonPath.stringAt('$.field2'),
})
});
Step Functions
Start Execution
You can manage AWS Step Functions executions.
AWS Step Functions supports it's own StartExecution
API as a service integration.
const child = new sfn.StateMachine(stack, 'ChildStateMachine', {
definition: sfn.Chain.start(new sfn.Pass(stack, 'PassState')),
});
const task = new StepFunctionsStartExecution(stack, 'ChildTask', {
stateMachine: child,
integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
input: sfn.TaskInput.fromObject({
token: sfn.JsonPath.taskToken,
foo: 'bar'
}),
name: 'MyExecutionName'
});
new sfn.StateMachine(stack, 'ParentStateMachine', {
definition: task
});
Invoke Activity
You can invoke a Step Functions Activity which enables you to have
a task in your state machine where the work is performed by a worker that can
be hosted on Amazon EC2, Amazon ECS, AWS Lambda, basically anywhere. Activities
are a way to associate code running somewhere (known as an activity worker) with
a specific task in a state machine.
When Step Functions reaches an activity task state, the workflow waits for an
activity worker to poll for a task. An activity worker polls Step Functions by
using GetActivityTask, and sending the ARN for the related activity.
After the activity worker completes its work, it can provide a report of its
success or failure by using SendTaskSuccess
or SendTaskFailure
. These two
calls use the taskToken provided by GetActivityTask to associate the result
with that task.
The following example creates an activity and creates a task that invokes the activity.
const submitJobActivity = new sfn.Activity(this, 'SubmitJob');
new tasks.StepFunctionsInvokeActivity(this, 'Submit Job', {
activity: submitJobActivity,
});
SQS
Step Functions supports Amazon SQS
You can call the SendMessage
API from a Task
state
to send a message to an SQS queue.
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';
import * as sqs from '@aws-cdk/aws-sqs';
const queue = new sqs.Queue(this, 'Queue');
const task1 = new tasks.SqsSendMessage(this, 'Send1', {
queue,
messageBody: sfn.TaskInput.fromDataAt('$.message'),
});
const task2 = new tasks.SqsSendMessage(this, 'Send2', {
queue,
messageBody: sfn.TaskInput.fromObject({
field1: 'somedata',
field2: sfn.JsonPath.stringAt('$.field2'),
}),
});