/* Copyright (C) 2009  CSE,IIT Bombay  http://www.cse.iitb.ac.in

This file is part of the ConStore open source storage facility for concept-nets.

ConStore is free software and distributed under the 
Creative Commons Attribution-Noncommercial-No Derivative Works 3.0 Unported License;
you can copy, distribute and transmit the work
with the work attribution in the manner specified by the author or licensor.
You may not use this work for commercial purposes and may not alter, 
transform, or build upon this work.

Please refer the legal code of the license, available at
http://creativecommons.org/licenses/by-nc-nd/3.0/legalcode

ConStore is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.  */

package iitb.con.ds;

import iitb.con.core.Attribute;
import iitb.con.core.DataType;
import iitb.con.core.Entity;
import iitb.con.core.EntityInstance;
import iitb.con.core.Instance;
import iitb.con.core.Relation;
import iitb.con.core.RelationInstance;
import iitb.con.core.Type;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * InstanceSerializer serializes <tt>Instance</tt> to bytes and vice versa.
 * <p>
 * Record layout, in buffer order:
 * <ol>
 * <li>type id (short);</li>
 * <li>for anything that is not an <tt>EntityInstance</tt>: the left and right
 *     instance ids (two ints);</li>
 * <li>every attribute returned by <tt>Type.getAllAttributes()</tt>, in that order.</li>
 * </ol>
 * A single value is written as follows: String and Date - a 16-bit byte count
 * followed by the bytes in the platform default charset; Short, Integer, Float,
 * Long and Double - their fixed-width <tt>ByteBuffer</tt> form; Boolean - one
 * byte (1 or 0). A repeating attribute is a 16-bit element count followed by
 * the elements. Counts and lengths are treated as unsigned 16-bit values, so
 * the largest supported count/length is 65535.
 * 
 * @author Prathab K
 * @see Instance
 */
public class InstanceSerializer implements ItemSerializer<Instance> {
    
    /** Largest value that fits in the unsigned 16-bit length/count prefix */
    private static final int MAX_LENGTH = 0xFFFF;
    
    /** Bytes taken by a length/count prefix and by the type id */
    private static final int PREFIX_SIZE = 2;
    
    /** Bytes taken by the left and right instance ids of a relation instance */
    private static final int RELATION_IDS_SIZE = 8;
    
    /** Type table to get the type information of an instance */
    private final ItemTable<String,Short,Type> typeTable;
    
    /**
     * Initializes with TypeTable
     * @param typeTable Type table
     * @see TypeTable
     */
    public InstanceSerializer(ItemTable<String, Short,Type> typeTable){
        this.typeTable = typeTable;
    }
    
    /**
     * Serializes <tt>Instance</tt> to bytes. The type is looked up in the type
     * table by the simple class name of the instance.
     * @param instance Instance
     * @return serialized <tt>Instance</tt> as {@link ByteBuffer}, rewound to position 0
     * @throws NullPointerException if no type is registered for the instance's class
     * @throws IllegalArgumentException if a text value or a repeating attribute
     *         is too large for the 16-bit length prefix
     */
    public ByteBuffer serialize(Instance instance) {
        String typeName = instance.getClass().getSimpleName();
        Type type = typeTable.get(typeName, null);
        if(type == null) {
            throw new NullPointerException(
                    "No type registered for instance class " + typeName);
        }
        
        ByteBuffer buf = ByteBuffer.allocate(computeSize(instance, type) + PREFIX_SIZE);
        buf.putShort(type.id);
        
        if(!(instance instanceof EntityInstance)) {
            // entity instances carry no extra header fields
            RelationInstance r = (RelationInstance) instance;
            buf.putInt(r.leftInstanceId); //left instance id
            buf.putInt(r.rightInstanceId); //right instance id
        }
        
        List<Attribute> attributes = type.getAllAttributes();
        if(attributes != null) {
            for(Attribute attribute : attributes) {
                Object value = instance.getAttributeValue(attribute.name);
                if(value instanceof List) { // repeating attributes
                    List<?> elements = (List<?>) value;
                    writeLength(buf, elements.size());
                    for(Object element : elements) {
                        writeScalar(buf, element);
                    }
                }else {
                    writeScalar(buf, value);
                }
            }
        }
        buf.rewind();
        return buf;
    }
    
