How to simulate inner join on very large files in java (without running out of memory)

Posted by Constantin on Programmers See other posts from Programmers or by Constantin
Published on 2013-09-06T13:24:59Z Indexed on 2014/06/05 3:38 UTC
Read the original article Hit count: 316

Filed under:
|

I am trying to simulate SQL joins (INNER, RIGHT OUTER and LEFT OUTER) using Java and very large text files. The files have already been sorted using an external sort routine.

I am trying to find the most efficient way to handle the INNER join part of the algorithm. Right now I use two Lists to store the lines that share the same key, and I iterate through the buffered lines of the right file once for every line of the left file (while the keys still match). In other words, the join key is not unique in either file, so I need to account for Cartesian-product situations such as:

left_01, 1
left_02, 1
right_01, 1
right_02, 1
right_03, 1

left_01 joins to right_01 using key 1
left_01 joins to right_02 using key 1
left_01 joins to right_03 using key 1
left_02 joins to right_01 using key 1
left_02 joins to right_02 using key 1
left_02 joins to right_03 using key 1

My concern is one of memory. I will run out of memory if I use the approach below, but I still want the inner join part to work fairly quickly. What is the best approach to deal with the INNER join part, keeping in mind that these files may potentially be huge?

/**
 * Merges two key-sorted, tab-separated text inputs into one output, emitting
 * matched pairs as well as unmatched rows from either side.
 *
 * Both inputs must already be sorted ascending by the integer key returned by
 * {@code Record.getKey()}; the order is not verified here.
 */
public class Joiner {

  /**
   * Performs the merge and closes all three streams when finished, whether the
   * merge succeeds or fails.
   *
   * For a run of equal keys only the right-hand group is buffered in memory;
   * left-hand records are streamed one at a time. The output order is the same
   * as buffering both sides (every left record paired with every right record,
   * in file order) while holding only one of the two groups in memory.
   *
   * @param left   reader over the left input, sorted by key
   * @param right  reader over the right input, sorted by key
   * @param output writer receiving one line per joined or unmatched row
   * @throws Throwable if reading, parsing or writing fails
   */
  private void join(BufferedReader left, BufferedReader right, BufferedWriter output) throws Throwable {
    try {
      Record leftRecord = read(left);
      Record rightRecord = read(right);

      while (leftRecord != null && rightRecord != null) {
        if (leftRecord.getKey() < rightRecord.getKey()) {
          // Left key has no partner on the right.
          write(output, leftRecord, null);
          leftRecord = read(left);
        } else if (leftRecord.getKey() > rightRecord.getKey()) {
          // Right key has no partner on the left.
          write(output, null, rightRecord);
          rightRecord = read(right);
        } else {
          int key = leftRecord.getKey();

          // Buffer the right-hand group once; it is replayed for every left record.
          List<Record> rightList = new ArrayList<Record>();
          rightRecord = readRecords(rightList, rightRecord, right);

          while (leftRecord != null && leftRecord.getKey() == key) {
            for (Record equalKeyRightRecord : rightList) {
              write(output, leftRecord, equalKeyRightRecord);
            }
            leftRecord = read(left);
          }
        }
      }

      // At most one side still has records; whatever remains is unmatched.
      while (leftRecord != null) {
        write(output, leftRecord, null);
        leftRecord = read(left);
      }
      while (rightRecord != null) {
        write(output, null, rightRecord);
        rightRecord = read(right);
      }

      output.flush();
    } finally {
      // Nested so that a failure closing one stream cannot skip the others.
      try {
        left.close();
      } finally {
        try {
          right.close();
        } finally {
          output.close();
        }
      }
    }
  }

  /**
   * Reads the next line and parses it into a record.
   *
   * @param reader source of tab-separated lines
   * @return the parsed record, or {@code null} at end of input
   * @throws Throwable if the line cannot be read or parsed
   */
  private Record read(BufferedReader reader) throws Throwable {
    String data = reader.readLine();
    if (data == null) {
      return null;
    }
    return new Record(data.split("\t"));
  }

  /**
   * Collects {@code record} and every following record with the same key.
   *
   * @param list   receives the records of the group, in file order
   * @param record first record of the group (already read by the caller)
   * @param reader source positioned just after {@code record}
   * @return the first record whose key differs, or {@code null} at end of input
   * @throws Throwable if reading or parsing fails
   */
  private Record readRecords(List<Record> list, Record record, BufferedReader reader) throws Throwable {
    int key = record.getKey();
    list.add(record);
    record = read(reader);
    while (record != null && record.getKey() == key) {
      list.add(record);
      record = read(reader);
    }
    return record;
  }

