Automating EMR step submission from AWS Lambda

What is EMR?

Elastic MapReduce (EMR) is an Amazon web service for big data processing and analysis. MapReduce is a programming framework that lets developers process large-scale data in parallel across distributed processors.

Amazon EMR processes big data across a Hadoop cluster of virtual servers running on Amazon Elastic Compute Cloud (EC2), with storage on Amazon Simple Storage Service (S3). The "elastic" in EMR's name refers to its dynamic resizing ability, which lets it scale resources up or down depending on demand at any given time.


What is AWS Lambda?

Lambda is a compute service that lets you run code without provisioning or managing servers. Lambda runs your code on a high-availability compute infrastructure and performs all of the administration of the compute resources, including server and operating system maintenance, capacity provisioning and automatic scaling, and code monitoring and logging. With Lambda, you can run code for virtually any type of application or backend service. Lambda runs your code in response to events.


Automating EMR step execution from Lambda

As discussed, Lambda executes code in response to events. These events can originate from sources such as S3, SQS, or SNS.
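For example, a minimal S3-triggered handler skeleton might look like the following. This is only a sketch, assuming the aws-lambda-java-core and aws-lambda-java-events libraries are on the classpath; EmrStepHandler and submitStepToEmr are hypothetical names, with submitStepToEmr standing in for the EMR step-submission code shown in the next example.

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.S3Event;

// Minimal sketch of an S3-triggered Lambda handler; EmrStepHandler and
// submitStepToEmr are hypothetical names used here for illustration.
public class EmrStepHandler implements RequestHandler<S3Event, String> {

    @Override
    public String handleRequest(S3Event event, Context context) {
        // Log the S3 object(s) that triggered this invocation
        event.getRecords().forEach(record ->
                context.getLogger().log("Triggered by: " + record.getS3().getObject().getKey()));

        submitStepToEmr(); // wraps the EMR step-submission code shown below
        return "EMR step submitted";
    }

    private void submitStepToEmr() {
        // EMR step-submission logic (see the example that follows)
    }
}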

The Java example below shows how to submit an EMR step from AWS Lambda.


// Required AWS SDK for Java (v1) imports
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;

import java.util.ArrayList;
import java.util.List;

/* Build an EMR client using the default AWS credentials provider chain */
AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
        .withCredentials(new DefaultAWSCredentialsProviderChain())
        .withRegion(Regions.US_EAST_1)
        .build();

/* Find the target cluster among the clusters in the RUNNING or WAITING state
   and capture its job flow ID */
List<String> clusterStates = new ArrayList<>();
clusterStates.add("RUNNING");
clusterStates.add("WAITING");
ListClustersRequest lcreq = new ListClustersRequest().withClusterStates(clusterStates);
ListClustersResult lcres = emr.listClusters(lcreq);

String emrJobFlowId = "";
for (ClusterSummary csummary : lcres.getClusters()) {
    if (csummary.getName().equalsIgnoreCase(System.getenv("EMR_NAME"))) {
        emrJobFlowId = csummary.getId();
    }
}

if (!emrJobFlowId.isEmpty()) {
    // Run a custom jar file as a step via command-runner.jar (spark-submit)
    HadoopJarStepConfig hadoopConfig = new HadoopJarStepConfig()
            .withJar("command-runner.jar")
            .withArgs("spark-submit", "--deploy-mode", "client",
                    "--class", System.getenv("EMR_CLASS_NAME"),
                    System.getenv("EMR_JAR_LOCATION"));

    // The step name and ActionOnFailure below are example values
    List<StepConfig> stepConfigs = new ArrayList<>();
    stepConfigs.add(new StepConfig()
            .withName("Lambda-submitted Spark step")
            .withActionOnFailure("CONTINUE")
            .withHadoopJarStep(hadoopConfig));

    AddJobFlowStepsRequest req = new AddJobFlowStepsRequest()
            .withJobFlowId(emrJobFlowId)
            .withSteps(stepConfigs);

    AddJobFlowStepsResult result = emr.addJobFlowSteps(req);
}

Once the above Lambda function executes, a step is automatically submitted to the EMR cluster, which then runs the job to completion.
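To confirm the submission, the step's status can be polled with the same client. The following is a minimal sketch, assuming the emr client, emrJobFlowId, and the AddJobFlowStepsResult (result) from the example above are still in scope.

// Look up the ID of the step that was just added and poll its status
String stepId = result.getStepIds().get(0);
DescribeStepRequest dsreq = new DescribeStepRequest()
        .withClusterId(emrJobFlowId)
        .withStepId(stepId);
DescribeStepResult dsres = emr.describeStep(dsreq);
System.out.println("Step state: " + dsres.getStep().getStatus().getState());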