    /**
     * Deserializes the given bytes as <tt>Instance</tt>
     * @param buf bytes as {@link ByteBuffer}; it is rewound before reading
     * @return deserialized bytes as <tt>Instance</tt>, or <tt>null</tt> when the
     *         stored type id resolves to neither an Entity nor a Relation
     */
    public Instance deSerialize(ByteBuffer buf) {
        buf.rewind();
        short typeId = buf.getShort();
        Type type = (Type) typeTable.get(typeId);
        if(type instanceof Entity) {
            return deSerializeEntityInstance(buf, type);
        }
        if(type instanceof Relation) {
            return deSerializeRelationInstance(buf, type);
        }
        //TODO: throw exception
        return null;
    }
    
    /**
     * Deserializes only the named attribute from the given bytes
     * @param buf bytes as {@link ByteBuffer}; it is rewound before reading
     * @param name attribute name
     * @return deserialized attribute value as <tt>Object</tt>, or <tt>null</tt>
     *         when the type or the attribute is not found
     */
    public Object attributeDeSerialize(ByteBuffer buf, Object name) {
        buf.rewind();
        short typeId = buf.getShort();
        Type type = (Type) typeTable.get(typeId);
        if(type instanceof Entity) {
            return deSerializeSelectiveAttribute(buf, name, type);
        }
        if(type instanceof Relation) {
            //skip the two int values of left & right entity id
            buf.position(buf.position() + RELATION_IDS_SIZE);
            return deSerializeSelectiveAttribute(buf, name, type);
        }
        //TODO: throw exception
        return null;
    }
    
    /**
     * Deserializes the EntityInstance
     * @param buf bytes as {@link ByteBuffer}, positioned just after the type id
     * @param type instance's type
     * @return Entity Instance
     * @see EntityInstance
     */
    private Instance deSerializeEntityInstance(ByteBuffer buf, Type type) {
        //TODO: get package name from file
        EntityInstance instance = 
            (EntityInstance) createClass("types.entities." + type.name);
        instance.setType(type);
        
        deSerializeAttributes(buf, instance, type);
        
        return instance;
    }
    
    /**
     * Deserializes the RelationInstance
     * @param buf bytes as {@link ByteBuffer}, positioned just after the type id
     * @param type instance's type
     * @return Relation Instance
     * @see RelationInstance
     */
    private Instance deSerializeRelationInstance(ByteBuffer buf, Type type) {
        //TODO: get package name from file
        RelationInstance instance = 
            (RelationInstance) createClass("types.relations." + type.name);
        
        instance.setType(type);
        instance.leftInstanceId = buf.getInt();
        instance.rightInstanceId = buf.getInt();
        
        deSerializeAttributes(buf, instance, type);
        
        return instance;
    }
    
    /**
     * Deserializes the specified attribute of an instance, skipping over the
     * attributes stored before it
     * @param buf bytes as {@link ByteBuffer}, positioned at the first attribute
     * @param name attribute name
     * @param type instance's type
     * @return Attribute's value as object, or <tt>null</tt> if no attribute has that name
     */
    private Object deSerializeSelectiveAttribute(ByteBuffer buf, Object name, Type type) {
        List<Attribute> attributes = type.getAllAttributes();
        if(attributes == null) {
            return null;
        }
        for(Attribute attribute : attributes) {
            if(attribute.name.equals(name)) {
                return attribute.repeating
                        ? getAttributeMultiValue(buf, attribute)
                        : getAttributeSingleValue(buf, attribute);
            }
            if(attribute.repeating) {
                skipAttributeMultiValue(buf, attribute);
            }else {
                skipAttributeSingleValue(buf, attribute);
            }
        }
        return null;
    }
    
    /**
     * Deserializes the Instance's attributes and stores them on the instance.
     * Attributes whose data type is not supported are left unset.
     * @param buf bytes as {@link ByteBuffer}, positioned at the first attribute
     * @param instance Instance
     * @param type instance's type
     * @see Attribute
     */
    private void deSerializeAttributes(ByteBuffer buf, Instance instance, Type type) {
        List<Attribute> attributes = type.getAllAttributes();
        if(attributes == null) {
            return;
        }
        for(Attribute attribute : attributes) {
            Object value = attribute.repeating
                    ? getAttributeMultiValue(buf, attribute)
                    : getAttributeSingleValue(buf, attribute);
            if(value != null) {
                instance.setAttributeValue(attribute.name, value);
            }
        }
    }
    