  /**
   * Writes one output line in the form {@code [leftKey][leftData][rightKey][rightData]}.
   * A missing side is written as the literal text {@code null}.
   *
   * @param writer destination
   * @param left   left record, or {@code null} for a right-only row
   * @param right  right record, or {@code null} for a left-only row
   * @throws Throwable if writing fails
   */
  private void write(BufferedWriter writer, Record left, Record right) throws Throwable {
    String leftKey = (left == null ? "null" : Integer.toString(left.getKey()));
    String leftData = (left == null ? "null" : left.getData());
    String rightKey = (right == null ? "null" : Integer.toString(right.getKey()));
    String rightData = (right == null ? "null" : right.getData());

    writer.write("[" + leftKey + "][" + leftData + "][" + rightKey + "][" + rightData + "]\n");
  }

  /**
   * Joins LEFT.DAT with RIGHT.DAT from the working directory into OUTPUT.DAT.
   * Any failure is reported on standard error.
   */
  public static void main(String[] args) {
    try {
      BufferedReader leftReader = new BufferedReader(new FileReader("LEFT.DAT"));
      BufferedReader rightReader = new BufferedReader(new FileReader("RIGHT.DAT"));
      BufferedWriter output = new BufferedWriter(new FileWriter("OUTPUT.DAT"));

      Joiner joiner = new Joiner();
      // join() takes ownership of the three streams and closes them.
      joiner.join(leftReader, rightReader, output);
    }
    catch (Throwable e) {
      e.printStackTrace();
    }
  }
}

After applying the ideas from the proposed answer, I changed the loop to this:

/**
 * Merges two key-sorted inputs without buffering equal-key groups in memory:
 * the right-hand group is re-read from disk for every left-hand record with
 * the same key, by seeking back to the remembered start of the group.
 *
 * All three streams are closed when the method returns, whether the merge
 * succeeds or fails.
 *
 * NOTE(review): relies on a {@code read(RandomAccessFile)} overload that is not
 * shown here; it is presumed to consume exactly one line per call so that
 * {@code getFilePointer()} lands on record boundaries - confirm.
 *
 * @param left   left input, sorted ascending by key
 * @param right  right input, sorted ascending by key
 * @param output writer receiving one line per joined or unmatched row
 * @throws Throwable if reading, seeking or writing fails
 */
private void join(RandomAccessFile left, RandomAccessFile right, BufferedWriter output) throws Throwable {
    try {
        // File offset at which the current right record starts.
        long groupStart = 0;

        Record leftRecord = read(left);
        Record rightRecord = read(right);

        while (leftRecord != null && rightRecord != null) {

            if (leftRecord.getKey() < rightRecord.getKey()) {
                write(output, leftRecord, null);
                leftRecord = read(left);
            }
            else if (leftRecord.getKey() > rightRecord.getKey()) {
                write(output, null, rightRecord);
                // The pointer now sits just past the record written above,
                // i.e. at the start of the record about to be read.
                groupStart = right.getFilePointer();
                rightRecord = read(right);
            }
            else {
                long groupEnd = 0;
                int key = leftRecord.getKey();
                while (leftRecord != null && leftRecord.getKey() == key) {
                    // Rewind and replay the whole right-hand group for this left record.
                    right.seek(groupStart);
                    rightRecord = read(right);
                    while (rightRecord != null && rightRecord.getKey() == key) {
                        write(output, leftRecord, rightRecord);
                        groupEnd = right.getFilePointer();
                        rightRecord = read(right);
                    }
                    leftRecord = read(left);
                }
                // rightRecord is now the first record after the group (or null),
                // and groupEnd is the offset at which that record starts.
                groupStart = groupEnd;
            }
        }

        // At most one side still has records; whatever remains is unmatched.
        while (leftRecord != null) {
            write(output, leftRecord, null);
            leftRecord = read(left);
        }
        while (rightRecord != null) {
            write(output, null, rightRecord);
            rightRecord = read(right);
        }

        output.flush();
    } finally {
        // Nested so that a failure closing one stream cannot skip the others.
        try {
            left.close();
        } finally {
            try {
                right.close();
            } finally {
                output.close();
            }
        }
    }
}

UPDATE: While this approach worked, it was terribly slow, so I have modified it to use temporary files as buffers, and this works very well. Here is the update:

/**
 * Estimates how many lines of {@code file} may be buffered in memory at once.
 *
 * The estimate is half of the currently free heap divided by the file's
 * average line length in bytes. An empty file, or one whose average line
 * length truncates to zero, is treated as having one-byte lines so that the
 * division can never fail.
 *
 * NOTE(review): on-disk bytes are only a rough proxy for the heap a parsed
 * record occupies, and {@code freeMemory()} does not count heap the JVM could
 * still grow into - treat the result as a heuristic.
 *
 * @param file the file whose lines are going to be buffered
 * @return the estimated number of lines that fit in the buffer budget
 * @throws Throwable if the line count cannot be determined
 */
