I have a problem with mapreduce. Giving as input a list of song ("Songname"#"UserID"#"boolean") i must have as result a song list in which is specified how many time different useres listen them... so a  output ("Songname","timelistening").
I used hashtable to allow only one couple .
With short files it works well but when I put as input a list about 1000000 of records it returns me the same value (20) for all records.
This is my mapper:
    public static class CanzoniMapper extends Mapper<Object, Text, Text, IntWritable>{
    private IntWritable userID = new IntWritable(0);
    private Text song = new Text();
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        /*StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }*/
        String[] caratteri = value.toString().split("#");
        if(caratteri[2].equals("1")){
            song.set(caratteri[0]);
            userID.set(Integer.parseInt(caratteri[1]));
            context.write(song,userID);
        }
    }
  }
This is my reducer:
public static class CanzoniReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      Hashtable<IntWritable,Text> doppioni = new Hashtable<IntWritable,Text>();
      for (IntWritable val : values) {
        doppioni.put(val,key);
      }
      result.set(doppioni.size());
      //doppioni.clear();
      context.write(key,result);
    }
  }
and main:
Configuration conf = new Configuration();
    Job job = new Job(conf, "word count");
    job.setJarByClass(Canzoni.class);
    job.setMapperClass(CanzoniMapper.class);
    //job.setCombinerClass(CanzoniReducer.class);
    //job.setNumReduceTasks(2);
    job.setReducerClass(CanzoniReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
Any idea???