/* Copyright (C) 2009  CSE,IIT Bombay  http://www.cse.iitb.ac.in

This file is part of the ConStore open source storage facility for concept-nets.

ConStore is free software and distributed under the 
Creative Commons Attribution-Noncommercial-No Derivative Works 3.0 Unported License;
you can copy, distribute and transmit the work
with the work attribution in the manner specified by the author or licensor.
You may not use this work for commercial purposes and may not alter, 
transform, or build upon this work.

Please refer the legal code of the license, available at
http://creativecommons.org/licenses/by-nc-nd/3.0/legalcode

ConStore is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.  */

package iitb.con.indexing.isam;

import iitb.con.ds.Node;
import iitb.con.indexing.IndexTree;
import iitb.con.io.BufferedFileAdapter;
import iitb.con.io.IOAdapter;
import iitb.con.util.KeyList;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * ISAMTree is an index structure for indexing instances based on their attributes.<p>
 * 
 * ISAM stands for Indexed Sequential Access Method, a method for indexing data for fast retrieval.
 * In an ISAM system, data is organized into records which are composed of fixed length fields. 
 * Records are stored sequentially (as leaf nodes), originally to speed access on a disk. 
 * A secondary set of hash tables (non-leaf nodes) contain "pointers" into the leaf-node blocks, 
 * allowing individual records to be retrieved without having to search the entire data set.<p>
 * 
 * ISAM is a static indexing structure that works well for data involving few updates.
 * That is the reason no update method is provided; the user needs to create the index again if
 * the data changes.
 * 
 * @author Prathab K
 *
 */
public class ISAMTree<K extends Comparable<K>> implements IndexTree<K> {
    
    /** Size in bytes of one block (leaf or non-leaf) in the index files */
    private static final int BLOCK_SIZE = 4096;
    
    /** Size in bytes of the int header at the start of every block (holds the used length) */
    private static final int BLOCK_HEADER_SIZE = 4;
    
    /** IOAdapter for non-leaf nodes file */
    private IOAdapter nonLeafNodesFile;
    
    /** IOAdapter for leaf nodes file */
    private IOAdapter leafNodesFile;
    
    /** Write buffer for non-leaf nodes; allocated only in "rw" mode */
    private ByteBuffer nonLeafBuffer;
    
    /** Write buffer for leaf nodes; allocated only in "rw" mode */
    private ByteBuffer leafBuffer;
    
    /** <Key-BlockId of leaf node> List; loaded lazily by {@link #getValues(Comparable)} */
    private KeyList<K,Short> nonLeafNodeKeys;
    
    /** 1-based id of the non-leaf block currently being filled */ 
    private short nidxBlockCount = 1;
    
    /** 1-based id of the leaf block currently being filled */
    private short lidxBlockCount = 1;
    
    
    /**
     * Constructs the ISAMTree for the specified index file.
     * The write buffers are allocated only when the index is opened in "rw" mode.
     * @param fileName index file name (without the leaf / non-leaf extensions)
     * @param mode file opening mode (r, rw)
     * @throws FileNotFoundException if file not found
     * @throws IOException if the mode is invalid or a file operation fails
     */
    public ISAMTree(String fileName, String mode) 
    throws FileNotFoundException, IOException
    {
        // Constant-first comparison so that a null mode is reported as invalid, not as an NPE.
        if("rw".equals(mode)) {
            nonLeafBuffer = ByteBuffer.allocate(BLOCK_SIZE);
            leafBuffer = ByteBuffer.allocate(BLOCK_SIZE);
        }else if(!"r".equals(mode)) {
            throw new IOException("Invalid opening mode");
        }
        
        nonLeafNodesFile = new BufferedFileAdapter(fileName + NON_LEAF_EXT, mode);
        
        // Do not leak the first adapter if the second one cannot be opened.
        boolean leafOpened = false;
        try {
            leafNodesFile = new BufferedFileAdapter(fileName + LEAF_EXT, mode);
            leafOpened = true;
        } finally {
            if(!leafOpened)
                nonLeafNodesFile.close();
        }
    }
    
