Thursday, November 30, 2023

Difference Between ArrayList And Vector in Java

In many ways the Vector class in Java is just like ArrayList, apart from some differences. This post is about those differences between ArrayList and Vector in Java.

There are many similarities between Vector and ArrayList classes in Java. Vector, just like ArrayList, is also a growable dynamic array. As of Java v1.2, this class was retrofitted to implement the List interface, making it a member of the Java Collections Framework.

With JDK 5 it was also retrofitted for generics and implements the Iterable interface too, which means it can also be traversed using the enhanced for loop.

ArrayList Vs Vector in Java

  • ArrayList is not synchronized whereas Vector is synchronized, which means that the methods in the Vector class are synchronized, making Vector thread-safe and ready to use as-is in a multi-threaded environment.
    ArrayList, if needed in a multi-threaded environment, has to be synchronized externally using the Collections.synchronizedList method, which returns a synchronized (thread-safe) list backed by the specified list (see the example after this list).
  • Refer How and why to synchronize ArrayList in Java to know how to synchronize an ArrayList.

  • A Vector, by default, doubles the size of its array when it needs to expand the array, while the ArrayList increases its array size by 50 percent.
  • As mentioned, though Vector is retrofitted to implement the List interface it still has legacy methods which are not there in ArrayList. For example, methods like addElement(), removeElement() and capacity() are there in the Vector class but not in ArrayList.
  • Performance wise Vector is comparatively slower than ArrayList because it is synchronized. That means only one thread can access a Vector method at a time and there is the overhead of acquiring the lock on the object too.
  • For traversing an ArrayList as well as a Vector, Iterator or ListIterator can be used. That Iterator/ListIterator is fail-fast and throws ConcurrentModificationException if any other thread modifies the list structurally by adding or removing any element; only the Iterator's own remove() method is allowed.
    For Vector, Enumeration can also be used for traversing, which is not fail-fast.
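
Following is a minimal sketch (class and variable names are only for illustration) showing Vector's built-in synchronization versus wrapping an ArrayList using Collections.synchronizedList. Note that iteration over the wrapped list still has to be synchronized manually.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;

public class ListSyncDemo {
  public static void main(String[] args) {
    // Vector methods are synchronized out of the box
    List<String> vector = new Vector<>();
    vector.add("A");

    // ArrayList has to be wrapped explicitly for use in a multi-threaded environment
    List<String> syncList = Collections.synchronizedList(new ArrayList<String>());
    syncList.add("B");

    // Iteration over the synchronized list must still be done in a synchronized block
    synchronized (syncList) {
      Iterator<String> itr = syncList.iterator();
      while (itr.hasNext()) {
        System.out.println(itr.next());
      }
    }
  }
}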

That's all for this topic Difference Between ArrayList And Vector in Java. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. Difference Between ArrayList And LinkedList in Java
  2. Difference Between Array And ArrayList in Java
  3. Difference Between ArrayList And CopyOnWriteArrayList in Java
  4. How to Remove Elements From an ArrayList in Java
  5. Java Collections Interview Questions And Answers

You may also like-

  1. Fail-Fast Vs Fail-Safe Iterator in Java
  2. How to sort HashSet in Java
  3. Difference Between Comparable and Comparator in Java
  4. Polymorphism in Java
  5. Array in Java With Examples
  6. finalize method in Java
  7. Nested Try Statements in Java Exception Handling
  8. Spring Boot Event Driven Microservice With Kafka

OutputCommitter in Hadoop

In a distributed environment like the Hadoop framework it is important to track the success or failure of all the tasks running on different nodes. The whole job should be marked as successfully finished or aborted based on the success (or failure) of all its tasks. To ensure that, the Hadoop framework uses a commit protocol and the class used for that purpose is known as OutputCommitter in Hadoop.

OutputCommitter class in Hadoop is an abstract class and its concrete implementation is the FileOutputCommitter class.

As per Hadoop docs-

FileOutputCommitter- An OutputCommitter that commits files specified in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.

Tasks performed by OutputCommitter

The Hadoop MapReduce framework relies on the OutputCommitter of the job to do the following tasks-

  1. Setup the job during initialization. For example, create the temporary output directory for the job during the initialization of the job.
  2. Job cleanup after the job completion. For example, remove the temporary output directory after the job completion.
  3. Setup the task temporary output.
  4. Check whether a task needs a commit. This is to avoid the commit procedure if a task does not need commit.
  5. Commit of the task output.
  6. Discard the task commit.

Methods in OutputCommitter class in Hadoop (For MR 2 API)

  • abortJob(JobContext jobContext, org.apache.hadoop.mapreduce.JobStatus.State state)- For aborting an unsuccessful job's output. Note that this is invoked for jobs with final runstate as JobStatus.FAILED or JobStatus.KILLED. This is called from the application master process for the entire job. This may be called multiple times.
  • abortTask(TaskAttemptContext taskContext)- Discard the task output. This is called from a task's process to clean up a single task's output that has not yet been committed. This may be called multiple times for the same task, but for different task attempts.
  • commitJob(JobContext jobContext)- For committing job's output after successful job completion. That is when job clean up also happens. Note that this is invoked for jobs with final runstate as SUCCESSFUL. This is called from the application master process for the entire job. This is guaranteed to only be called once.
  • commitTask(TaskAttemptContext taskContext)- To promote the task's temporary output to the final output location. If needsTaskCommit(TaskAttemptContext) returns true and this task is the task that the AM determines finished first, this method is called to commit that individual task's output. This is to mark that task's output as complete.
  • needsTaskCommit(TaskAttemptContext taskContext)- Check whether task needs a commit. This is called from each individual task's process that will output to HDFS, and it is called just for that task.
  • setupJob(JobContext jobContext)- For the framework to setup the job output during initialization. This is called from the application master process for the entire job. This will be called multiple times, once per job attempt.
  • setupTask(TaskAttemptContext taskContext)- Sets up output for the task. This is called from each individual task's process that will output to HDFS, and it is called just for that task. This may be called multiple times for the same task, but for different task attempts.
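
As a simple illustration of these callbacks, here is a minimal no-op committer sketch (the class name is just a placeholder, and it is not a replacement for FileOutputCommitter, which actually promotes task output to the final output directory). It only shows where each method of the commit protocol is invoked.

import java.io.IOException;

import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class LoggingOutputCommitter extends OutputCommitter {

  // Called from the application master, once per job attempt
  @Override
  public void setupJob(JobContext jobContext) throws IOException {
    System.out.println("setupJob for " + jobContext.getJobID());
  }

  // Called from each task's process before the task writes its output
  @Override
  public void setupTask(TaskAttemptContext taskContext) throws IOException {
    System.out.println("setupTask for " + taskContext.getTaskAttemptID());
  }

  // Returning false skips commitTask() for this task attempt
  @Override
  public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
    return true;
  }

  // Where a real committer promotes the task's temporary output to the final location
  @Override
  public void commitTask(TaskAttemptContext taskContext) throws IOException {
    System.out.println("commitTask for " + taskContext.getTaskAttemptID());
  }

  // Discards the task output; may be called for different task attempts
  @Override
  public void abortTask(TaskAttemptContext taskContext) throws IOException {
    System.out.println("abortTask for " + taskContext.getTaskAttemptID());
  }
}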

Reference- https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/OutputCommitter.html

That's all for this topic OutputCommitter in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Distributed Cache in Hadoop MapReduce
  2. Using Combiner to Improve MapReduce Performance in Hadoop
  3. How MapReduce Works in Hadoop
  4. Chaining MapReduce Job in Hadoop
  5. How to Compress Intermediate Map Output in Hadoop

You may also like-

  1. NameNode, DataNode And Secondary NameNode in HDFS
  2. HDFS Federation in Hadoop Framework
  3. Compressing File in bzip2 Format in Hadoop - Java Program
  4. Using Avro File With Hadoop MapReduce
  5. Fair Scheduler in YARN
  6. ArrayBlockingQueue in Java Concurrency
  7. String And Thread-Safety in Java
  8. Converting int to String - Java Program

Wednesday, November 29, 2023

ToolRunner and GenericOptionsParser in Hadoop

GenericOptionsParser is a utility class in Hadoop that resides in the org.apache.hadoop.util package. It helps in setting Hadoop options through the command line; it parses the command line arguments and sets them on a Configuration object that can then be used in the application.

How GenericOptionsParser class is used

Rather than using the GenericOptionsParser class directly, you will generally implement the Tool interface in your MapReduce class and use the ToolRunner.run method to run your application, which uses GenericOptionsParser internally to parse the command line arguments.

How GenericOptionsParser class helps

If you set configuration arguments within your code then you are hard coding those arguments. Any change in any argument will require a code change and recreation of the jar.

Passing arguments on the command line gives the flexibility to add, remove or change arguments without requiring any change in the code.
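
Following is a minimal Tool skeleton (the class name and property name are hypothetical) showing how ToolRunner and GenericOptionsParser work together- any -D property passed on the command line ends up in the Configuration returned by getConf(), while the remaining application-specific arguments are passed to run().

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyTool extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    // Properties passed as -D key=value are already set on this Configuration
    Configuration conf = getConf();
    System.out.println("my.custom.property = " + conf.get("my.custom.property"));
    // args contains only the application specific arguments (input/output paths etc.)
    return 0;
  }

  public static void main(String[] args) throws Exception {
    // ToolRunner parses the generic options before calling run()
    System.exit(ToolRunner.run(new MyTool(), args));
  }
}

Such a class could then be run as, for example, hadoop jar mytool.jar org.example.MyTool -D my.custom.property=someValue /input/path /output/path.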

Generic Options

You can specify command line arguments using the following generic options.

  1. -archives <comma separated list of archives>- Specify comma separated archives to be unarchived on the compute machines. Applies only to job.
  2. -conf <configuration file>- Specify an application configuration file.
  3. -D <property>=<value>- Use value for given property.
  4. -files <comma separated list of files>- Specify comma separated files to be copied to the map reduce cluster. Applies only to job.
  5. -fs <file:///> or <hdfs://namenode:port>- Specify default filesystem URL to use. Overrides ‘fs.defaultFS’ property from configurations.
  6. -jt <local> or <resourcemanager:port>- Specify a ResourceManager. Applies only to job.
  7. -libjars <comma separated list of jars>- Specify comma separated jar files to include in the classpath. Applies only to job.

GenericOptionsParser with ToolRunner example

In the post Using Avro File With Hadoop MapReduce there is an example of using an Avro file with MapReduce. In that example the Avro schema is inlined within the code.

Here the same example is written by passing that schema file (saleschema.avsc) as a command line argument.

saleschema.avsc

{
  "type": "record",    
  "name": "SalesRecord",
  "doc" : "Sales Records",
  "fields": 
 [
  {"name":"item", "type": "string"},
  {"name":"totalsales", "type": "int"}
 ]
}

MapReduce code

import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroMR extends Configured implements Tool{

  //Mapper
  public static class ItemMapper extends Mapper<LongWritable, Text, AvroKey<Text>, 
      AvroValue<GenericRecord>>{
    private Text item = new Text();
    private GenericRecord record;
     @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
      // Getting the file passed as arg in command line
      Schema SALES_SCHEMA = new Schema.Parser().parse(new File("saleschema.avsc"));
      record = new GenericData.Record(SALES_SCHEMA);
    }
    public void map(LongWritable key, Text value, Context context) 
           throws IOException, InterruptedException {
      //splitting record
      String[] salesArr = value.toString().split("\t");        
      item.set(salesArr[0]);
      record.put("item", salesArr[0]);
      record.put("totalsales", Integer.parseInt(salesArr[1]));
      context.write(new AvroKey<Text>(item), new AvroValue<GenericRecord>(record));
    }
  }
  
  // Reducer
  public static class SalesReducer extends Reducer<AvroKey<Text>, AvroValue<GenericRecord>, 
             AvroKey<GenericRecord>, NullWritable>{    
    Schema SALES_SCHEMA;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      // Getting the file passed as arg in command line
      SALES_SCHEMA = new Schema.Parser().parse(new File("saleschema.avsc"));
    }
    public void reduce(AvroKey<Text> key, Iterable<AvroValue<GenericRecord>> values,
        Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (AvroValue<GenericRecord> value : values) {
        GenericRecord    record = value.datum();
        sum += (Integer)record.get("totalsales");
      }
      GenericRecord record = new GenericData.Record(SALES_SCHEMA);
      record.put("item", key.datum());
      record.put("totalsales", sum);
      context.write(new AvroKey<GenericRecord>(record), NullWritable.get());
    }
  }
  
  public static void main(String[] args) throws Exception{
    int exitFlag = ToolRunner.run(new AvroMR(), args);
    System.exit(exitFlag);
  }
    
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "AvroMR");
    job.setJarByClass(getClass());
    job.setMapperClass(ItemMapper.class);    
    job.setReducerClass(SalesReducer.class);
    AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
    // Schema file needed here also
    Schema SALES_SCHEMA = new Schema.Parser().parse(
        new File("/home/netjs/saleschema.avsc"));
    AvroJob.setMapOutputValueSchema(job, SALES_SCHEMA);
    AvroJob.setOutputKeySchema(job,    SALES_SCHEMA);    
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}
Running this Hadoop MapReduce program with the schema file passed as a command line argument-

hadoop jar /home/netjs/netjshadoop.jar org.netjs.AvroMR -files /home/netjs/saleschema.avsc /test/input/sales.txt /test/out/sale

Here location of the schema file in the local file system is passed as a command line argument.

You can see the content of the Avro output file using the avro-tools jar.

hadoop jar /PATH_TO_JAR/avro-tools-1.8.2.jar tojson /test/out/sale/part-r-00000.avro

{"item":"Item1","totalsales":1158}
{"item":"Item2","totalsales":642}
{"item":"Item3","totalsales":1507}

That's all for this topic ToolRunner and GenericOptionsParser in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. How to Handle Missing And Under Replicated Blocks in HDFS
  2. What is SafeMode in Hadoop
  3. How to Compress MapReduce Job Output in Hadoop
  4. How to Write a Map Only Job in Hadoop MapReduce
  5. How to Check Hadoop MapReduce Logs

You may also like-

  1. Input Splits in Hadoop
  2. Capacity Scheduler in YARN
  3. How to Read And Write SequenceFile in Hadoop
  4. Parquet File Format in Hadoop
  5. Java Stream API Interview Questions
  6. Generating Getters And Setters Using Reflection - Java Program
  7. BigDecimal in Java
  8. Zipping Files in Java

Tuesday, November 28, 2023

How to Handle Missing And Under Replicated Blocks in HDFS

In this post we’ll see how to get information about missing or corrupt blocks in HDFS and how to fix it. We'll also see how to fix under replicated blocks in HDFS.

Get information about corrupt or missing HDFS blocks

For getting information about corrupt or missing blocks in HDFS you can use the following HDFS command, which prints out a list of missing blocks and the files they belong to.

hdfs fsck / -list-corruptfileblocks

Fixing corrupt or missing HDFS blocks

Using that information you can decide how important the files with missing blocks are, since the easiest fix is to delete the file and copy it to HDFS again. If you are OK with deleting the files that have corrupt blocks you can use the following command.

hdfs fsck / -delete

This command deletes corrupted files.

If you still want to have a shot at fixing the corrupted blocks, then using the file names you got from running the fsck command with the -list-corruptfileblocks option you can use the following command.

hdfs fsck <path to file> -locations -blocks -files

This command prints out the locations for every block. Using that information you can go to the data nodes where the block is stored and verify whether there is any network or hardware related error or any file system problem; fixing that may make the block healthy again.

Fixing under replicated blocks problem in Hadoop

If you have under replicated blocks for files in HDFS then you can use the hdfs fsck / command to get that information.

Then you can use the following script where hdfs dfs -setrep <replication number> command is used to set required replication factor for the files.

 
$ hdfs fsck / | grep 'Under replicated' | awk -F':' '{print $1}' >> /tmp/files

$ for problemfile in `cat /tmp/files`; do echo "Setting replication for $problemfile"; hdfs dfs -setrep 3 $problemfile; done 

Actually, when you run the hdfs fsck / command the output is in the following form for under replicated blocks-

File name: Under replicated <block>.
   Target Replicas is 3 but found 1 live replica(s), 0 decommissioned replica(s), 0 decommissioning replica(s).

From this output, using the awk command you take the file names where the words “Under replicated” are found and write them to a temp file. Then you set the replication factor to 3 (in this case) for those files.

That's all for this topic How to Handle Missing And Under Replicated Blocks in HDFS. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Replica Placement Policy in Hadoop Framework
  2. NameNode, DataNode And Secondary NameNode in HDFS
  3. HDFS Commands Reference List
  4. HDFS High Availability
  5. File Read in HDFS - Hadoop Framework Internal Steps

You may also like-

  1. Speculative Execution in Hadoop
  2. YARN in Hadoop
  3. How to Compress Intermediate Map Output in Hadoop
  4. How to Configure And Use LZO Compression in Hadoop
  5. How HashMap Internally Works in Java
  6. Stream API in Java 8
  7. How to Run a Shell Script From Java Program
  8. Lazy Initializing Spring Beans

Thursday, November 23, 2023

Spring Boot Microservice + API Gateway + Resilience4J

In this post we'll see how to configure API Gateway in Microservices using Spring Cloud Gateway with Eureka as service registry and Resilience4J as circuit breaker.

Spring Cloud Gateway

Spring Cloud Gateway project provides an API Gateway which acts as a front for routing to different microservices.

Spring Cloud Gateway is built on Spring Boot, Spring WebFlux, and Project Reactor. Spring Cloud Gateway requires the Netty runtime provided by Spring Boot and Spring Webflux. It does not work in a traditional Servlet Container or when built as a WAR. Therefore, you'll see the Gateway service starting on a Netty server rather than on a Tomcat server which is used for Spring Boot web applications by default.

The main building blocks of Spring Cloud Gateway are-

  1. Route: It is the basic building block of the gateway. It is defined by an ID, a destination URI, a collection of predicates, and a collection of filters. A route is matched if the aggregate predicate is true, and the request is then forwarded to the destination URI.
  2. Predicate: This is a Java 8 Function Predicate. The input type is a Spring Framework ServerWebExchange. This lets you match on anything from the HTTP request, such as headers or parameters.
  3. Filter: These are instances of GatewayFilter that have been constructed with a specific factory. Filters are used to modify requests and responses and to apply features such as rate limiting and circuit breaking before or after sending the downstream request.

Advantages of using API gateway

  1. The API gateway sits between external clients and multiple microservices and acts as a single entry and exit point. That makes the gateway a single implementation point for cross cutting concerns across all the microservices, such as security (initial authentication), monitoring/metrics, and resiliency (circuit breaker, rate limiting).
  2. Clients don't need to know about all the backend services. They need to communicate only with the API gateway.

Spring Boot Microservice with API Gateway example

In this example we'll configure an API Gateway, register it as a service with Eureka and also configure Resilience4J as circuit breaker at the gateway level.

We'll use the example from this post as a base- Spring Boot Microservice - Service Registration and Discovery With Eureka. With that you should have CustomerService and AccountService ready and registered with Eureka.

Creating Gateway Service

Gateway service is created as a separate microservice. If you are using Spring Tool Suite then create a new Spring Starter Project and name it gateway-service.

Select "Gateway" as dependency under "Spring Cloud Routing". Select "Resilience4J" under "Spring Cloud Circuit Breaker" and "Eureka Discovery Client" under "Spring Cloud Discovery".

With that the created pom.xml should look like this-

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.2</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>com.netjstech</groupId>
  <artifactId>gatewayservice</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>gateway-service</name>
  <description>Cloud Gateway Service</description>
  <properties>
    <java.version>17</java.version>
    <spring-cloud.version>2022.0.4</spring-cloud.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-gateway</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <scope>runtime</scope>
      <optional>true</optional>
    </dependency>
  </dependencies>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>${spring-cloud.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

</project>
  • Eureka client is needed because gateway-service will be registered with the Eureka server.
  • Resilience4J is needed to implement circuit breaker functionality at the gateway level.
  • Actuator is needed to see metrics for the circuit breaker.
  • Of course, spring-cloud-starter-gateway is needed since we are implementing an API gateway.

Application class with @EnableDiscoveryClient annotation

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@SpringBootApplication
@EnableDiscoveryClient
public class GatewayServiceApplication {

	public static void main(String[] args) {
		SpringApplication.run(GatewayServiceApplication.class, args);
	}
}

API Gateway configuration

You can configure the gateway using code or configuration. Here it is done using configuration in the application.yml file (a code-based sketch is shown after the configuration walkthrough below).

server:
  port: 8181
  
eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka
  
management:
  endpoint:
    health:
      show-details: always
  endpoints:
    web:
      exposure:
        include: health
  health:
    circuitbreakers:
      enabled: true
 
resilience4j.circuitbreaker:
  configs:
    default:
      registerHealthIndicator: true
      
spring:
  application:
    name: gateway-service
  cloud:
    gateway:
      discovery:
        locator:
          enabled: false
          lower-case-service-id: true
      routes:
        - id: customer-service
          uri: http://localhost:8081 #lb://customer-service
          predicates:
            - Path=/customer/**
          filters:
            - name: CircuitBreaker
              args:
                name: CusCircuitBreaker
                fallbackUri: forward:/fallback/customer
        - id: account-service
          uri: lb://account-service
          predicates:
            - Path=/account/**   
          filters:
            - name: CircuitBreaker
              args:
                name: AcctCircuitBreaker
                fallbackUri: forward:/fallback/account

Going over the configuration properties from the beginning of the file-

  • The Netty server this service starts on listens on port 8181.
  • Next there is configuration for the actuator so that metrics for the circuit breaker are also accessible when http://localhost:8181/actuator/health is accessed.
  • Properties for the gateway start from routes, where each route has an id and a destination URI to which the request is forwarded when the predicate matches.
  • Each predicate has a wildcard, which means URLs having anything after /customer/ or /account/ are matched by these predicates.
  • Note that the URI for customer-service is a direct URL (a static route) whereas for account-service it is lb://account-service, which means a load balanced instance.
  • Since we are also configuring a circuit breaker, that configuration is provided using filters. A fallback URI is configured here and that path is mapped to a method in a controller. This method is called as a fallback when the service is not available and the circuit is open.
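
The same routes can also be defined in Java code instead of application.yml. Here is a rough sketch using RouteLocatorBuilder, shown only as an alternative to the YAML configuration above (you would use one or the other).

import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class GatewayRoutesConfig {

  @Bean
  public RouteLocator customRoutes(RouteLocatorBuilder builder) {
    return builder.routes()
        // Static route with a circuit breaker filter and fallback URI
        .route("customer-service", r -> r.path("/customer/**")
            .filters(f -> f.circuitBreaker(c -> c.setName("CusCircuitBreaker")
                .setFallbackUri("forward:/fallback/customer")))
            .uri("http://localhost:8081"))
        // Load balanced route resolved through the service registry
        .route("account-service", r -> r.path("/account/**")
            .filters(f -> f.circuitBreaker(c -> c.setName("AcctCircuitBreaker")
                .setFallbackUri("forward:/fallback/account")))
            .uri("lb://account-service"))
        .build();
  }
}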

Gateway Fallback Controller

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/fallback")
public class GatewayFallbackController {
  @RequestMapping("/account")
    public ResponseEntity<String> getAccount() {
      System.out.println("In account fallback");
      return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                 .body("There is a problem in AccountService, please try after some time");
    }
    
  @RequestMapping("/customer")
    public ResponseEntity<String> getCustomer() {
      System.out.println("In customer fallback");
      return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                 .body("There is a problem in CustomerService, please try after some time");
    }
}

Note that @RequestMapping is used with the methods too, rather than @GetMapping, @PostMapping etc., as that makes more sense for fallback methods which are neither used to fetch data nor to insert, update or delete.

Resilience4J Configuration

Circuit breaker configuration can also be provided using code or config; in this example it is provided using code.

import java.time.Duration;

import org.springframework.cloud.circuitbreaker.resilience4j.ReactiveResilience4JCircuitBreakerFactory;
import org.springframework.cloud.circuitbreaker.resilience4j.Resilience4JConfigBuilder;
import org.springframework.cloud.client.circuitbreaker.Customizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig.SlidingWindowType;
import io.github.resilience4j.timelimiter.TimeLimiterConfig;

@Configuration
public class ResilienceConfig {
  @Bean
  public Customizer<ReactiveResilience4JCircuitBreakerFactory> defaultCustomizer() {
      return factory -> factory.configureDefault(id -> new Resilience4JConfigBuilder(id)
          .timeLimiterConfig(TimeLimiterConfig.custom()
              .timeoutDuration(Duration.ofMillis(2000))
              .build())
            .circuitBreakerConfig(CircuitBreakerConfig.custom()
                    .slidingWindowSize(10)
                    .slidingWindowType(SlidingWindowType.COUNT_BASED)
                    .permittedNumberOfCallsInHalfOpenState(3)
                    .failureRateThreshold(50.0F)
                    .waitDurationInOpenState(Duration.ofSeconds(5))
                    .slowCallDurationThreshold(Duration.ofMillis(200))
                    .slowCallRateThreshold(50.0F)
                    .automaticTransitionFromOpenToHalfOpenEnabled(true)
                    .minimumNumberOfCalls(5)
                    .build())
          .build());
  }
}

To get more information about the configured properties please refer to this post- Spring Boot Microservice Circuit Breaker Using Resilience4j

Testing API gateway

To test API gateway please start all the services- eureka-discovery-service, customer-service, account-service and gateway-service.

Now all the interaction with the services is done through gateway-service. If we want to get customer information we'll call http://localhost:8181/customer/1, not http://localhost:8081/customer/1. The same way, to access account-service we'll use http://localhost:8181/account/.

Using the predicates configured in application.yml, the gateway matches the request to the correct destination URL.

With this setup you can insert a new account through the gateway and access customer information along with the associated accounts, all via the gateway URLs. If CustomerService or AccountService is not reachable, the request goes to the corresponding fallback method and the fallback message is returned.

You can access the circuit breaker metrics using the URL http://localhost:8181/actuator/health. After consecutive failed calls the metrics show the corresponding circuit as open.

That's all for this topic Spring Boot Microservice + API Gateway + Resilience4J. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Spring Tutorial Page


Related Topics

  1. Spring Boot Microservice - Externalized Configuration With Spring Cloud Config
  2. Spring Boot Microservice - Load-Balancing With Spring Cloud LoadBalancer
  3. Spring Boot Microservice - Eureka + LoadBalancer + Feign
  4. Spring Boot Observability - Distributed Tracing, Metrics
  5. Spring Boot Event Driven Microservice With Kafka

You may also like-

  1. Spring Bean Life Cycle
  2. Connection Pooling Using C3P0 Spring Example
  3. Spring MVC - Binding List of Objects Example
  4. Spring Batch Processing With List of Objects in batchUpdate() Method
  5. Package in Java
  6. Java Lambda Expression Callable Example
  7. Abstract Class in Python
  8. Angular Event Binding With Examples

Wednesday, November 22, 2023

Apache Avro Format in Hadoop

Apache Avro file format, created by Doug Cutting, is a data serialization system for Hadoop. Avro provides simple integration with dynamic languages. Avro implementations for C, C++, C#, Java, PHP, Python, and Ruby are available.

Avro file

Avro file has two things-

  • Data definition (Schema)
  • Data

Both the data definition and the data are stored together in one file. Within the Avro data there is a header which has a metadata section where the schema is stored. All objects stored in the file must be written according to that schema.


Avro Schema

Avro relies on schemas for reading and writing data. Avro schemas are defined with JSON that helps in data interoperability. Schemas are composed of primitive types (null, boolean, int, long, float, double, bytes, and string) and complex types (record, enum, array, map, union, and fixed).

While defining a schema you can write it in a separate file having the .avsc extension.

Avro Data

Avro data is serialized and stored in binary format which makes for a compact and efficient storage. Avro data itself is not tagged with type information because the schema used to write data is always available when the data is read. The schema is required to parse data. This permits each datum to be written with no per-value overheads, making serialization both fast and small.

Avro file format

Avro specifies an object container file format. A file has a schema, and all objects stored in the file must be written according to that schema, using binary encoding.

Objects are stored in blocks that may be compressed. Synchronization markers are used between blocks to permit efficient splitting of files for MapReduce processing.

A file consists of:

  • A file header, followed by
  • one or more file data blocks

Following image shows the Avro file format.

Header | Data block | Data block | …….

Avro file header consists of:

  1. Four bytes, ASCII 'O', 'b', 'j', followed by 1.
  2. File metadata, including the schema.
  3. The 16-byte, randomly-generated sync marker for this file.

A file header is thus described by the following schema:

{"type": "record", "name": "org.apache.avro.file.Header",
 "fields" : [
   {"name": "magic", "type": {"type": "fixed", "name": "Magic", "size": 4}},
   {"name": "meta", "type": {"type": "map", "values": "bytes"}},
   {"name": "sync", "type": {"type": "fixed", "name": "Sync", "size": 16}},
  ]
}

A file data block consists of:
  1. A long indicating the count of objects in this block.
  2. A long indicating the size in bytes of the serialized objects in the current block, after any codec is applied
  3. The serialized objects. If a codec is specified, this is compressed by that codec.
  4. The file's 16-byte sync marker.

How schema is defined in Avro

Avro schema is defined using JSON and consists of-

  1. A JSON string, naming a defined type.
  2. A JSON object, of the form: {"type": "typeName" ...attributes...}
    where typeName is either a primitive or derived type name, as defined below. Attributes not defined in this document are permitted as metadata, but must not affect the format of serialized data.
  3. A JSON array, representing a union of embedded types.

Primitive Types in Avro

Primitive types used in Avro are as follows-

  • null: no value
  • boolean: a binary value
  • int: 32-bit signed integer
  • long: 64-bit signed integer
  • float: single precision (32-bit) IEEE 754 floating-point number
  • double: double precision (64-bit) IEEE 754 floating-point number
  • bytes: sequence of 8-bit unsigned bytes
  • string: unicode character sequence
For example, if you are defining a field of type string-
 {"name": "personName",  "type": "string"}

Complex Types in Avro

Avro supports six kinds of complex types: record, enum, array, map, union and fixed.

record- Records are defined using the type name "record" and support following attributes:

  • name- A JSON string providing the name of the record, this is a required attribute.
  • doc- A JSON string providing documentation to the user of this schema, this is an optional attribute.
  • aliases- A JSON array of strings, providing alternate names for this record, this is an optional attribute.
  • fields- A JSON array, listing fields, this is a required attribute. Each field in Record is a JSON object with the following attributes:
    • name- A JSON string providing the name of the field, this is a required attribute.
    • doc- A JSON string describing this field for users, this is an optional attribute.
    • type- A JSON object defining a schema, or a JSON string naming a record definition, this is a required attribute.
    • default- A default value for this field, used when reading instances that lack this field, this is an optional attribute.
    • order- Specifies how this field impacts sort ordering of this record, this is an optional attribute. Valid values are "ascending" (the default), "descending", or "ignore".
    • aliases- A JSON array of strings, providing alternate names for this field, this is an optional attribute.
For example, the schema for a Person having Id, Name and Address fields-
{
 "type": "record",
 "name": "PersonRecord",
 "doc": "Person Record",
 "fields": [
  {"name":"Id",  "type":"long"},
  {"name":"Name",  "type":"string"},
  {"name":"Address",   "type":"string"}
 ]
}
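
A short Java sketch (the file names are placeholders) of how such a schema can be used with the generic API- the schema is parsed, a GenericRecord is populated according to it and written to an Avro container file, which stores the schema in the file header.

import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;

public class PersonAvroWriter {
  public static void main(String[] args) throws IOException {
    // Parse the Person schema shown above, assumed to be saved as person.avsc
    Schema schema = new Schema.Parser().parse(new File("person.avsc"));

    // Build a record conforming to the schema
    GenericRecord person = new GenericData.Record(schema);
    person.put("Id", 1L);
    person.put("Name", "John");
    person.put("Address", "City A");

    // Write the record to an Avro container file; the schema goes into the header
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
    try (DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<>(datumWriter)) {
      fileWriter.create(schema, new File("person.avro"));
      fileWriter.append(person);
    }
  }
}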

enum- Enums use the type name "enum" and support the following attributes:

  • name- A JSON string providing the name of the enum, this is a required attribute.
  • namespace- A JSON string that qualifies the name, this is an optional attribute.
  • aliases- A JSON array of strings, providing alternate names for this enum, this is an optional attribute.
  • doc- A JSON string providing documentation to the user of this schema, this is an optional attribute.
  • symbols- A JSON array, listing symbols, as JSON strings, this is a required attribute. All symbols in an enum must be unique; duplicates are prohibited.
For example, four seasons can be defined as:
{ "type": "enum",
  "name": "Seasons",
  "symbols" : ["WINTER", "SPRING", "SUMMER", "AUTUMN"]
}

array- Arrays use the type name "array" and support a single attribute:

  • items- The schema of the array's items.
For example, an array of strings is declared with:
{"type": "array", "items": "string"}

map- Maps use the type name "map" and support one attribute:

  • values- The schema of the map's values.
Map keys are assumed to be strings. For example, a map from string to int is declared with:
{"type": "map", "values": "int"}

union- Unions are represented using JSON arrays. For example, ["null", "string"] declares a schema which may be either a null or a string. Avro data conforming to this union should match one of the schemas represented by the union.

fixed- Fixed uses the type name "fixed" and supports following attributes:

  • name- A string naming this fixed type, this is a required attribute.
  • namespace- A string that qualifies the name, this is an optional attribute.
  • aliases- A JSON array of strings, providing alternate names for this fixed type, this is an optional attribute.
  • size- An integer, specifying the number of bytes per value, this is a required attribute.
For example, 16-byte quantity may be declared with:
{"type": "fixed", "size": 16, "name": "md5"}

Data encoding in Avro

Avro specifies two serialization encodings: binary and JSON. Most applications will use the binary encoding, as it is smaller and faster.

Reference: https://avro.apache.org/docs/1.8.2/index.html

That's all for this topic Apache Avro Format in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Parquet File Format in Hadoop
  2. Sequence File in Hadoop
  3. How to Configure And Use LZO Compression in Hadoop
  4. File Write in HDFS - Hadoop Framework Internal Steps
  5. Java Program to Read File in HDFS

You may also like-

  1. How to Check Hadoop MapReduce Logs
  2. How to Compress Intermediate Map Output in Hadoop
  3. Shuffle And Sort Phases in Hadoop MapReduce
  4. What is SafeMode in Hadoop
  5. How to Create Ubuntu Bootable USB
  6. Java Multi-Threading Interview Questions
  7. Difference Between ArrayList And CopyOnWriteArrayList in Java
  8. Invoking Getters And Setters Using Reflection - Java Program

Monday, November 20, 2023

Word Count MapReduce Program in Hadoop

The first MapReduce program most people write after installing Hadoop is invariably the word count MapReduce program.

That's what this post shows- detailed steps for writing a word count MapReduce program in Java, using Eclipse as the IDE.

Creating and copying input file to HDFS

If you already have a file in HDFS which you want to use as input then you can skip this step.

First thing is to create a file which will be used as input and copy it to HDFS.

Let’s say you have a file wordcount.txt with the following content.

Hello wordcount MapReduce Hadoop program.
This is my first MapReduce program.

You want to copy this file to the /user/process directory within HDFS. If that path doesn't exist then you need to create those directories first.

hdfs dfs -mkdir -p /user/process

Then copy the file wordcount.txt to this directory.

hdfs dfs -put /netjs/MapReduce/wordcount.txt /user/process 

Word count MapReduce example Java program

Now you can write your word count MapReduce code. The WordCount example reads text files and counts the frequency of the words. Each mapper takes a line of the input file as input and breaks it into words. It then emits a key/value pair of the word (in the form (word, 1)), and each reducer sums the counts for each word and emits a single key/value pair with the word and its sum.

In the word count MapReduce code there is a Mapper class (MyMapper) with map function and a Reducer class (MyReducer) with a reduce function.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
  // Map function
  public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text word = new Text();
    public void map(LongWritable key, Text value, Context context) 
           throws IOException, InterruptedException {
      // Splitting the line on spaces
      String[] stringArr = value.toString().split("\\s+");
      for (String str : stringArr) {
        word.set(str);
        context.write(word, new IntWritable(1));
      }           
    }
  }
    
  // Reduce function
  public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{        
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
            throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
  public static void main(String[] args)  throws Exception{
    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf, "WC");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(MyMapper.class);    
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Required jars for Hadoop MapReduce code

You will also need to add at least the following Hadoop jars so that your code can compile. You will find these jars inside the /share/hadoop directory of your Hadoop installation. Within the /share/hadoop path, look in the hdfs, mapreduce and common directories for the required jars.

 
hadoop-common-2.9.0.jar
hadoop-hdfs-2.9.0.jar
hadoop-hdfs-client-2.9.0.jar
hadoop-mapreduce-client-core-2.9.0.jar
hadoop-mapreduce-client-common-2.9.0.jar
hadoop-mapreduce-client-jobclient-2.9.0.jar
hadoop-mapreduce-client-hs-2.9.0.jar
hadoop-mapreduce-client-app-2.9.0.jar
commons-io-2.4.jar

Creating jar of your wordcount MapReduce code

Once you are able to compile your code you need to create a jar file. In the Eclipse IDE right-click on your Java program and select Export – Java – JAR file.

Running the MapReduce code

You can use the following command to run the program. Assuming you are in your hadoop installation directory.

bin/hadoop jar /netjs/MapReduce/wordcount.jar org.netjs.WordCount  /user/process /user/out

Explanation for the arguments passed is as follows-

/netjs/MapReduce/wordcount.jar is the path to your jar file.

org.netjs.WordCount is the fully qualified path to your Java program class.

/user/process – path to input directory.

/user/out – path to output directory.

Once your word count MapReduce program is successfully executed you can verify the output file.

 
hdfs dfs -ls /user/out

Found 2 items
-rw-r--r--   1 netjs supergroup          0 2018-02-27 13:37 /user/out/_SUCCESS
-rw-r--r--   1 netjs supergroup         77 2018-02-27 13:37 /user/out/part-r-00000

As you can see, the Hadoop framework creates output files using the part-r-xxxxx format. Since only one reducer is used here, there is only one output file, part-r-00000. You can see the content of the file using the following command.

 
hdfs dfs -cat /user/out/part-r-00000

Hadoop      1
Hello       1
MapReduce   2
This        1
first       1
is          1
my          1
program.    2
wordcount   1

That's all for this topic Word Count MapReduce Program in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Introduction to Hadoop Framework
  2. MapReduce Flow in YARN
  3. Speculative Execution in Hadoop
  4. What is HDFS
  5. How to Compress Intermediate Map Output in Hadoop

You may also like-

  1. Replica Placement Policy in Hadoop Framework
  2. Data Locality in Hadoop
  3. Java Program to Read File in HDFS
  4. Data Compression in Hadoop
  5. YARN in Hadoop
  6. Uber Mode in Hadoop
  7. How to Create Immutable Class in Java
  8. Lambda Expressions in Java 8

Saturday, November 18, 2023

Capacity Scheduler in YARN

In the post YARN in Hadoop we have already seen that it is the scheduler component of the ResourceManager which is responsible for allocating resources to the running jobs. The scheduler component is pluggable in Hadoop and there are two options capacity scheduler and fair scheduler. This post talks about the capacity scheduler in YARN, its benefits and how capacity scheduler can be configured in Hadoop cluster.


YARN Capacity scheduler

Capacity scheduler in YARN allows multi-tenancy of the Hadoop cluster where multiple users can share the large cluster.

Every organization having its own private cluster leads to poor resource utilization. An organization may provision enough resources in the cluster to meet its peak demand, but that peak demand may not occur that frequently, resulting in poor resource utilization the rest of the time.

Thus sharing cluster among organizations is a more cost effective idea. However, organizations are concerned about sharing a cluster because they are worried that they may not get enough resources at the time of peak utilization. The CapacityScheduler in YARN mitigates that concern by giving each organization capacity guarantees.

Capacity scheduler in YARN functionality

Capacity scheduler in Hadoop works on the concept of queues. Each organization gets its own dedicated queue with a percentage of the total cluster capacity for its own use. For example, if there are two organizations sharing the cluster, one organization may be given 60% of the cluster capacity whereas the other organization is given 40%.

On top of that, to provide further control and predictability on sharing of resources, the CapacityScheduler supports hierarchical queues. An organization can further divide its allocated cluster capacity into separate sub-queues for separate sets of users within the organization.

Capacity scheduler is also flexible and allows allocation of free resources to any queue beyond its capacity. This provides elasticity for the organizations in a cost-effective manner. When the queue to which these resources actually belong has increased demand, the resources are allocated back to it as they are released from the other queues.

Capacity scheduler in YARN configuration

To configure the ResourceManager to use the CapacityScheduler, set the following property in the conf/yarn-site.xml:

yarn.resourcemanager.scheduler.class- org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler

For setting up queues in CapacityScheduler you need to make changes in etc/hadoop/capacity-scheduler.xml configuration file.

The CapacityScheduler has a predefined queue called root. All queues in the system are children of the root queue.

Setting up further queues- Configure property yarn.scheduler.capacity.root.queues with a list of comma-separated child queues.

Setting up sub-queues within a queue- Configure property yarn.scheduler.capacity.<queue-path>.queues
Here queue-path is the full path of the queue's hierarchy, starting at root, with . (dot) as the delimiter.

Capacity of the queue- Configure property yarn.scheduler.capacity.<queue-path>.capacity
Queue capacity is provided in percentage (%). The sum of capacities for all queues, at each level, must be equal to 100. Applications in the queue may consume more resources than the queue’s capacity if there are free resources, providing elasticity.

Capacity scheduler queue configuration example

Suppose there are two child queues starting from root, XYZ and ABC. XYZ further divides its queue into technology and marketing. XYZ is given 60% of the cluster capacity and ABC is given 40%.

<property>
  <name>yarn.scheduler.capacity.root.queues</name>
  <value>XYZ, ABC</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.XYZ.queues</name>
  <value>technology,marketing</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.XYZ.capacity</name>
  <value>60</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.ABC.capacity</name>
  <value>40</value>
</property>

You may also want to limit the elasticity for applications in a queue. Restricting XYZ's elasticity to 80% means it doesn't use more than 80% of the total cluster capacity even if free resources are available. In other words, ABC can get 20% immediately whenever it needs it.

<property>
  <name>yarn.scheduler.capacity.root.XYZ.maximum-capacity</name>
  <value>80</value>
</property>

For the two sub-queues of XYZ, you want to allocate 70% of the allocated queue capacity to technology and 30% to marketing.
<property>
  <name>yarn.scheduler.capacity.root.XYZ.technology.capacity</name>
  <value>70</value>
</property>

<property>
  <name>yarn.scheduler.capacity.root.XYZ.marketing.capacity</name>
  <value>30</value>
</property>

Reference: https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html

That's all for this topic Capacity Scheduler in YARN. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Introduction to Hadoop Framework
  2. Installing Hadoop on a Single Node Cluster in Pseudo-Distributed Mode
  3. Replica Placement Policy in Hadoop Framework
  4. Speculative Execution in Hadoop
  5. MapReduce Flow in YARN

You may also like-

  1. HDFS Federation in Hadoop Framework
  2. What is SafeMode in Hadoop
  3. Java Program to Read File in HDFS
  4. Data Compression in Hadoop
  5. Uber Mode in Hadoop
  6. CopyOnWriteArrayList in Java
  7. Type Casting in Java
  8. Getting All The Schemas in a DB - Java Program

Friday, November 17, 2023

Fair Scheduler in YARN

In the post YARN in Hadoop we have already seen that it is the scheduler component of the ResourceManager which is responsible for allocating resources to the running jobs. The scheduler component is pluggable in Hadoop and there are two options for scheduler- capacity scheduler and fair scheduler. This post talks about the fair scheduler in YARN, its benefits and how fair scheduler can be configured in Hadoop cluster.

YARN Fair Scheduler

Fair scheduler in YARN allocates resources to applications in such a way that all apps get, on average, an equal share of resources over time. By default, the Fair Scheduler bases scheduling fairness decisions only on memory. It can be configured to schedule with both memory and CPU, in the form (X mb, Y vcores).

When there is a single app running, that app uses the entire Hadoop cluster. When other apps are submitted, they don't have to wait for the running app to finish, resources that free up are assigned to the new apps, so that each app eventually gets roughly the same amount of resources.


Queues in Fair Scheduler

Fair scheduler organizes apps further into “queues”, and shares resources fairly between these queues.

A default queue can be configured to be used by users. If an app specifically lists a queue in a container resource request, the request is submitted to that queue. There are also options to configure queue placement policy based on user names, primary group of the user, secondary group of the user.

Within each queue, a separate scheduling policy can be used to share resources between the running apps. The default is memory-based fair sharing, but FIFO and multi-resource with Dominant Resource Fairness can also be configured.

For example, if there are two queues A and B, where queue A uses the fair scheduling policy and queue B uses the FIFO policy, then the jobs submitted to queue A share resources fairly whereas the jobs submitted to queue B use the resources on a first come, first served basis.

Hierarchical Queues in Fair Scheduler

The fair scheduler in YARN supports hierarchical queues which means sub-queues can be created with in a dedicated queue. All queues descend from a queue named “root”.

A queue’s name starts with the names of its parents, with periods as separators. So a queue named “parent1” under the root queue, would be referred to as “root.parent1”, and a queue named “queue2” under a queue named “parent1” would be referred to as “root.parent1.queue2”.

YARN fair scheduler configuration

To use the Fair Scheduler first assign the appropriate scheduler class in yarn-site.xml:

<property>
  <name>yarn.resourcemanager.scheduler.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property> 

For setting up queues, changes are done in the allocation configuration file etc/hadoop/fair-scheduler.xml. Some of the configured elements are as follows-

1- queue element- Represents queues. Queue elements can take an optional attribute ‘type’, which when set to ‘parent’ makes it a parent queue. Each queue element may contain the following properties:
  • minResources: minimum resources the queue is entitled to, in the form “X mb, Y vcores”.
  • maxResources: maximum resources a queue is allocated, expressed either in absolute values (X mb, Y vcores) or as a percentage of the cluster resources (X% memory, Y% cpu).
  • maxChildResources: maximum resources any child queue is allocated, expressed either in absolute values (X mb, Y vcores) or as a percentage of the cluster resources (X% memory, Y% cpu).
  • weight: to share the cluster non-proportionally with other queues. Weights default to 1, and a queue with weight 2 should receive approximately twice as many resources as a queue with the default weight.
  • schedulingPolicy: to set the scheduling policy of any queue. The allowed values are “fifo”/“fair”/“drf” or any class that extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy. Defaults to “fair”.

2- queueMaxResourcesDefault element: which sets the default max resource limit for queue; overridden by maxResources element in each queue.

3- defaultQueueSchedulingPolicy element: which sets the default scheduling policy for queues; overridden by the schedulingPolicy element in each queue if specified. Defaults to “fair”.

4- queuePlacementPolicy element: which contains a list of rule elements that tell the scheduler how to place incoming apps into queues. Rules are applied in the order that they are listed. All rules accept the “create” argument, which indicates whether the rule can create a new queue. “Create” defaults to true; if set to false and the rule would place the app in a queue that is not configured in the allocations file, we continue on to the next rule. Valid rules are:
  • specified: the app is placed into the queue it requested.
  • user: the app is placed into a queue with the name of the user who submitted it.
  • primaryGroup: the app is placed into a queue with the name of the primary group of the user who submitted it.
  • secondaryGroupExistingQueue: the app is placed into a queue with a name that matches a secondary group of the user who submitted it.
  • nestedUserQueue: the app is placed into a queue with the name of the user under the queue suggested by the nested rule.
  • default: the app is placed into the queue specified in the ‘queue’ attribute of the default rule.
  • reject: the app is rejected.
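
As a rough sketch (the rule combination and the fallback queue name are illustrative, not part of the configuration example that follows), a placement policy could first honour the requested queue, then place the app in a per-user queue nested under the submitter's primary group, and finally fall back to a default queue:

<queuePlacementPolicy>
  <rule name="specified" />
  <rule name="nestedUserQueue">
    <rule name="primaryGroup" create="false" />
  </rule>
  <rule name="default" queue="default" />
</queuePlacementPolicy>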

Fair scheduler queue configuration example

Suppose there are two child queues under root, XYZ and ABC, and the XYZ queue is further divided into two child queues, technology and development.

<?xml version="1.0"?>
<allocations>
  <queue name="ABC">
    <minResources>10000 mb,10vcores</minResources>
    <maxResources>60000 mb,30vcores</maxResources>
    <weight>2.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
  </queue>

  <queue name="XYZ">
    <minResources>20000 mb,0vcores</minResources>
    <maxResources>80000 mb,0vcores</maxResources>
    <weight>3.0</weight>
    <schedulingPolicy>fifo</schedulingPolicy>
    <queue name="technology" />
    <queue name="development" />
  </queue>

  <queueMaxResourcesDefault>40000 mb,20vcores</queueMaxResourcesDefault>

  <queuePlacementPolicy>
    <rule name="specified" />
    <rule name="primaryGroup" create="false" />
    <rule name="default" queue="ABC" />
  </queuePlacementPolicy>
</allocations>
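
As a usage note (the property below applies to MapReduce jobs and is not part of the scheduler configuration itself), a job is typically routed to one of these queues at submission time by setting mapreduce.job.queuename, for example to root.XYZ.technology; with the “specified” rule listed first in the placement policy above, the job then lands in that queue.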

Reference: https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/FairScheduler.html

That's all for this topic Fair Scheduler in YARN. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Introduction to Hadoop Framework
  2. Installing Hadoop on a Single Node Cluster in Pseudo-Distributed Mode
  3. Uber Mode in Hadoop
  4. Replica Placement Policy in Hadoop Framework
  5. MapReduce Flow in YARN

You may also like-

  1. File Write in HDFS - Hadoop Framework Internal Steps
  2. HDFS High Availability
  3. HDFS Commands Reference List
  4. Word Count MapReduce Program in Hadoop
  5. Compressing File in bzip2 Format in Hadoop - Java Program
  6. Synchronization in Java Multi-Threading
  7. Java Collections Interview Questions
  8. How to Create Password Protected Zip File in Java

Thursday, November 16, 2023

YARN in Hadoop

YARN (Yet Another Resource Negotiator) is the cluster resource management and job scheduling layer of Hadoop. YARN was introduced in Hadoop 2.x to address the scalability issues of MRv1. It also decouples resource management from the data processing components, making it possible for other distributed data processing engines to run on a Hadoop cluster.

This YARN tutorial gives an insight into the Apache Hadoop YARN architecture, how resource management is done in YARN and what advantages YARN provides over the classic MapReduce version 1.


Problems with Hadoop MRv1

  • Scalability issues- In MapReduce 1 there were two processes responsible for job execution. The JobTracker was responsible for resource management, scheduling jobs and monitoring task progress, whereas the TaskTracker was responsible for running the assigned map and reduce tasks. This over-dependence on a single JobTracker process created performance bottlenecks and scalability issues in large clusters running a lot of applications.
  • Cluster resource utilization- In Hadoop 1.0 map slots and reduce slots were configured on each node using the following parameters (a configuration sketch follows this list)-
    mapreduce.tasktracker.map.tasks.maximum
    mapreduce.tasktracker.reduce.tasks.maximum

    Because of this static configuration no interchange was possible. If more map slots were needed on a node where all the map slots were in use, free reduce slots could not be used as map slots, and vice versa.
  • Tightly coupled with MapReduce- The Hadoop 1.0 design tightly coupled it with the batch-processing MapReduce framework. Resource management and storage (HDFS) were not abstracted in a way that would let other processing engines (like Spark or Tez) also run on the Hadoop cluster.
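
A minimal sketch of that static slot configuration (the slot counts here are arbitrary example values) in mapred-site.xml on a TaskTracker node:

<property>
  <name>mapreduce.tasktracker.map.tasks.maximum</name>
  <value>8</value>   <!-- at most 8 map tasks run concurrently on this node -->
</property>
<property>
  <name>mapreduce.tasktracker.reduce.tasks.maximum</name>
  <value>4</value>   <!-- at most 4 reduce tasks run concurrently on this node -->
</property>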

Hadoop Yarn architecture

In Hadoop YARN the functionalities of resource management and job scheduling/monitoring are split into separate daemons. There is a global ResourceManager to manage the cluster resources and a per-application ApplicationMaster to manage the application tasks.

The major components of YARN in Hadoop are as follows-

  • ResourceManager- This is the master process in YARN; it accepts submitted jobs, schedules them and allocates the resources they require. There are two components of the ResourceManager performing these tasks:
    • Scheduler- The Scheduler is responsible for allocating resources to the various running applications.
    • ApplicationsManager- The ApplicationsManager is responsible for accepting job-submissions, negotiating the first container for executing the application specific ApplicationMaster and provides the service for restarting the ApplicationMaster container on failure.
  • NodeManager- The NodeManager daemon runs on each cluster node. NodeManager is responsible for the containers running on its node, monitoring their resource usage (CPU, memory, disk, network) and reporting the same to the ResourceManager/Scheduler (a configuration sketch of the resources a node offers follows this list).
  • ApplicationMaster- The ApplicationMaster process is application specific which means for a MapReduce job there will be a MapReduce specific ApplicationMaster, for a Spark job there will be a Spark specific ApplicationMaster.
    ApplicationMaster has the responsibility of negotiating appropriate resource containers from the Scheduler, tracking their status and monitoring for progress.
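
As a small illustrative sketch (the values are assumptions, not from this post), the resources a NodeManager offers for containers are configured in yarn-site.xml on each node:

<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>16384</value>   <!-- memory, in MB, this node offers for containers -->
</property>
<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>8</value>       <!-- vcores this node offers for containers -->
</property>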

How YARN addresses the issues in Hadoop 1.x

We already discussed the problems in MRv1: limited scalability, sub-optimal resource utilization and a design tightly coupled with MapReduce processing. Now that you have a good idea of the YARN architecture, let's see how YARN addresses these issues.

  • Scalability- In YARN, cluster resource management is done by the ResourceManager and application tasks are managed by the ApplicationMaster, rather than a single JobTracker process handling all of these tasks. That makes YARN more scalable.
  • Resource utilization- In Hadoop YARN there are no pre-configured map and reduce slots; instead there is a concept of containers that are negotiated by the ApplicationMaster as per the needs of the submitted application (see the sketch after this list).
    The ApplicationMaster is application specific, so a MapReduce ApplicationMaster will request containers to run a MapReduce job whereas a Spark ApplicationMaster will request containers for running Spark tasks.
  • Loosely coupled design- YARN is designed in such a way that it decouples cluster resource management and job scheduling from the data processing component. In a Hadoop cluster the storage layer, the resource management and job scheduling layer, and the distributed data processing applications are now separate, independent layers. That makes it possible to run applications other than MapReduce on a Hadoop cluster.
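
As an illustrative sketch (the sizes are example values, not from this post), a MapReduce job on YARN declares per-task container sizes, which the MapReduce ApplicationMaster turns into container requests:

<property>
  <name>mapreduce.map.memory.mb</name>
  <value>2048</value>   <!-- container memory requested for each map task -->
</property>
<property>
  <name>mapreduce.map.cpu.vcores</name>
  <value>1</value>      <!-- vcores requested for each map task -->
</property>
<property>
  <name>mapreduce.reduce.memory.mb</name>
  <value>4096</value>   <!-- container memory requested for each reduce task -->
</property>
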
Figure: YARN in Hadoop

Application flow in Hadoop YARN

Once an application is submitted, its first stop is the ResourceManager. The Scheduler component of the ResourceManager schedules the application to run, whereas the ApplicationsManager component negotiates the first container for the application, in which the application specific ApplicationMaster starts running.

It is the responsibility of the ApplicationMaster to negotiate more resources from the ResourceManager for running the application tasks. These containers are allocated on different nodes in the cluster.

The ApplicationMaster communicates with the NodeManagers running on the nodes where the containers are allocated to launch the containers on those nodes. Each NodeManager monitors its containers' resource usage (CPU, memory, disk, network) and reports the same to the ResourceManager/Scheduler.

The following image shows the communication among the ResourceManager, NodeManager and ApplicationMaster processes. Here two applications are submitted, one MapReduce and one Spark, so two application specific ApplicationMaster processes are started.

Figure: Hadoop YARN flow

That's all for this topic YARN in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Introduction to Hadoop Framework
  2. Capacity Scheduler in YARN
  3. Fair Scheduler in YARN
  4. Uber Mode in Hadoop
  5. Installing Hadoop on a Single Node Cluster in Pseudo-Distributed Mode

You may also like-

  1. HDFS High Availability
  2. Word Count MapReduce Program in Hadoop
  3. Data Compression in Hadoop
  4. HDFS Commands Reference List
  5. Speculative Execution in Hadoop
  6. Varargs in Java
  7. ConcurrentHashMap in Java
  8. Heap Memory Allocation in Java