@aws-cdk/aws-stepfunctions-tasks

Task integrations for AWS StepFunctions

  • 1.38.0
Tasks for AWS Step Functions


cdk-constructs: Experimental

The APIs of higher level constructs in this module are experimental and under active development. They are subject to non-backward compatible changes or removal in any future version. These are not subject to the Semantic Versioning model and breaking changes will be announced in the release notes. This means that while you may use them, you may need to update your source code when upgrading to a newer version of this package.


AWS Step Functions is a web service that enables you to coordinate the components of distributed applications and microservices using visual workflows. You build applications from individual components that each perform a discrete function, or task, allowing you to scale and change applications quickly.

A Task state represents a single unit of work performed by a state machine. All work in your state machine is performed by tasks.

This module is part of the AWS Cloud Development Kit project.

Task

A Task state represents a single unit of work performed by a state machine. In the CDK, the exact work to be done is determined by a class that implements IStepFunctionsTask.

AWS Step Functions integrates with some AWS services so that you can call API actions, and coordinate executions directly from the Amazon States Language in Step Functions. You can directly call and pass parameters to the APIs of those services.

Paths

In the Amazon States Language, a path is a string beginning with $ that you can use to identify components within JSON text.

Learn more about input and output processing in Step Functions here
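
To make the path notation concrete, here is a minimal sketch of how a dotted path such as $.input.retries selects a component of a JSON document. The resolvePath helper and the sample state are hypothetical and handle only simple dotted field names. Real Amazon States Language paths follow JsonPath and support more syntax than this.

```typescript
// Hypothetical helper: resolves simple dotted paths such as "$.a.b".
// Real Amazon States Language paths follow JsonPath and support more syntax.
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };

function resolvePath(doc: Json, path: string): Json | undefined {
  if (path === '$') {
    return doc; // "$" selects the whole document
  }
  if (!path.startsWith('$.')) {
    throw new Error(`Unsupported path: ${path}`);
  }
  let current: Json | undefined = doc;
  for (const key of path.slice(2).split('.')) {
    if (current === null || typeof current !== 'object' || Array.isArray(current)) {
      return undefined; // cannot descend into a primitive or an array
    }
    current = current[key];
  }
  return current;
}

const state: Json = { input: { id: 'job-1', retries: 3 }, requestedBy: 'alice' };
console.log(resolvePath(state, '$.input.retries')); // 3
console.log(resolvePath(state, '$.requestedBy'));   // alice
```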

InputPath

Both InputPath and Parameters fields provide a way to manipulate JSON as it moves through your workflow. AWS Step Functions applies the InputPath field first, and then the Parameters field. You can first filter your raw input to a selection you want using InputPath, and then apply Parameters to manipulate that input further, or add new values. If you don't specify an InputPath, a default value of $ will be used.

The following example provides the field named input as the input to the Task state that runs a Lambda function.

const submitJob = new sfn.Task(stack, 'Invoke Handler', {
  task: new tasks.RunLambdaTask(submitJobLambda),
  inputPath: '$.input'
});

OutputPath

Tasks also allow you to select a portion of the state output to pass to the next state. This enables you to filter out unwanted information, and pass only the portion of the JSON that you care about. If you don't specify an OutputPath, a default value of $ will be used. This passes the entire JSON node to the next state.

The response from a Lambda function includes the response from the function as well as other metadata.

The following example passes only the result field of the Lambda response payload to the next state.

const submitJob = new sfn.Task(stack, 'Invoke Handler', {
  task: new tasks.RunLambdaTask(submitJobLambda),
  outputPath: '$.Payload.result'
});

ResultPath

The output of a state can be a copy of its input, the result it produces (for example, output from a Task state’s Lambda function), or a combination of its input and result. Use ResultPath to control which combination of these is passed to the state output. If you don't specify a ResultPath, a default value of $ will be used.

The following example adds the item from calling DynamoDB's getItem API to the state input and passes it to the next state.

new sfn.Task(this, 'Get Item', {
  task: tasks.CallDynamoDB.getItem({
    // getItem looks the item up by its primary key
    partitionKey: {
      name: 'MessageId',
      value: new tasks.DynamoAttributeValue().withS('12345'),
    },
    tableName: 'my-table',
  }),
  resultPath: '$.Item'
});

⚠️ The OutputPath is computed after applying ResultPath. All service integrations return metadata as part of their response. When using ResultPath, it's not possible to merge a subset of the task output to the input.
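
The order in which these fields are applied can be sketched in plain TypeScript. This is a simplified model, not the Step Functions implementation: the select and runState helpers are hypothetical, only $ and single-level $.field result paths are handled, and Parameters is left out. The metadata field in the sample result is illustrative.

```typescript
// Simplified model of state input/output processing. Assumption: only "$" and
// "$.field" style paths are handled, and Parameters is omitted for brevity.
type Obj = { [key: string]: unknown };

function select(doc: unknown, path: string): unknown {
  let current: unknown = doc;
  for (const key of path.split('.').slice(1)) {
    if (typeof current !== 'object' || current === null) {
      return undefined;
    }
    current = (current as Obj)[key];
  }
  return current;
}

function runState(
  rawInput: Obj,
  paths: { inputPath: string; resultPath: string; outputPath: string },
  work: (input: unknown) => unknown,
): unknown {
  const taskInput = select(rawInput, paths.inputPath);       // 1. InputPath filters the raw input
  const result = work(taskInput);                            // 2. the task does its work
  const combined = paths.resultPath === '$'                  // 3. ResultPath places the result
    ? result                                                 //    "$" replaces the input entirely
    : { ...rawInput, [paths.resultPath.slice(2)]: result };  //    "$.x" adds the result to the raw input
  return select(combined, paths.outputPath);                 // 4. OutputPath filters the combined JSON
}

const output = runState(
  { order: { id: 7 }, traceId: 'abc' },
  { inputPath: '$.order', resultPath: '$.lookup', outputPath: '$.lookup' },
  (input) => ({ Item: input, SdkHttpMetadata: { HttpStatusCode: 200 } }),
);
console.log(JSON.stringify(output));
// {"Item":{"id":7},"SdkHttpMetadata":{"HttpStatusCode":200}}
```

In this model the whole result, metadata included, is placed at the ResultPath location. Trimming it down can only happen afterwards, through OutputPath.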

Task parameters from the state JSON

Most tasks take parameters. Parameter values can either be static, supplied directly in the workflow definition (by specifying their values), or a value available at runtime in the state machine's execution (either as its input or an output of a prior state). Parameter values available at runtime can be specified via the Data class, using methods such as Data.stringAt().

The following example provides the field named input as the input to the Lambda function and invokes it asynchronously.

const submitJob = new sfn.Task(stack, 'Invoke Handler', {
  task: new tasks.RunLambdaTask(submitJobLambda, {
    payload: sfn.Data.stringAt('$.input'),
    invocationType: tasks.InvocationType.EVENT,
  }),
});

Each service integration has its own set of parameters that can be supplied.
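
The difference between static and runtime parameter values shows up in the rendered Amazon States Language, where a key ending in .$ marks its value as a path to resolve at runtime. The sketch below illustrates that convention. The PathRef class and renderParameters function are hypothetical stand-ins and are not part of the CDK API.

```typescript
// Hypothetical sketch of how runtime references differ from static values when
// rendered to Amazon States Language: a key ending in ".$" marks its value as a path.
class PathRef {
  constructor(public readonly path: string) {}
}

type ParamValue = string | number | PathRef;

function renderParameters(params: { [key: string]: ParamValue }): { [key: string]: string | number } {
  const rendered: { [key: string]: string | number } = {};
  for (const key of Object.keys(params)) {
    const value = params[key];
    if (value instanceof PathRef) {
      rendered[`${key}.$`] = value.path; // resolved from the state JSON at runtime
    } else {
      rendered[key] = value;             // fixed in the workflow definition
    }
  }
  return rendered;
}

const renderedParams = renderParameters({ TableName: 'my-table', MessageId: new PathRef('$.input') });
console.log(JSON.stringify(renderedParams));
// {"TableName":"my-table","MessageId.$":"$.input"}
```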

Evaluate Expression

Use EvaluateExpression to perform simple operations that reference state paths. The expression referenced in the task is evaluated in a Lambda function (using eval()), which saves you from writing Lambda code for simple operations.

Example: convert a wait time from milliseconds to seconds, concatenate it into a message, and wait:

const convertToSeconds = new sfn.Task(this, 'Convert to seconds', {
  task: new tasks.EvaluateExpression({ expression: '$.waitMilliseconds / 1000' }),
  resultPath: '$.waitSeconds'
});

const createMessage = new sfn.Task(this, 'Create message', {
  // Note: this is a string inside a string.
  task: new tasks.EvaluateExpression({
    expression: '`Now waiting ${$.waitSeconds} seconds...`',
    runtime: lambda.Runtime.NODEJS_10_X,
  }),
  resultPath: '$.message'
});

const publishMessage = new sfn.Task(this, 'Publish message', {
  task: new tasks.PublishToTopic(topic, {
    message: sfn.TaskInput.fromDataAt('$.message'),
  }),
  resultPath: '$.sns'
});

const wait = new sfn.Wait(this, 'Wait', {
  time: sfn.WaitTime.secondsPath('$.waitSeconds')
});

new sfn.StateMachine(this, 'StateMachine', {
  definition: convertToSeconds
    .next(createMessage)
    .next(publishMessage)
    .next(wait)
});

The EvaluateExpression supports a runtime prop to specify the Lambda runtime to use to evaluate the expression. Currently, the only runtime supported is lambda.Runtime.NODEJS_10_X.
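
As a rough illustration of what evaluating such an expression involves, the sketch below substitutes each $.field reference with a JSON literal taken from the state and then calls eval(). It is a guess at the general approach rather than the construct's actual handler code: the evaluateExpression function is hypothetical and only supports single-level references.

```typescript
// Sketch only: substitutes "$.field" references with JSON literals, then evaluates.
// Assumption: references are single-level ("$.name"). eval() runs arbitrary code,
// so this is only reasonable for trusted expressions.
function evaluateExpression(expression: string, state: { [key: string]: unknown }): unknown {
  const substituted = expression.replace(
    /\$\.([A-Za-z0-9_]+)/g,
    (_match: string, field: string) => JSON.stringify(state[field]),
  );
  return eval(substituted);
}

const waitSeconds = evaluateExpression('$.waitMilliseconds / 1000', { waitMilliseconds: 5000 });
console.log(waitSeconds); // 5

// The expression is itself a template literal: a string inside a string.
const message = evaluateExpression('`Now waiting ${$.waitSeconds} seconds...`', { waitSeconds });
console.log(message); // Now waiting 5 seconds...
```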

Batch

Step Functions supports Batch through the service integration pattern.

SubmitJob

The SubmitJob API submits an AWS Batch job from a job definition.

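import * as batch from '@aws-cdk/aws-batch';
import * as ecs from '@aws-cdk/aws-ecs';
import * as path from 'path';

// 'vpc' is assumed to be an existing VPC defined elsewhere in the stack.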

const batchQueue = new batch.JobQueue(this, 'JobQueue', {
  computeEnvironments: [
    {
      order: 1,
      computeEnvironment: new batch.ComputeEnvironment(this, 'ComputeEnv', {
        computeResources: { vpc },
      }),
    },
  ],
});

const batchJobDefinition = new batch.JobDefinition(this, 'JobDefinition', {
  container: {
    image: ecs.ContainerImage.fromAsset(path.resolve(__dirname, 'batchjob-image')),
  },
});

const task = new sfn.Task(this, 'Submit Job', {
  task: new tasks.RunBatchJob({
    jobDefinition: batchJobDefinition,
    jobName: 'MyJob',
    jobQueue: batchQueue,
  }),
});

DynamoDB

You can call DynamoDB APIs from a Task state. Read more about calling DynamoDB APIs here

GetItem

The GetItem operation returns a set of attributes for the item with the given primary key.

new sfn.Task(this, 'Get Item', {
  task: tasks.CallDynamoDB.getItem({
    partitionKey: {
      name: 'messageId',
      value: new tasks.DynamoAttributeValue().withS('message-007'),
    },
    tableName: 'my-table',
  }),
});

PutItem

The PutItem operation creates a new item, or replaces an old item with a new item.

new sfn.Task(this, 'PutItem', {
  task: tasks.CallDynamoDB.putItem({
    item: {
      MessageId: new tasks.DynamoAttributeValue().withS('message-007'),
      Text: new tasks.DynamoAttributeValue().withS(sfn.Data.stringAt('$.bar')),
      TotalCount: new tasks.DynamoAttributeValue().withN('10'),
    },
    tableName: 'my-table',
  }),
});
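
The builder calls above correspond to DynamoDB's attribute-value JSON, in which every value is wrapped in an object keyed by its type and numbers are transmitted as strings. The sketch below shows that shape. The withS and withN functions here are hypothetical standalone helpers, not the DynamoAttributeValue class itself.

```typescript
// Hypothetical helpers mirroring DynamoDB's attribute-value JSON, where every
// value is wrapped in an object keyed by its type and numbers travel as strings.
type AttributeValue = { S: string } | { N: string };

const withS = (value: string): AttributeValue => ({ S: value });
const withN = (value: number | string): AttributeValue => ({ N: String(value) });

const item: { [name: string]: AttributeValue } = {
  MessageId: withS('message-007'),
  TotalCount: withN(10),
};

console.log(JSON.stringify(item));
// {"MessageId":{"S":"message-007"},"TotalCount":{"N":"10"}}
```

This wrapping is why a path into a returned item ends in the type key, as in $.Item.TotalCount.N.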

DeleteItem

The DeleteItem operation deletes a single item in a table by primary key.

new sfn.Task(this, 'DeleteItem', {
  task: tasks.CallDynamoDB.deleteItem({
    partitionKey: {
      name: 'MessageId',
      value: new tasks.DynamoAttributeValue().withS('message-007'),
    },
    tableName: 'my-table',
  }),
  resultPath: 'DISCARD',
});

UpdateItem

The UpdateItem operation edits an existing item's attributes, or adds a new item to the table if it does not already exist.

const updateItemTask = new sfn.Task(this, 'UpdateItem', {
  task: tasks.CallDynamoDB.updateItem({
    partitionKey: {
      name: 'MessageId',
      value: new tasks.DynamoAttributeValue().withS('message-007'),
    },
    tableName: 'my-table',
    expressionAttributeValues: {
      ':val': new tasks.DynamoAttributeValue().withN(sfn.Data.stringAt('$.Item.TotalCount.N')),
      ':rand': new tasks.DynamoAttributeValue().withN('20'),
    },
    updateExpression: 'SET TotalCount = :val + :rand',
  }),
});

ECS

Step Functions supports ECS/Fargate through the service integration pattern.

RunTask

RunTask starts a new task using the specified task definition.

import * as ecs from '@aws-cdk/aws-ecs';

// See examples in ECS library for initialization of 'cluster' and 'taskDefinition'

const fargateTask = new tasks.RunEcsFargateTask({
  cluster,
  taskDefinition,
  containerOverrides: [
    {
      containerName: 'TheContainer',
      environment: [
        {
          name: 'CONTAINER_INPUT',
          value: sfn.Data.stringAt('$.valueFromStateData'),
        }
      ]
    }
  ]
});

fargateTask.connections.allowToDefaultPort(rdsCluster, 'Read the database');

new sfn.Task(this, 'CallFargate', {
  task: fargateTask
});

EMR

Step Functions supports Amazon EMR through the service integration pattern. The service integration APIs correspond to Amazon EMR APIs but differ in the parameters that are used.

Read more about the differences when using these service integrations.

Create Cluster

Creates and starts running a cluster (job flow). Corresponds to the runJobFlow API in EMR.


const clusterRole = new iam.Role(stack, 'ClusterRole', {
  assumedBy: new iam.ServicePrincipal('ec2.amazonaws.com'),
});

const serviceRole = new iam.Role(stack, 'ServiceRole', {
  assumedBy: new iam.ServicePrincipal('elasticmapreduce.amazonaws.com'),
});

const autoScalingRole = new iam.Role(stack, 'AutoScalingRole', {
  assumedBy: new iam.ServicePrincipal('elasticmapreduce.amazonaws.com'),
});

autoScalingRole.assumeRolePolicy?.addStatements(
  new iam.PolicyStatement({
    effect: iam.Effect.ALLOW,
    principals: [
      new iam.ServicePrincipal('application-autoscaling.amazonaws.com'),
    ],
    actions: [
      'sts:AssumeRole',
    ],
  }),
);

new sfn.Task(stack, 'Create Cluster', {
  task: new tasks.EmrCreateCluster({
    instances: {},
    clusterRole,
    name: sfn.TaskInput.fromDataAt('$.ClusterName').value,
    serviceRole,
    autoScalingRole,
    integrationPattern: sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
  }),
});

Termination Protection

Locks a cluster (job flow) so the EC2 instances in the cluster cannot be terminated by user intervention, an API call, or a job-flow error.

Corresponds to the setTerminationProtection API in EMR.

new sfn.Task(stack, 'Task', {
  task: new tasks.EmrSetClusterTerminationProtection({
    clusterId: 'ClusterId',
    terminationProtected: false,
  }),
});

Terminate Cluster

Shuts down a cluster (job flow). Corresponds to the terminateJobFlows API in EMR.

new sfn.Task(stack, 'Task', {
  task: new tasks.EmrTerminateCluster({
    clusterId: 'ClusterId'
  }),
});

Add Step

Adds a new step to a running cluster. Corresponds to the addJobFlowSteps API in EMR.

new sfn.Task(stack, 'Task', {
  task: new tasks.EmrAddStep({
    clusterId: 'ClusterId',
    name: 'StepName',
    jar: 'Jar',
    actionOnFailure: tasks.ActionOnFailure.CONTINUE,
  }),
});

Cancel Step

Cancels a pending step in a running cluster. Corresponds to the cancelSteps API in EMR.

new sfn.Task(stack, 'Task', {
  task: new tasks.EmrCancelStep({
    clusterId: 'ClusterId',
    stepId: 'StepId',
  }),
});

Modify Instance Fleet

Modifies the target On-Demand and target Spot capacities for the instance fleet with the specified InstanceFleetName.

Corresponds to the modifyInstanceFleet API in EMR.

new sfn.Task(stack, 'Task', {
  task: new tasks.EmrModifyInstanceFleetByName({
    clusterId: 'ClusterId',
    instanceFleetName: 'InstanceFleetName',
    targetOnDemandCapacity: 2,
    targetSpotCapacity: 0,
  }),
});

Modify Instance Group

Modifies the number of nodes and configuration settings of an instance group.

Corresponds to the modifyInstanceGroups API in EMR.

new sfn.Task(stack, 'Task', {
  task: new tasks.EmrModifyInstanceGroupByName({
    clusterId: 'ClusterId',
    instanceGroupName: sfn.Data.stringAt('$.InstanceGroupName'),
    instanceGroup: {
      instanceCount: 1,
    },
  }),
});

Glue

Step Functions supports AWS Glue through the service integration pattern.

You can call the StartJobRun API from a Task state.

new sfn.Task(stack, 'Task', {
  task: new tasks.RunGlueJobTask(jobName, {
    arguments: {
      key: 'value',
    },
    timeout: cdk.Duration.minutes(30),
    notifyDelayAfter: cdk.Duration.minutes(5),
  }),
});

Lambda

Invoke a Lambda function.

You can specify the input to your Lambda function through the payload attribute. By default, Step Functions invokes the Lambda function with the state input (JSON path '$') as the input.

The following snippet invokes a Lambda Function with the state input as the payload by referencing the $ path.

new sfn.Task(this, 'Invoke with state input', {
  task: new tasks.RunLambdaTask(myLambda, {
    payload: sfn.TaskInput.fromDataAt('$'),
  }),
});

When a function is invoked, the Lambda service sends these response elements back.

⚠️ The response from the Lambda function is in an attribute called Payload

The following snippet invokes a Lambda Function by referencing the $.Payload path to reference the output of a Lambda executed before it.

new sfn.Task(this, 'Invoke with empty object as payload', {
  task: new tasks.RunLambdaTask(myLambda, {
    payload: sfn.TaskInput.fromObject({})
  }),
});

new sfn.Task(this, 'Invoke with payload field in the state input', {
  task: new tasks.RunLambdaTask(myOtherLambda, {
    payload: sfn.TaskInput.fromDataAt('$.Payload'),
  }),
});

The following snippet invokes a Lambda and sets the task output to only include the Lambda function response.

new sfn.Task(this, 'Invoke and set function response as task output', {
  task: new tasks.RunLambdaTask(myLambda, {
    payload: sfn.TaskInput.fromDataAt('$'),
  }),
  outputPath: '$.Payload',
});
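
The effect of selecting $.Payload can be pictured with a plain object that stands in for the task result. This is only an illustration: apart from Payload, which is named above, the field names are assumptions based on the Lambda Invoke response, and the values are made up.

```typescript
// Illustrative shape of a Lambda invocation result as seen by the state machine.
// Field names other than Payload are assumptions based on the Lambda Invoke response.
interface LambdaInvokeResult<T> {
  StatusCode: number;
  ExecutedVersion: string;
  Payload: T; // what the function itself returned
}

const taskResult: LambdaInvokeResult<{ status: string }> = {
  StatusCode: 200,
  ExecutedVersion: '$LATEST',
  Payload: { status: 'SUCCEEDED' },
};

// Equivalent of outputPath: '$.Payload': only the function's own response moves on.
const nextStateInput = taskResult.Payload;
console.log(nextStateInput.status); // SUCCEEDED
```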

You can have Step Functions pause a task, and wait for an external process to return a task token. Read more about the callback pattern

To use the callback pattern, set the token property on the task. Call the Step Functions SendTaskSuccess or SendTaskFailure APIs with the token to indicate that the task has completed and the state machine should resume execution.

The following snippet invokes a Lambda with the task token as part of the input to the Lambda.

const task = new sfn.Task(stack, 'Invoke with callback', {
  task: new tasks.RunLambdaTask(myLambda, {
    integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN,
    payload: {
      token: sfn.Context.taskToken,
      input: sfn.TaskInput.fromDataAt('$.someField'),
    }
  })
});

⚠️ The task will pause until it receives that task token back with a SendTaskSuccess or SendTaskFailure call. Learn more about Callback with the Task Token.
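
The pause-and-resume behaviour can be simulated in memory to show the control flow. Everything in this sketch is a hypothetical stand-in: waitForTaskToken, sendTaskSuccess, and sendTaskFailure only mimic the roles of the state machine and the external process, and none of them calls the real Step Functions API.

```typescript
// In-memory simulation of the callback pattern. Assumption: the names below are
// hypothetical stand-ins and do not call the real Step Functions API.
type Resolver = { resolve: (output: unknown) => void; reject: (error: Error) => void };
const pendingTasks = new Map<string, Resolver>();

// The "state machine" side: hand out a token and pause until it comes back.
function waitForTaskToken(token: string): Promise<unknown> {
  return new Promise((resolve, reject) => {
    pendingTasks.set(token, { resolve, reject });
  });
}

// The "external process" side: stand-ins for SendTaskSuccess and SendTaskFailure.
function sendTaskSuccess(token: string, output: unknown): void {
  pendingTasks.get(token)?.resolve(output);
  pendingTasks.delete(token);
}

function sendTaskFailure(token: string, cause: string): void {
  pendingTasks.get(token)?.reject(new Error(cause));
  pendingTasks.delete(token);
}

const paused = waitForTaskToken('token-123');
sendTaskSuccess('token-123', { approved: true }); // the paused task resumes with this output
paused.then((output) => console.log(JSON.stringify(output))); // {"approved":true}
```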

SageMaker

Step Functions supports Amazon SageMaker through the service integration pattern.

Create Training Job

You can call the CreateTrainingJob API from a Task state.

new sfn.Task(stack, 'TrainSagemaker', {
  task: new tasks.SagemakerTrainTask({
    trainingJobName: sfn.Data.stringAt('$.JobName'),
    role,
    algorithmSpecification: {
      algorithmName: 'BlazingText',
      trainingInputMode: tasks.InputMode.FILE,
    },
    inputDataConfig: [
      {
        channelName: 'train',
        dataSource: {
          s3DataSource: {
            s3DataType: tasks.S3DataType.S3_PREFIX,
            s3Location: tasks.S3Location.fromJsonExpression('$.S3Bucket'),
          },
        },
      },
    ],
    outputDataConfig: {
      s3OutputLocation: tasks.S3Location.fromBucket(s3.Bucket.fromBucketName(stack, 'Bucket', 'mybucket'), 'myoutputpath'),
    },
    resourceConfig: {
      instanceCount: 1,
      instanceType: ec2.InstanceType.of(ec2.InstanceClass.P3, ec2.InstanceSize.XLARGE2),
      volumeSizeInGB: 50,
    },
    stoppingCondition: {
      maxRuntime: cdk.Duration.hours(1),
    },
  }),
});

Create Transform Job

You can call the CreateTransformJob API from a Task state.

const transformJob = new tasks.SagemakerTransformTask({
    transformJobName: 'MyTransformJob',
    modelName: 'MyModelName',
    role,
    transformInput: {
        transformDataSource: {
            s3DataSource: {
                s3Uri: 's3://inputbucket/train',
                s3DataType: tasks.S3DataType.S3_PREFIX,
            },
        },
    },
    transformOutput: {
        s3OutputPath: 's3://outputbucket/TransformJobOutputPath',
    },
    transformResources: {
        instanceCount: 1,
        instanceType: ec2.InstanceType.of(ec2.InstanceClass.M4, ec2.InstanceSize.XLARGE),
    },
});

const task = new sfn.Task(this, 'Batch Inference', {
    task: transformJob
});

SNS

Step Functions supports Amazon SNS through the service integration pattern.

You can call the Publish API from a Task state to publish to an SNS topic.

import * as sns from '@aws-cdk/aws-sns';

// ...

const topic = new sns.Topic(this, 'Topic');

// Use a field from the execution data as message.
const task1 = new sfn.Task(this, 'Publish1', {
    task: new tasks.PublishToTopic(topic, {
        integrationPattern: sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
        message: sfn.TaskInput.fromDataAt('$.state.message'),
    })
});

// Combine a field from the execution data with
// a literal object.
const task2 = new sfn.Task(this, 'Publish2', {
    task: new tasks.PublishToTopic(topic, {
        message: sfn.TaskInput.fromObject({
            field1: 'somedata',
            field2: sfn.Data.stringAt('$.field2'),
        })
    })
});

Step Functions

You can manage AWS Step Functions executions.

AWS Step Functions supports its own StartExecution API as a service integration.

// Define a state machine with one Pass state
const child = new sfn.StateMachine(stack, 'ChildStateMachine', {
    definition: sfn.Chain.start(new sfn.Pass(stack, 'PassState')),
});

// Include the state machine in a Task state with callback pattern
const task = new sfn.Task(stack, 'ChildTask', {
  task: new tasks.ExecuteStateMachine(child, {
    integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN,
    input: {
      token: sfn.Context.taskToken,
      foo: 'bar'
    },
    name: 'MyExecutionName'
  })
});

// Define a second state machine with the Task state above
new sfn.StateMachine(stack, 'ParentStateMachine', {
  definition: task
});

SQS

Step Functions supports Amazon SQS through the service integration pattern.

You can call the SendMessage API from a Task state to send a message to an SQS queue.

import * as sqs from '@aws-cdk/aws-sqs';

// ...

const queue = new sqs.Queue(this, 'Queue');

// Use a field from the execution data as message.
const task1 = new sfn.Task(this, 'Send1', {
    task: new tasks.SendToQueue(queue, {
        messageBody: sfn.TaskInput.fromDataAt('$.message'),
        // Only for FIFO queues
        messageGroupId: '1234'
    })
});

// Combine a field from the execution data with
// a literal object.
const task2 = new sfn.Task(this, 'Send2', {
    task: new tasks.SendToQueue(queue, {
        messageBody: sfn.TaskInput.fromObject({
            field1: 'somedata',
            field2: sfn.Data.stringAt('$.field2'),
        }),
        // Only for FIFO queues
        messageGroupId: '1234'
    })
});

Package last updated on 08 May 2020
