import java.io.IOException
import java.io.DataInput
import java.io.DataOutput
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import kotlin.collections.Map.Entry

/**
 * Handles input and output for a set of bindings of variables to their corresponding values.
 *
 * The bindings are kept in an in-memory hash map and are (de)serialized as an entry count
 * followed by one UTF-encoded variable/value pair per entry.
 */
class BindingsWritable : Writable, Iterable<Entry<String, String>>, Cloneable {

    /** The map in which the variable/value pairs are held in main memory. */
    private val bindings = HashMap<String, String>()

    /**
     * The set of variables that are currently bound.
     *
     * This is the key view of the underlying map, not a copy.
     */
    val variables: Set<String>
        get() = bindings.keys

    /**
     * Returns the bound value of a given variable.
     *
     * @param variable the variable whose bound value is returned
     * @return the bound value of the given variable, or `null` if the variable is not bound
     */
    operator fun get(variable: String): String? = bindings[variable]

    /**
     * Binds a given variable to the given value, replacing any previous binding of that variable.
     *
     * @param variable the variable to bind
     * @param value the value to which the variable is bound
     */
    fun put(variable: String, value: String) {
        bindings[variable] = value
    }

    /**
     * Returns the number of bound variables.
     *
     * @return number of bound variables
     */
    fun size(): Int = bindings.size

    /**
     * Returns the set of variable/value pairs.
     *
     * This is the entry view of the underlying map, not a copy.
     *
     * @return the set of variable/value pairs
     */
    fun getBindings(): Set<Entry<String, String>> = bindings.entries

    /** Iterates over the variable/value pairs returned by [getBindings]. */
    override fun iterator(): Iterator<Entry<String, String>> = getBindings().iterator()

    /**
     * Provides a pretty-printing of the variable/value pairs in the form
     * `{var1 = value1, var2 = value2}`.
     *
     * @return a pretty-printing of the variable/value pairs
     */
    override fun toString(): String =
        bindings.entries.joinToString(separator = ", ", prefix = "{", postfix = "}") { (variable, value) ->
            "$variable = $value"
        }

    /**
     * Creates a clone of this object holding its own map with the same variable/value pairs.
     *
     * @return a clone of this object
     */
    override public fun clone(): BindingsWritable =
        BindingsWritable().also { copy -> copy.bindings.putAll(bindings) }

    /** Removes all variable/value pairs. */
    fun clear() {
        bindings.clear()
    }

    /**
     * Replaces the current bindings with the ones read from [input].
     *
     * Expects the layout produced by [write]: an `Int` entry count followed by
     * that many (variable, value) pairs, each written with `writeUTF`.
     *
     * @param input the data input from which is read
     * @throws IOException if reading from [input] fails
     */
    @Throws(IOException::class)
    override fun readFields(input: DataInput) {
        clear()
        repeat(input.readInt()) {
            // The variable is always serialized before its value.
            val variable = input.readUTF()
            val value = input.readUTF()
            bindings[variable] = value
        }
    }

    /**
     * Writes the entry count followed by every variable/value pair to [out].
     *
     * @param out the data output to which is written
     * @throws IOException if writing to [out] fails
     */
    @Throws(IOException::class)
    override fun write(out: DataOutput) {
        out.writeInt(bindings.size)
        for ((variable, value) in bindings) {
            out.writeUTF(variable)
            out.writeUTF(value)
        }
    }

    companion object {
        /**
         * Creates a new [BindingsWritable] and reads in its fields.
         *
         * @param input the data input from which is read
         * @return the newly created [BindingsWritable]
         * @throws IOException if reading from [input] fails
         */
        @JvmStatic
        @Throws(IOException::class)
        fun read(input: DataInput): BindingsWritable = BindingsWritable().also { it.readFields(input) }
    }
}

/**
 * This Mapper processes input triples, checks if a given triple pattern matches, binds the
 * corresponding variables and writes these bindings to the context.
 *
 * @constructor Initializes this mapper with a given triple pattern.
 * @param pattern the triple pattern; it must have the length 3 and contain strings with RDF terms
 * and variables (the latter starting with "?")
 * @throws Exception if [pattern] does not have exactly three components
 */