    /**
     * Reads one single-valued attribute from the buffer
     * @param buf bytes as {@link ByteBuffer}
     * @param attribute Instance's attribute
     * @return attribute value as <tt>Object</tt> (String for STRING and TIME),
     *         or <tt>null</tt> for an unsupported data type, in which case
     *         nothing is consumed
     */
    private Object getAttributeSingleValue(ByteBuffer buf, Attribute attribute) {
        if(isText(attribute)) {
            byte[] bytes = new byte[readLength(buf)];
            buf.get(bytes);
            // platform default charset, the same one the write path uses
            return new String(bytes);
        }else if(attribute.dataType == DataType.SHORT) {
            return Short.valueOf(buf.getShort());
        }else if(attribute.dataType == DataType.INTEGER) {
            return Integer.valueOf(buf.getInt());
        }else if(attribute.dataType == DataType.FLOAT) {
            return Float.valueOf(buf.getFloat());
        }else if(attribute.dataType == DataType.DOUBLE) {
            return Double.valueOf(buf.getDouble());
        }else if(attribute.dataType == DataType.BOOLEAN) {
            return Boolean.valueOf(buf.get() != 0);
        }
        //TODO: throw exception
        return null;
    }
    
    /**
     * Reads one multi-valued attribute from the buffer: the element count,
     * then that many single values
     * @param buf bytes as {@link ByteBuffer}
     * @param attribute Instance's attribute
     * @return attribute values as a <tt>List</tt>, or <tt>null</tt> for an
     *         unsupported data type (only the count is consumed then)
     */
    private Object getAttributeMultiValue(ByteBuffer buf, Attribute attribute) {
        int count = readLength(buf);
        if(!isSupported(attribute)) {
            //TODO: throw exception
            return null;
        }
        List<Object> values = new ArrayList<Object>(count);
        for(int i = 0; i < count; i++) {
            values.add(getAttributeSingleValue(buf, attribute));
        }
        return values;
    }
    
    /**
     * Skips the buffer position for the single-valued attribute of the instance during the deserialization
     * @param buf bytes as {@link ByteBuffer}
     * @param attribute Instance's attribute
     */
    private void skipAttributeSingleValue(ByteBuffer buf, Attribute attribute) {
        if(isText(attribute)) {
            int length = readLength(buf);
            buf.position(buf.position() + length);
        }else {
            // width is 0 for an unsupported data type, i.e. nothing is skipped
            buf.position(buf.position() + fixedWidth(attribute));
        }
    }
    
    /**
     * Skips the buffer position for the multi-valued attribute of the instance during the deserialization
     * @param buf bytes as {@link ByteBuffer}
     * @param attribute Instance's attribute
     */
    private void skipAttributeMultiValue(ByteBuffer buf, Attribute attribute) {
        int count = readLength(buf);
        if(isText(attribute)) {
            // variable-length elements have to be walked one by one
            for(int i = 0; i < count; i++) {
                int length = readLength(buf);
                buf.position(buf.position() + length);
            }
        }else {
            buf.position(buf.position() + count * fixedWidth(attribute));
        }
    }
    
    /**
     * Computes the size of the given Instance in terms of bytes, excluding the
     * leading type id
     * @param instance Instance
     * @param type instance's type
     * @return size in bytes
     */
    private int computeSize(Instance instance, Type type) {
        int size = 0;
        
        List<Attribute> attributes = type.getAllAttributes();
        if(attributes != null) {
            for(Attribute attribute : attributes) {
                Object value = instance.getAttributeValue(attribute.name);
                if(value instanceof List) { // repeating attributes
                    size += PREFIX_SIZE; //no. of values
                    for(Object element : (List<?>) value) {
                        size += scalarSize(element);
                    }
                }else {
                    size += scalarSize(value);
                }
            }
        }
        if(!(instance instanceof EntityInstance)) {
            //left instance id(4) + right instance id (4)
            size += RELATION_IDS_SIZE;
        }
        return size;
    }
    
    /**
     * Writes one non-list value in the form described in the class comment.
     * <p>
     * NOTE(review): <tt>null</tt> and values of any other class are skipped
     * without writing anything, so the read side will be misaligned for such
     * an attribute. Long values are written, but no branch of the read side
     * in this class consumes a long - confirm whether that is intended.
     * @param buf buffer to write to
     * @param value value to write
     */
    private static void writeScalar(ByteBuffer buf, Object value) {
        if(value instanceof String || value instanceof Date) {
            byte[] bytes = textBytes(value);
            // the prefix is the encoded byte count, not the character count
            writeLength(buf, bytes.length);
            buf.put(bytes);
        }else if(value instanceof Short) {
            buf.putShort(((Short) value).shortValue());
        }else if(value instanceof Integer) {
            buf.putInt(((Integer) value).intValue());
        }else if(value instanceof Float) {
            buf.putFloat(((Float) value).floatValue());
        }else if(value instanceof Long) {
            buf.putLong(((Long) value).longValue());
        }else if(value instanceof Double) {
            buf.putDouble(((Double) value).doubleValue());
        }else if(value instanceof Boolean) {
            buf.put((byte) (((Boolean) value).booleanValue() ? 1 : 0));
        }else {
            //TODO: throw exception
        }
    }
    