private long getMaxBufferedLines(File file) throws Throwable {
    long freeBytes = Runtime.getRuntime().freeMemory() / 2;
    long lineCount = getLineCount(file);

    long averageLineBytes = lineCount > 0 ? file.length() / lineCount : 0;
    if (averageLineBytes < 1) {
        // Empty file or sub-byte average: avoid dividing by zero.
        averageLineBytes = 1;
    }
    return freeBytes / averageLineBytes;
}

/**
 * Sort-merge join of two key-sorted files, with the kind of rows emitted
 * selected by {@code joinType}.
 *
 * For each run of equal keys the right-hand group is gathered by
 * {@code consume} (in memory, overflowing into spill files) and then replayed
 * by {@code processRightRecords} once per matching left record. Spill files
 * are deleted as soon as their group has been processed, and all streams are
 * closed even when the merge fails.
 *
 * Both inputs must be sorted ascending by key; this is not verified.
 *
 * @param left     left input file
 * @param right    right input file
 * @param output   destination file (opened for writing before the merge starts)
 * @param joinType which of matched / left-only / right-only rows to emit
 * @throws Throwable if opening, reading or writing any file fails
 */
private void join(File left, File right, File output, JoinType joinType) throws Throwable {

    // The join type never changes during the merge, so decide once.
    boolean emitLeftOnly = joinType == JoinType.LeftOuterJoin
            || joinType == JoinType.LeftExclusiveJoin
            || joinType == JoinType.FullExclusiveJoin
            || joinType == JoinType.FullOuterJoin;
    boolean emitRightOnly = joinType == JoinType.RightOuterJoin
            || joinType == JoinType.RightExclusiveJoin
            || joinType == JoinType.FullExclusiveJoin
            || joinType == JoinType.FullOuterJoin;

    BufferedReader leftFile = null;
    BufferedReader rightFile = null;
    BufferedWriter outputFile = null;

    try {
        leftFile = new BufferedReader(new FileReader(left));
        rightFile = new BufferedReader(new FileReader(right));
        outputFile = new BufferedWriter(new FileWriter(output));
        long maxBufferedLines = getMaxBufferedLines(right);

        Record leftRecord = read(leftFile);
        Record rightRecord = read(rightFile);

        while (leftRecord != null && rightRecord != null) {
            int order = leftRecord.getKey().compareTo(rightRecord.getKey());

            if (order < 0) {
                if (emitLeftOnly) {
                    write(outputFile, leftRecord, null);
                }
                leftRecord = read(leftFile);
            }
            else if (order > 0) {
                if (emitRightOnly) {
                    write(outputFile, null, rightRecord);
                }
                rightRecord = read(rightFile);
            }
            else {
                String key = leftRecord.getKey();
                List<File> rightRecordFileList = new ArrayList<File>();
                List<Record> rightRecordList = new ArrayList<Record>();
                rightRecordList.add(rightRecord);

                try {
                    // After this call rightRecord is the first record past the group (or null).
                    rightRecord = consume(key, rightFile, rightRecordList, rightRecordFileList, maxBufferedLines);

                    while (leftRecord != null && leftRecord.getKey().compareTo(key) == 0) {
                        processRightRecords(outputFile, leftRecord, rightRecordFileList, rightRecordList, joinType);
                        leftRecord = read(leftFile);
                    }
                } finally {
                    // The spill files belong to this key group only; remove them now
                    // so they do not accumulate over the whole run.
                    for (File spillFile : rightRecordFileList) {
                        if (!spillFile.delete()) {
                            spillFile.deleteOnExit();
                        }
                    }
                }
            }
        }

        // At most one side still has records; whatever remains is unmatched.
        while (leftRecord != null) {
            if (emitLeftOnly) {
                write(outputFile, leftRecord, null);
            }
            leftRecord = read(leftFile);
        }
        while (rightRecord != null) {
            if (emitRightOnly) {
                write(outputFile, null, rightRecord);
            }
            rightRecord = read(rightFile);
        }

        outputFile.flush();
    } finally {
        // Nested so that a failure closing one stream cannot skip the others;
        // null checks cover the case where a later stream failed to open.
        try {
            if (leftFile != null) {
                leftFile.close();
            }
        } finally {
            try {
                if (rightFile != null) {
                    rightFile.close();
                }
            } finally {
                if (outputFile != null) {
                    outputFile.close();
                }
            }
        }
    }
}

