Add bad nm job #3058

Open
wants to merge 3 commits into base: integration
@@ -0,0 +1,99 @@
package datawave.ingest.mapreduce.job.validation;

import java.io.IOException;
import java.net.InetAddress;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Validation job whose map and reduce tasks intentionally fail when they run on one of the
 * configured "bad" hosts, simulating a misbehaving node manager. Tasks on healthy hosts sleep
 * briefly and emit a record so the job otherwise completes normally.
 */
public class BadNodeManagerJob extends Configured implements Tool {

    public static class FailOnSimulatedBadHostsMapper extends Mapper<LongWritable,Text,Text,Text> {
        private List<String> badHosts;

        @Override
        protected void setup(Context context) {
            // Comma-separated list of hosts on which tasks should fail, supplied by the driver from args[0]
            badHosts = List.of(context.getConfiguration().get("bad.host.list").split(","));
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String hostname = InetAddress.getLocalHost().getHostName();
            if (badHosts.contains(hostname)) {
                // Simulate a little work, then fail the task attempt on the bad host
                Thread.sleep(5000);
                throw new RuntimeException("Failing on bad host: " + hostname);
            } else {
                // Simulate a longer-running healthy task
                Thread.sleep(30000);
                context.write(new Text("OK"), new Text("1"));
            }
        }
    }

    public static class FailOnSimulatedBadHostsReducer extends Reducer<Text,Text,Text,Text> {
        private List<String> badHosts;

        @Override
        protected void setup(Context context) {
            // Same bad-host list as the mapper, supplied by the driver from args[0]
            badHosts = List.of(context.getConfiguration().get("bad.host.list").split(","));
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String hostname = InetAddress.getLocalHost().getHostName();
            System.out.println(hostname);
            if (badHosts.contains(hostname)) {
                // Simulate a little work, then fail the task attempt on the bad host
                Thread.sleep(5000);
                throw new RuntimeException("Reducer failing on bad host: " + hostname);
            } else {
                // Simulate a longer-running healthy task
                Thread.sleep(30000);
                context.write(key, new Text("done"));
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        // Small containers and heaps for the validation tasks
        conf.set("mapreduce.map.memory.mb", "50");
        conf.set("mapreduce.reduce.memory.mb", "50");
        conf.set("mapreduce.map.java.opts", "-Xmx40m");
        conf.set("mapreduce.reduce.java.opts", "-Xmx40m");

        // args[0]: comma-separated list of hosts that should fail their tasks
        conf.set("bad.host.list", args[0]);

        Job job = Job.getInstance(conf, "BadNodeManagerJob");
        job.setJarByClass(BadNodeManagerJob.class);

        job.setMapperClass(FailOnSimulatedBadHostsMapper.class);
        job.setReducerClass(FailOnSimulatedBadHostsReducer.class);

        // args[1]: number of map splits / reduce tasks. Set on the job's configuration, since
        // Job.getInstance copies conf and later changes to conf are not visible to the job.
        job.getConfiguration().set("datawave.bad.nodemanager.tasks", args[1]);
        job.setNumReduceTasks(Integer.parseInt(args[1]));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(FixedMapperInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Dummy output path
        TextOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path("/tmp/output-" + System.currentTimeMillis()));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new BadNodeManagerJob(), args);
        System.exit(res);
    }
}
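
For context, a minimal launcher sketch showing how the two arguments are consumed: args[0] is the comma-separated list of hosts that should fail, args[1] is the number of map splits / reduce tasks. The example class name, host names, and task count below are placeholders and are not part of this change.

package datawave.ingest.mapreduce.job.validation;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical example launcher; the host names and task count are placeholder values.
public class BadNodeManagerJobExample {
    public static void main(String[] args) throws Exception {
        String[] jobArgs = {"worker-03.example.com,worker-07.example.com", "4"};
        System.exit(ToolRunner.run(new Configuration(), new BadNodeManagerJob(), jobArgs));
    }
}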
@@ -0,0 +1,83 @@
package datawave.ingest.mapreduce.job.validation;

import java.io.DataInput;
import java.io.DataOutput;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * InputFormat that produces a fixed number of synthetic splits (one dummy record each) so the
 * number of map tasks can be controlled directly via the datawave.bad.nodemanager.tasks property.
 */
public class FixedMapperInputFormat extends InputFormat<LongWritable,Text> {

    /** Emits a single dummy record per split. */
    public static class DummyRecordReader extends RecordReader<LongWritable,Text> {
        private boolean read = false;
        private long key = 0;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) {}

        @Override
        public boolean nextKeyValue() {
            if (read)
                return false;
            read = true;
            return true;
        }

        @Override
        public LongWritable getCurrentKey() {
            return new LongWritable(key++);
        }

        @Override
        public Text getCurrentValue() {
            return new Text("dummy");
        }

        @Override
        public float getProgress() {
            return read ? 1.0f : 0.0f;
        }

        @Override
        public void close() {}
    }

    /** Minimal split with no locality preference; exists only so a map task gets scheduled. */
    public static class DummyInputSplit extends InputSplit implements Writable {
        @Override
        public long getLength() {
            return 1;
        }

        @Override
        public String[] getLocations() {
            return new String[0];
        }

        @Override
        public void write(DataOutput out) {}

        @Override
        public void readFields(DataInput in) {}
    }

    @Override
    public List<InputSplit> getSplits(JobContext context) {
        // One split per requested task, as configured by BadNodeManagerJob from args[1]
        List<InputSplit> splits = new ArrayList<>();
        for (int i = 0; i < context.getConfiguration().getInt("datawave.bad.nodemanager.tasks", 1); i++) {
            splits.add(new DummyInputSplit());
        }
        return splits;
    }

    @Override
    public RecordReader<LongWritable,Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new DummyRecordReader();
    }
}
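
A quick local check (sketch) of how the split count tracks the datawave.bad.nodemanager.tasks property set by BadNodeManagerJob. The class name and the value 4 are placeholders, and Hadoop client libraries are assumed to be on the classpath.

package datawave.ingest.mapreduce.job.validation;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical check; prints the number of synthetic splits produced for a placeholder task count.
public class FixedMapperInputFormatCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("datawave.bad.nodemanager.tasks", 4); // placeholder task count
        Job job = Job.getInstance(conf);
        List<InputSplit> splits = new FixedMapperInputFormat().getSplits(job);
        System.out.println("splits: " + splits.size()); // expected: 4
    }
}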