open class TriplePatternMapper protected constructor(
    private val pattern: Array<String>
) : Mapper<Any, Text, Text, BindingsWritable>() {

    init {
        if (pattern.size != 3) {
            throw Exception("Invalid input!")
        }
    }

    /**
     * Checks whether the triple pattern matches the current input line and, if so, writes the
     * resulting variable bindings to [context].
     *
     * @param key the input key; not used
     * @param value one text line holding the comma separated subject, predicate and object
     * @param context the context the bindings are written to
     * @throws RuntimeException if the line does not split into exactly three fields
     */
    override fun map(key: Any, value: Text, context: Context) {
        // Input is given as a text line of comma separated values of the subject, predicate
        // and object of the current triple.
        // NOTE(review): a plain split(",") also splits terms that contain a comma themselves;
        // such lines are rejected below - confirm the input never contains them.
        val csv = value.toString().split(",")
        if (csv.size != 3) {
            throw RuntimeException("Invalid input!")
        }
        val matched = matches(pattern, csv)
        if (matched != null) {
            // Write the bound variables with their values...
            context.write(outkey, matched)
        }
    }

    companion object {
        private val outkey = Text() // reusable key
        private val bindings = BindingsWritable() // reusable bindings

        /**
         * Checks whether or not a given triple pattern matches the current triple.
         *
         * Pattern components starting with "?" are variables and are bound to the value at the
         * same position of the triple; all other components must be equal to the triple's value.
         * A variable occurring more than once only matches if all its positions hold the same
         * value.
         *
         * The returned object is a single instance that is cleared and refilled on every call,
         * so callers must consume (or [BindingsWritable.clone]) it before calling again.
         *
         * @param pattern the given triple pattern (three components)
         * @param csv the current triple (three components)
         * @return bound variables with their values, or `null` if the triple pattern does not match
         */
        @JvmStatic
        fun matches(pattern: Array<String>, csv: List<String>): BindingsWritable? {
            // Reusable objects must always be cleared before reuse...
            bindings.clear()
            for (i in 0..2) {
                val term = pattern[i]
                val actual = csv[i]
                if (term.startsWith("?")) {
                    // A repeated variable must not be re-bound to a different value.
                    val previous = bindings[term]
                    if (previous != null && previous != actual) {
                        return null
                    }
                    // Bind variable to the value from the input.
                    bindings.put(term, actual)
                } else if (actual != term) {
                    // Triple pattern does not match!
                    return null
                }
            }
            return bindings
        }
    }
}

/**
 * This class implements the triple pattern ?person foaf:name "Magic Sets".
 *
 * NOTE(review): the predicate component of the pattern below is the empty string, so it only
 * matches triples whose predicate field is empty. The literal was presumably meant to hold the
 * foaf:name term in the same spelling the input data uses - confirm and restore it.
 *
 * @constructor Calls the constructor of its super class with the parameters necessary for
 * processing the triple pattern ?person foaf:name "Magic Sets".
 */
class TriplePatternOfQuery1 protected constructor() :
    TriplePatternMapper(arrayOf("?person", "", "\"Magic Sets\""))

/**
 * Main program to run the query with the triple pattern ?person foaf:name "Magic Sets".
 *
 * The input path to the RDF data given as comma separated values (csv) must be given as well as
 * the output path, where the result is stored. An already existing output path is deleted
 * recursively before the job starts. The process exits with 0 if the job succeeded, with 1 if it
 * failed, and with 2 if the arguments are missing.
 *
 * @param args the input and output path must be given
 * @throws IOException
 * @throws InterruptedException
 * @throws ClassNotFoundException
 */
fun main(args: Array<String>) {
    if (args.size < 2) {
        System.err.println("Usage: <input path> <output path>")
        System.exit(2)
        return
    }
    val inputPath = Path(args[0])
    val outputDir = Path(args[1])

    // Create configuration
    val conf = Configuration(true)

    // Create job; hand over the configuration so that the job and the file system access below
    // are based on the same settings.
    val job = Job.getInstance(conf)
    job.setJarByClass(BindingsWritable::class.java)
    job.setJobName("Query 1")

    // Setup MapReduce (map-only job)
    job.setMapperClass(TriplePatternOfQuery1::class.java)
    job.setNumReduceTasks(0)

    // Specify key / value
    job.setOutputKeyClass(Text::class.java)
    job.setOutputValueClass(BindingsWritable::class.java)

    // Input
    FileInputFormat.addInputPath(job, inputPath)
    job.setInputFormatClass(TextInputFormat::class.java)

    // Output
    FileOutputFormat.setOutputPath(job, outputDir)
    job.setOutputFormatClass(TextOutputFormat::class.java)

    // Delete output if exists
    val hdfs = FileSystem.get(conf)
    if (hdfs.exists(outputDir)) {
        hdfs.delete(outputDir, true)
    }

    // Execute job
    val code = if (job.waitForCompletion(true)) 0 else 1
    System.exit(code)
}