    /**
     * Number of bytes {@link #writeScalar(ByteBuffer, Object)} writes for the value
     * @param value value to measure
     * @return size in bytes; 0 for values that are not written
     */
    private static int scalarSize(Object value) {
        if(value instanceof String || value instanceof Date) {
            return PREFIX_SIZE + textBytes(value).length;
        }else if(value instanceof Short) {
            return 2;
        }else if(value instanceof Integer || value instanceof Float) {
            return 4;
        }else if(value instanceof Long || value instanceof Double) {
            return 8;
        }else if(value instanceof Boolean) {
            return 1;
        }
        //TODO: throw exception
        return 0;
    }
    
    /**
     * Encodes a String or Date as bytes in the platform default charset, the
     * charset the read side decodes with. A Date is stored as its
     * <tt>toString()</tt> text and therefore comes back as a String.
     * @param value a String or a Date
     * @return encoded bytes
     */
    private static byte[] textBytes(Object value) {
        return value.toString().getBytes();
    }
    
    /**
     * Writes a length or element count as an unsigned 16-bit value
     * @param buf buffer to write to
     * @param length value to write
     * @throws IllegalArgumentException if the value does not fit in 16 bits
     */
    private static void writeLength(ByteBuffer buf, int length) {
        if(length > MAX_LENGTH) {
            throw new IllegalArgumentException("Length " + length
                    + " exceeds the limit of " + MAX_LENGTH);
        }
        buf.putShort((short) length);
    }
    
    /**
     * Reads a length or element count written by {@link #writeLength(ByteBuffer, int)}
     * @param buf buffer to read from
     * @return the value in the range 0..65535
     */
    private static int readLength(ByteBuffer buf) {
        return buf.getShort() & MAX_LENGTH;
    }
    
    /**
     * Tells whether the attribute is stored as length-prefixed text
     * @param attribute Instance's attribute
     * @return <tt>true</tt> for the STRING and TIME data types
     */
    private static boolean isText(Attribute attribute) {
        return attribute.dataType == DataType.STRING
            || attribute.dataType == DataType.TIME;
    }
    
    /**
     * Number of bytes the read side consumes for one value of a fixed-width
     * data type; these are the widths of the <tt>ByteBuffer</tt> get methods used there
     * @param attribute Instance's attribute
     * @return width in bytes, or 0 if the data type is text or unsupported
     */
    private static int fixedWidth(Attribute attribute) {
        if(attribute.dataType == DataType.SHORT) {
            return 2;
        }else if(attribute.dataType == DataType.INTEGER) {
            return 4;
        }else if(attribute.dataType == DataType.FLOAT) {
            return 4;
        }else if(attribute.dataType == DataType.DOUBLE) {
            return 8;
        }else if(attribute.dataType == DataType.BOOLEAN) {
            return 1;
        }
        return 0;
    }
    
    /**
     * Tells whether the read side knows how to decode the attribute's data type
     * @param attribute Instance's attribute
     * @return <tt>true</tt> if the data type is supported
     */
    private static boolean isSupported(Attribute attribute) {
        return isText(attribute) || fixedWidth(attribute) > 0;
    }

    /**
     * Creates an object of the named class at run-time using its no-argument constructor
     * @param name fully qualified class name
     * @return new object of that class
     * @throws IllegalStateException if the class is not found, cannot be
     *         instantiated or is not accessible; the original exception is the cause
     */
    private Object createClass(String name) {
        try {
            Class<?> classDefinition = Class.forName(name);
            return classDefinition.newInstance();
        } catch (InstantiationException ie) {
            throw new IllegalStateException("Cannot instantiate class " + name, ie);
        } catch (IllegalAccessException iae) {
            throw new IllegalStateException("Cannot access class " + name, iae);
        } catch (ClassNotFoundException cnfe) {
            throw new IllegalStateException("Class not found: " + name, cnfe);
        }
    }
    
}