    /** 
     * Creates the index files for the given nodes list.
     * The list is sorted in place; values of consecutive nodes that compare equal are
     * grouped into a single leaf entry.
     * @param nodes nodes list - {@link Node}
     * @throws IOException if the index was not opened in "rw" mode, an entry does not
     *         fit into a block, or a file operation fails
     */
    public void create(List<Node<K>> nodes)
    throws IOException {
        if(nonLeafBuffer == null || leafBuffer == null)
            throw new IOException("Index is not opened in rw mode");
        
        Collections.sort(nodes);
        
        initializeBuffer(nonLeafBuffer);
        initializeBuffer(leafBuffer);
        
        LeafNode<K> leafNode = new LeafNode<K>();
        List<Integer> pendingValues = new ArrayList<Integer>();
        
        // The first key of the first leaf block goes into the non-leaf index up front.
        if(nodes.size() > 0) {
            leafNode.key = nodes.get(0).key;
            writeIntoBuffer(new NonLeafNode(leafNode.key,lidxBlockCount));
        }
        
        int lastIndex = nodes.size() - 1;
        for(int i = 0; i < nodes.size(); i++) {
            Node<K> current = nodes.get(i);
            pendingValues.add(current.value);
            
            // Keep accumulating while the next node compares equal to this one.
            if(i != lastIndex && current.compareTo(nodes.get(i+1)) == 0)
                continue;
            
            leafNode.key = current.key;
            int[] ids = new int[pendingValues.size()]; //TODO: put as list & make code generic
            int j = 0;
            for(Integer n : pendingValues) {
                ids[j++] = n;
                // Diagnostic output for non-positive instance ids.
                if(n <= 0) System.out.println(leafNode.key + " - " + n);
            }
            leafNode.instanceId = ids;
            pendingValues.clear();
            writeIntoBuffer(leafNode);
        }
        commitBuffers();
        
        // Any meta-index cached before the rebuild no longer matches the files.
        nonLeafNodeKeys = null;
    }
    
    /**
     * Retrieves the values for the specified key.
     * The meta-index is loaded on first use and cached for later lookups.
     * @param key key of the index structure
     * @return matched values for the given key, or <code>null</code> if the key is not found
     * @throws IOException if the file operation fails or the leaf block header is invalid
     */
    public int[] getValues(K key) throws IOException {
        if(nonLeafNodeKeys == null)
            nonLeafNodeKeys = getMetaIndex(key);
        
        Short blockId = nonLeafNodeKeys.getPrev(key);
        if(blockId == null)
            return null;
        
        byte[] b = leafNodesFile.read(getLocation(blockId), BLOCK_SIZE);
        if(b == null)
            throw new IOException("Unable to read leaf block " + blockId);
        
        LeafNode<K> leafNode = new LeafNode<K>(key);
        ByteBuffer buf = ByteBuffer.wrap(b);
        limitToUsedLength(buf);
        while(buf.hasRemaining()) {
            if(leafNode.attributeDeSerialize(buf, key) != null)
                return leafNode.instanceId;
        }
        return null;
    }
    
    /**
     * Returns the meta-index of the index structure.
     * The meta-index is basically the non-leaf node of the index tree structure;
     * it is rebuilt from the non-leaf file on every call.
     * @param key key of the index structure (passed to each {@link NonLeafNode} that is read)
     * @return key-id list ; {@link KeyList}
     * @throws IOException if the file operation fails or a block header is invalid
     */
    public KeyList<K,Short> getMetaIndex(K key) throws IOException {
        KeyList<K,Short> nonLeafNodesTable = new KeyList<K,Short>();
        ByteBuffer[] buffers = nonLeafNodesFile.readFileAsBlocks();
        
        for(int i = 0; i < buffers.length; i++) {
            ByteBuffer block = buffers[i];
            limitToUsedLength(block);
            while(block.hasRemaining()) {
                NonLeafNode nlNode = new NonLeafNode(key);
                nlNode.deSerialize(block);
                @SuppressWarnings("unchecked")
                K nodeKey = (K)nlNode.value;
                nonLeafNodesTable.add(nodeKey, nlNode.BlockId);
            }
        }
        return nonLeafNodesTable;
    }
    
    /**
     * Reads the block header at the buffer's current position and limits the buffer
     * to the used length stored in it.
     * @param block buffer positioned at the start of a block
     * @throws IOException if the header is missing or the stored length is out of range
     */
    private static void limitToUsedLength(ByteBuffer block) throws IOException {
        if(block.remaining() < BLOCK_HEADER_SIZE)
            throw new IOException("Corrupt index block: missing block header");
        
        int blockSize = block.getInt();
        if(blockSize < 0 || blockSize > block.capacity())
            throw new IOException("Corrupt index block: invalid block size " + blockSize);
        
        block.limit(blockSize);
    }
    