/**
 * Writes the pairing of one left record with every right record of the
 * current key group: first the records stored in the spill files, in list
 * order, then the records still held in memory.
 *
 * Nothing is written, and no spill file is opened, for join types other than
 * left outer, right outer, full outer and inner.
 *
 * @param outputFile   destination for the joined rows
 * @param leftRecord   the left record to pair with each right record
 * @param rightFiles   spill files holding the earlier part of the right group
 * @param rightRecords in-memory tail of the right group
 * @param joinType     the join being performed
 * @throws Throwable if a spill file cannot be read or the output cannot be written
 */
public void processRightRecords(BufferedWriter outputFile, Record leftRecord, List<File> rightFiles, List<Record> rightRecords, JoinType joinType) throws Throwable {

    boolean emitsMatches = joinType == JoinType.LeftOuterJoin
            || joinType == JoinType.RightOuterJoin
            || joinType == JoinType.FullOuterJoin
            || joinType == JoinType.InnerJoin;
    if (!emitsMatches) {
        // Matched rows are not part of the result; skip re-reading the spill files.
        return;
    }

    for (File rightFile : rightFiles) {
        BufferedReader rightReader = new BufferedReader(new FileReader(rightFile));
        try {
            Record rightRecord = read(rightReader);
            while (rightRecord != null) {
                write(outputFile, leftRecord, rightRecord);
                rightRecord = read(rightReader);
            }
        } finally {
            rightReader.close();
        }
    }

    for (Record rightRecord : rightRecords) {
        write(outputFile, leftRecord, rightRecord);
    }
}

/**
 * Reads every remaining record whose key equals {@code key} from {@code reader}.
 *
 * Records are appended to {@code records} until the list holds
 * {@code bufferMaxRecordLines} entries. When another same-key record arrives
 * after that, the list is dumped to a new file (tracked in {@code files}),
 * cleared, and restarted with that record. On return, {@code files} holds the
 * earlier part of the group and {@code records} holds its tail.
 *
 * The returned record belongs to the caller's outer merge loop: it is the
 * first record whose key differs from {@code key}, so it must not be treated
 * as part of the current group.
 *
 * @param key                  key of the group being gathered
 * @param reader               source positioned just after the last record in {@code records}
 * @param records              in-memory buffer; must already contain the first record of the group
 * @param files                receives one file per dumped buffer
 * @param bufferMaxRecordLines maximum number of records to hold in memory at once
 * @return the first record with a different key, or {@code null} at end of input
 * @throws Throwable if reading or dumping fails
 */
private Record consume(String key, BufferedReader reader, List<Record> records, List<File> files, long bufferMaxRecordLines ) throws Throwable {
    Record current = records.get(records.size() - 1);

    for (;;) {
        long buffered = records.size();

        if (current.getKey().compareTo(key) == 0) {
            current = read(reader);
            while (current != null
                    && current.getKey().compareTo(key) == 0
                    && buffered < bufferMaxRecordLines) {
                records.add(current);
                buffered++;
                current = read(reader);
            }
        }

        // End of input or a new key: the group is complete.
        boolean sameKeyPending = current != null && current.getKey().compareTo(key) == 0;
        if (!sameKeyPending) {
            return current;
        }

        // Still the same key, so the buffer is full: dump it to a file and
        // start a fresh buffer with the record that did not fit.
        dumpBufferToFile(records, files);
        records.clear();
        records.add(current);
    }
}

/**
 * Writes every record in {@code records} to a new temporary file under the
 * "cache" directory and appends that file to {@code files}.
 *
 * The directory is created if it does not exist. The file is added to
 * {@code files} as soon as it has been created, so a caller that cleans up
 * the list also removes a file left behind by a failed write.
 *
 * TODO: place a limit on the number of spill files (check size of file list).
 *
 * @param records records to write, in order
 * @param files   list tracking the spill files created so far
 * @throws Throwable if the file cannot be created or written
 */
private void dumpBufferToFile(List<Record> records, List<File> files) throws Throwable {
    // Parenthesised so the sequence number is added, not concatenated digit by digit.
    String prefix = "joiner_" + (files.size() + 1);
    String suffix = ".dat";

    File cacheDirectory = new File("cache");
    // Result deliberately ignored: if the directory is unusable,
    // createTempFile below reports the failure.
    cacheDirectory.mkdirs();

    File file = File.createTempFile(prefix, suffix, cacheDirectory);
    files.add(file);

    BufferedWriter writer = new BufferedWriter(new FileWriter(file));
    try {
        for (Record record : records) {
            writer.write(record.dump());
        }
        writer.flush();
    } finally {
        writer.close();
    }
}

© Programmers or respective owner

Related posts about java

Related posts about file-handling