    /**
     * Writes the given leaf node into the leaf buffer.
     * If the node does not fit, the current block is written to the file, a new block
     * is started with this node, and the new block id is recorded in the non-leaf index.
     * @param leafNode leaf node - {@link LeafNode}
     * @throws IOException if the serialized node cannot fit into a single block, the
     *         block id range is exhausted, or a file operation fails
     */
    private void writeIntoBuffer(LeafNode<K> leafNode) 
    throws IOException {
        /* NOTE: If many attributes have the same prefix, duplicate blocks will be fetched,
         * so the key must be unique.
         */
        ByteBuffer buf = leafNode.serialize(leafNode);

        if((leafBuffer.position() + buf.capacity()) < BLOCK_SIZE) {
            leafBuffer.put(buf);
            return;
        }
        
        // Reject an entry that can never fit BEFORE flushing, so that no partial state
        // is written and the caller gets an IOException instead of a buffer overflow.
        if(buf.remaining() > BLOCK_SIZE - BLOCK_HEADER_SIZE)
            throw new IOException("Leaf entry for key " + leafNode.key
                    + " does not fit into a block of " + BLOCK_SIZE + " bytes");
        if(lidxBlockCount == Short.MAX_VALUE)
            throw new IOException("Too many leaf blocks in the index");
        
        flushFullBlock(leafBuffer, leafNodesFile, lidxBlockCount);
        lidxBlockCount++;
        leafBuffer.put(buf);
        writeIntoBuffer(new NonLeafNode(leafNode.key, lidxBlockCount));
    }
    
    /**
     * Writes the given non-leaf node into the non-leaf buffer.
     * If the node does not fit, the current block is written to the file and a new
     * block is started with this node.
     * @param nonLeafNode non-leaf node - {@link NonLeafNode}
     * @throws IOException if the serialized node cannot fit into a single block, the
     *         block id range is exhausted, or a file operation fails
     */
    private void writeIntoBuffer(NonLeafNode nonLeafNode) 
    throws IOException {
        ByteBuffer buf = nonLeafNode.serialize(nonLeafNode);
        
        if((nonLeafBuffer.position() + buf.capacity()) < BLOCK_SIZE) {
            nonLeafBuffer.put(buf);
            return;
        }
        
        if(buf.remaining() > BLOCK_SIZE - BLOCK_HEADER_SIZE)
            throw new IOException("Non-leaf entry does not fit into a block of "
                    + BLOCK_SIZE + " bytes");
        if(nidxBlockCount == Short.MAX_VALUE)
            throw new IOException("Too many non-leaf blocks in the index");
        
        flushFullBlock(nonLeafBuffer, nonLeafNodesFile, nidxBlockCount);
        nidxBlockCount++;
        nonLeafBuffer.put(buf);
    }
    
    /**
     * Stores the used length in the block header, writes the whole buffer
     * (up to its limit, i.e. a full block) at the block's file location and
     * re-initializes the buffer for the next block.
     * @param buffer block buffer to flush
     * @param file adapter of the file the block belongs to
     * @param blockId 1-based id of the block being written
     * @throws IOException if the file operation fails
     */
    private void flushFullBlock(ByteBuffer buffer, IOAdapter file, short blockId)
    throws IOException {
        buffer.putInt(0, buffer.position());
        buffer.rewind(); // position 0, limit untouched: the complete block is written
        file.write(buffer, getLocation(blockId));
        initializeBuffer(buffer);
    }
    
    /**
     * Initializes the buffer: clears it and reserves the block header.
     * @param buffer buffer to initialize
     */
    private void initializeBuffer(ByteBuffer buffer) {
        buffer.clear();
        buffer.putInt(BLOCK_SIZE); // placeholder; overwritten with the used length on flush
    }
    
    /**
     * Commits the contents of both buffers to their files.
     * Unlike intermediate blocks, these last blocks are written only up to
     * their used length (the buffers are flipped, not rewound).
     * @throws IOException if the file operation fails
     */
    private void commitBuffers() throws IOException {
        leafBuffer.putInt(0, leafBuffer.position());
        leafBuffer.flip();
        leafNodesFile.write(leafBuffer, getLocation(lidxBlockCount));
        nonLeafBuffer.putInt(0, nonLeafBuffer.position());
        nonLeafBuffer.flip();
        nonLeafNodesFile.write(nonLeafBuffer, getLocation(nidxBlockCount));
    }
    
    /**
     * Closes the index structure files.
     * The leaf file is closed even if closing the non-leaf file fails.
     * @throws IOException if the file operation fails
     */
    public void close() throws IOException {
        try {
            if(nonLeafNodesFile != null)
                nonLeafNodesFile.close();
        } finally {
            if(leafNodesFile != null)
                leafNodesFile.close();
        }
    }
    
    /**
     * Returns the file location for the specified block id.
     * Block ids are 1-based, so block 1 starts at offset 0.
     * @param blockId block id in the index file 
     * @return file location
     */
    private long getLocation(int blockId) {
        // Computed in long so that large block ids cannot overflow int arithmetic.
        return (blockId - 1L) * BLOCK_SIZE;
    }

}