diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
index 003703f86a4..eb2a5a356b7 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
@@ -65,7 +65,7 @@ public abstract class AColGroup implements Serializable {
 	/** Public super types of compression ColGroups supported */
 	public static enum CompressionType {
-		UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCFOR, DDCFOR, DeltaDDC, LinearFunctional;
+		UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCFOR, DDCFOR, DeltaDDC, DDCLZW, LinearFunctional;
 
 		public boolean isDense() {
 			return this == DDC || this == CONST || this == DDCFOR || this == DDCFOR;
@@ -86,7 +86,7 @@ public boolean isSDC() {
 	 * Protected such that outside the ColGroup package it should be unknown which specific subtype is used.
 	 */
 	protected static enum ColGroupType {
-		UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, SDCSingleZeros, SDCZeros, SDCFOR, DDCFOR, DeltaDDC,
+		UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, SDCSingleZeros, SDCZeros, SDCFOR, DDCFOR, DDCLZW, DeltaDDC,
 		LinearFunctional;
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
index ac4defcabd5..140fde5af16 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
@@ -92,8 +92,9 @@ protected ColGroupDDC(IColIndex colIndexes, IDictionary dict, AMapToData data, i
 				throw new DMLCompressionException("Invalid length of the data. is zero");
is zero"); if(data.getUnique() != dict.getNumberOfValues(colIndexes.size())) - throw new DMLCompressionException("Invalid map to dict Map has:" + data.getUnique() + " while dict has " - + dict.getNumberOfValues(colIndexes.size())); + throw new DMLCompressionException( + "Invalid map to dict Map has:" + data.getUnique() + " while dict has " + + dict.getNumberOfValues(colIndexes.size())); int[] c = getCounts(); if(c.length != dict.getNumberOfValues(colIndexes.size())) throw new DMLCompressionException("Invalid DDC Construction"); @@ -175,8 +176,8 @@ private final void decompressToDenseBlockDenseDictSingleColOutContiguous(DenseBl decompressToDenseBlockDenseDictSingleColOutContiguous(c, rl, ru, offR + _colIndexes.get(0), values, _data); } - private final static void decompressToDenseBlockDenseDictSingleColOutContiguous(double[] c, int rl, int ru, int offR, - double[] values, AMapToData data) { + private final static void decompressToDenseBlockDenseDictSingleColOutContiguous(double[] c, int rl, int ru, + int offR, double[] values, AMapToData data) { data.decompressToRange(c, rl, ru, offR, values); } @@ -375,15 +376,17 @@ private void leftMultByMatrixNoPreAggSingleCol(MatrixBlock matrix, MatrixBlock r return; else if(matrix.isInSparseFormat()) { if(cl != 0 || cu != _data.size()) - lmSparseMatrixNoPreAggSingleCol(matrix.getSparseBlock(), nColM, retV, nColRet, dictVals, rl, ru, cl, cu); + lmSparseMatrixNoPreAggSingleCol(matrix.getSparseBlock(), nColM, retV, nColRet, dictVals, rl, ru, cl, + cu); else lmSparseMatrixNoPreAggSingleCol(matrix.getSparseBlock(), nColM, retV, nColRet, dictVals, rl, ru); } else if(!matrix.getDenseBlock().isContiguous()) - lmDenseMatrixNoPreAggSingleColNonContiguous(matrix.getDenseBlock(), nColM, retV, nColRet, dictVals, rl, ru, cl, - cu); + lmDenseMatrixNoPreAggSingleColNonContiguous(matrix.getDenseBlock(), nColM, retV, nColRet, dictVals, rl, ru, + cl, cu); else - lmDenseMatrixNoPreAggSingleCol(matrix.getDenseBlockValues(), nColM, retV, nColRet, dictVals, rl, ru, cl, cu); + lmDenseMatrixNoPreAggSingleCol(matrix.getDenseBlockValues(), nColM, retV, nColRet, dictVals, rl, ru, cl, + cu); } private void lmSparseMatrixNoPreAggSingleCol(SparseBlock sb, int nColM, DenseBlock retV, int nColRet, double[] vals, @@ -538,7 +541,8 @@ private void lmMatrixNoPreAggMultiCol(MatrixBlock matrix, MatrixBlock result, in lmDenseMatrixNoPreAggMultiCol(matrix, result, rl, ru, cl, cu); } - private void lmSparseMatrixNoPreAggMultiCol(MatrixBlock matrix, MatrixBlock result, int rl, int ru, int cl, int cu) { + private void lmSparseMatrixNoPreAggMultiCol(MatrixBlock matrix, MatrixBlock result, int rl, int ru, int cl, + int cu) { final DenseBlock db = result.getDenseBlock(); final SparseBlock sb = matrix.getSparseBlock(); @@ -618,7 +622,8 @@ public void leftMMIdentityPreAggregateDense(MatrixBlock that, MatrixBlock ret, i } @Override - public void rightDecompressingMult(MatrixBlock right, MatrixBlock ret, int rl, int ru, int nRows, int crl, int cru) { + public void rightDecompressingMult(MatrixBlock right, MatrixBlock ret, int rl, int ru, int nRows, int crl, + int cru) { if(_dict instanceof IdentityDictionary) identityRightDecompressingMult(right, ret, rl, ru, crl, cru); else @@ -672,7 +677,8 @@ private void defaultRightDecompressingMult(MatrixBlock right, MatrixBlock ret, i } } - final void vectMM(double aa, double[] b, double[] c, int endT, int jd, int crl, int cru, int offOut, int k, int vLen, DoubleVector vVec) { + final void vectMM(double aa, double[] b, double[] c, int endT, int jd, int crl, int 
+	final void vectMM(double aa, double[] b, double[] c, int endT, int jd, int crl, int cru, int offOut, int k,
+		int vLen, DoubleVector vVec) {
 		vVec = vVec.broadcast(aa);
 		final int offj = k * jd;
 		final int end = endT + offj;
@@ -985,8 +991,8 @@ private void leftMMIdentityPreAggregateDenseSingleRow(double[] values, int pos,
 		}
 	}
 
-	private void leftMMIdentityPreAggregateDenseSingleRowRangeIndex(double[] values, int pos, double[] values2, int pos2,
-		int cl, int cu) {
+	private void leftMMIdentityPreAggregateDenseSingleRowRangeIndex(double[] values, int pos, double[] values2,
+		int pos2, int cl, int cu) {
 		IdentityDictionary a = (IdentityDictionary) _dict;
 
 		final int firstCol = pos2 + _colIndexes.get(0);
@@ -1112,13 +1118,13 @@ protected boolean allowShallowIdentityRightMult() {
 	public AColGroup convertToDeltaDDC() {
 		int numCols = _colIndexes.size();
 		int numRows = _data.size();
-
+
 		DblArrayCountHashMap map = new DblArrayCountHashMap(Math.max(numRows, 64));
 		double[] rowDelta = new double[numCols];
 		double[] prevRow = new double[numCols];
 		DblArray dblArray = new DblArray(rowDelta);
 		int[] rowToDictId = new int[numRows];
-
+
 		double[] dictVals = _dict.getValues();
 
 		for(int i = 0; i < numRows; i++) {
@@ -1129,18 +1135,19 @@ public AColGroup convertToDeltaDDC() {
 				if(i == 0) {
 					rowDelta[j] = val;
 					prevRow[j] = val;
-				} else {
+				}
+				else {
 					rowDelta[j] = val - prevRow[j];
 					prevRow[j] = val;
 				}
 			}
-
+
 			rowToDictId[i] = map.increment(dblArray);
 		}
-
+
 		if(map.size() == 0)
 			return new ColGroupEmpty(_colIndexes);
-
+
 		ACount[] vals = map.extractValues();
 		final int nVals = vals.length;
 		final double[] dictValues = new double[nVals * numCols];
@@ -1153,7 +1160,7 @@ public AColGroup convertToDeltaDDC() {
 			oldIdToNewId[dac.id] = i;
 			idx += numCols;
 		}
-
+
 		DeltaDictionary deltaDict = new DeltaDictionary(dictValues, numCols);
 		AMapToData newData = MapToFactory.create(numRows, nVals);
 		for(int i = 0; i < numRows; i++) {
@@ -1162,4 +1169,7 @@ public AColGroup convertToDeltaDDC() {
 		return ColGroupDeltaDDC.create(_colIndexes, deltaDict, newData, null);
 	}
 
+	public AColGroup convertToDDCLZW() {
+		return ColGroupDDCLZW.create(_colIndexes, _dict, _data, null);
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDCLZW.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDCLZW.java
new file mode 100644
index 00000000000..a8c279828fb
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDCLZW.java
@@ -0,0 +1,727 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.colgroup;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysds.runtime.compress.DMLCompressionException;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupUtils.P;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary;
+import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme;
+import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
+import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.data.SparseBlockMCSR;
+import org.apache.sysds.runtime.functionobjects.Builtin;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
+import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
+
+import shaded.parquet.it.unimi.dsi.fastutil.ints.IntArrayList;
+import shaded.parquet.it.unimi.dsi.fastutil.longs.Long2IntLinkedOpenHashMap;
+
+/**
+ * Class to encapsulate information about a column group that is encoded with dense dictionary encoding (DDC) and
+ * whose mapping vector is additionally LZW-compressed.
+ *
+ * Idea:
+ * - DDCLZW stores the mapping vector exclusively in compressed form.
+ * - No persistent MapToData cache is maintained.
+ * - Sequential operations decode on-the-fly, while operations requiring random access explicitly materialize and
+ *   fall back to DDC.
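+ *
+ * Code space (editor's note, derived from compress()/decompress() below): codes 0 .. nUnique-1 denote the original
+ * dictionary indices themselves, while codes >= nUnique denote phrases learned during encoding, each stored as a
+ * (prefixCode, nextSymbol) pair packed into a single long.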
+ */
+public class ColGroupDDCLZW extends APreAgg implements IMapToDataGroup {
+	private static final long serialVersionUID = -5769772089913918987L;
+
+	private final int[] _dataLZW; // LZW-compressed representation of the mapping
+	private final int _nRows; // Number of rows in the mapping vector
+	private final int _nUnique; // Number of unique values in the mapping vector
+
+	// Builds a packed 64-bit key for the (prefixCode(w), nextSymbol(k)) pairs used in the LZW dictionary.
+	private static long packKey(int prefixCode, int nextSymbol) {
+		return (((long) prefixCode) << 32) | (nextSymbol & 0xffffffffL);
+	}
+
+	// Compresses a mapping (AMapToData) into an LZW-compressed int array of codes.
+	private static int[] compress(AMapToData data) {
+		if(data == null)
+			throw new IllegalArgumentException("Invalid input: data is null");
+
+		final int nRows = data.size();
+		if(nRows <= 0) {
+			throw new IllegalArgumentException("Invalid input: data has no rows");
+		}
+
+		final int nUnique = data.getUnique();
+		if(nUnique <= 0) {
+			throw new IllegalArgumentException("Invalid input: data has no unique values");
+		}
+
+		// Fast path: single symbol.
+		if(nRows == 1)
+			return new int[] {data.getIndex(0)};
+
+		// LZW dictionary. Maps (prefixCode, nextSymbol) -> newCode.
+		// Using fastutil keeps lookups fast. (TODO improve time/space complexity)
+		final Long2IntLinkedOpenHashMap dict = new Long2IntLinkedOpenHashMap(1 << 16);
+		dict.defaultReturnValue(-1);
+
+		// Output buffer (heuristic capacity; avoids frequent reallocations).
+		final IntArrayList out = new IntArrayList(Math.max(16, nRows / 2));
+
+		// Codes {0, ..., nUnique - 1} are reserved for the original symbols.
+		int nextCode = nUnique;
+
+		// Initialize w with the first input symbol.
+		// AMapToData stores dictionary indices, not actual data values. Since these indices reference
+		// positions in an IDictionary, they are always in the valid range 0 .. nUnique-1.
+		int w = data.getIndex(0);
+
+		// Process the remaining input symbols.
+		// Example: _data = [2,0,2,3,0,2,1,0,2].
+		for(int i = 1; i < nRows; i++) {
+			final int k = data.getIndex(i); // next input symbol
+
+			if(k < 0 || k >= nUnique)
+				throw new IllegalArgumentException("Symbol out of range: " + k + " (nUnique=" + nUnique + ")");
+
+			final long key = packKey(w, k); // encode (w,k) into a long key
+
+			int wk = dict.get(key); // check whether the phrase wk exists in the dictionary
+			if(wk != -1) {
+				w = wk; // wk exists in the dictionary, so replace w by wk and continue.
+			}
+			else {
+				// wk does not exist in the dictionary: output the current phrase, add the new phrase, restart at k.
+				out.add(w);
+				dict.put(key, nextCode++);
+				w = k; // Start the new phrase with k.
+			}
+		}
+
+		out.add(w);
+		return out.toIntArray();
+	}
+
+	// Unpack the upper 32 bits (w) of a (w,k) key pair.
+	private static int unpackfirst(long key) {
+		return (int) (key >>> 32);
+	}
+
+	// Unpack the lower 32 bits (k) of a (w,k) key pair.
+	private static int unpacksecond(long key) {
+		return (int) (key);
+	}
+
+	// Append a symbol to the end of an int array.
+	private static int[] packint(int[] arr, int last) {
+		int[] result = Arrays.copyOf(arr, arr.length + 1);
+		result[arr.length] = last;
+		return result;
+	}
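+
+	/*
+	 * Illustrative encode trace (editor's example) for the mapping [2,0,2,3,0,2,1,0,2] from the
+	 * comment in compress() above, assuming nUnique = 4 (base codes 0..3, learned codes from 4):
+	 *   w=2; k=0: (2,0) new   -> emit 2, code 4=(2,0), w=0
+	 *        k=2: (0,2) new   -> emit 0, code 5=(0,2), w=2
+	 *        k=3: (2,3) new   -> emit 2, code 6=(2,3), w=3
+	 *        k=0: (3,0) new   -> emit 3, code 7=(3,0), w=0
+	 *        k=2: (0,2) known -> w=5
+	 *        k=1: (5,1) new   -> emit 5, code 8=(5,1), w=1
+	 *        k=0: (1,0) new   -> emit 1, code 9=(1,0), w=0
+	 *        k=2: (0,2) known -> w=5
+	 *   end -> emit 5; output codes: [2,0,2,3,5,1,5] (9 symbols -> 7 codes).
+	 */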
+	// Reconstruct the phrase for an LZW code by walking the (prefix, symbol) chain.
+	private static int[] unpack(int code, int nUnique, Map<Integer, Long> dict) {
+		// Base symbol (implicit alphabet).
+		if(code < nUnique)
+			return new int[] {code};
+
+		Stack<Integer> stack = new Stack<>();
+		int c = code;
+
+		while(c >= nUnique) {
+			Long key = dict.get(c);
+			if(key == null)
+				throw new IllegalStateException("Missing dictionary entry for code: " + c);
+
+			int symbol = unpacksecond(key);
+			stack.push(symbol);
+			c = unpackfirst(key);
+		}
+
+		// Base symbol.
+		stack.push(c);
+		int[] outarray = new int[stack.size()];
+		int i = 0;
+		// Write the symbols to the output in the correct order.
+		while(!stack.isEmpty()) {
+			outarray[i++] = stack.pop();
+		}
+		return outarray;
+	}
+
+	// Decompresses an LZW-compressed vector into its full pre-compression AMapToData form.
+	private static AMapToData decompressFull(int[] codes, int nUnique, int nRows) {
+		return decompress(codes, nUnique, nRows, nRows);
+	}
+
+	private final class LZWMappingIterator {
+		private final Map<Integer, Long> dict = new HashMap<>(); // LZW dictionary. Maps code -> (prefixCode, nextSymbol).
+		private int lzwIndex = 0; // Current position in the LZW-compressed mapping (_dataLZW).
+		private int mapIndex = 0; // Number of mapping symbols returned so far.
+		private int nextCode = _nUnique; // Next free LZW code.
+		private int[] currentPhrase = null; // Current phrase being decoded from the LZW-compressed mapping.
+		private int currentPhraseIndex = 0; // Next position in the current phrase to return.
+		private int[] oldPhrase = null; // Previous phrase.
+		private int oldCode = -1; // Previous code.
+
+		LZWMappingIterator() {
+			lzwIndex = 1; // First code consumed during initialization.
+			oldCode = _dataLZW[0]; // Decode the first code into the initial phrase.
+			oldPhrase = unpack(oldCode, _nUnique, dict);
+			currentPhrase = oldPhrase;
+			currentPhraseIndex = 0;
+			mapIndex = 0; // No mapping symbols have been returned yet.
+		}
+
+		// True if there are more mapping symbols to decode.
+		boolean hasNext() {
+			return mapIndex < _nRows;
+		}
+
+		int next() {
+			if(!hasNext())
+				throw new NoSuchElementException();
+
+			// If the current phrase still has symbols, return the next symbol from it.
+			if(currentPhraseIndex < currentPhrase.length) {
+				mapIndex++;
+				return currentPhrase[currentPhraseIndex++];
+			}
+
+			// Otherwise decode the next code into a new phrase.
+			if(lzwIndex >= _dataLZW.length)
+				throw new IllegalStateException("Invalid LZW index: " + lzwIndex);
+
+			final int key = _dataLZW[lzwIndex++];
+
+			final int[] next;
+			if(key < _nUnique || dict.containsKey(key)) {
+				// Normal case: the code is either a base symbol or already present in the dictionary.
+				next = unpack(key, _nUnique, dict);
+			}
+			else {
+				// KwKwK special case: next = oldPhrase + oldPhrase[0].
+				next = packint(oldPhrase, oldPhrase[0]);
+			}
+
+			// Add the new phrase to the dictionary: nextCode -> (oldCode, firstSymbol(next)).
+			dict.put(nextCode++, packKey(oldCode, next[0]));
+
+			// Advance the decoder state.
+			oldCode = key;
+			oldPhrase = next;
+
+			// Start returning symbols from the newly decoded phrase.
+			currentPhrase = next;
+			currentPhraseIndex = 0;
+
+			mapIndex++;
+			return currentPhrase[currentPhraseIndex++];
+		}
+	}
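+
+	/*
+	 * Illustrative decode trace (editor's example), the inverse of the encode trace above: decoding
+	 * [2,0,2,3,5,1,5] with nUnique = 4 rebuilds the same dictionary one step behind the encoder:
+	 *   2 -> [2];  0 -> [0], learn 4=(2,0);  2 -> [2], learn 5=(0,2);  3 -> [3], learn 6=(2,3);
+	 *   5 -> [0,2], learn 7=(3,0);  1 -> [1], learn 8=(5,1);  5 -> [0,2], learn 9=(1,0)
+	 * which yields [2,0,2,3,0,2,1,0,2]. A code equal to the next free code (the KwKwK case) is the
+	 * only code that can be unknown to the decoder; it expands to oldPhrase + oldPhrase[0].
+	 */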
+	// Decompresses an LZW-compressed code array back into AMapToData form, producing the first `index` symbols.
+	private static AMapToData decompress(int[] codes, int nUnique, int nRows, int index) {
+		// Validate input arguments.
+		if(codes == null)
+			throw new IllegalArgumentException("codes is null");
+		if(codes.length == 0)
+			throw new IllegalArgumentException("codes is empty");
+		if(nUnique <= 0)
+			throw new IllegalArgumentException("Invalid alphabet size: " + nUnique);
+		if(nRows <= 0) {
+			throw new IllegalArgumentException("Invalid nRows: " + nRows);
+		}
+		if(index > nRows) {
+			throw new IllegalArgumentException("Index is larger than the data length: " + index);
+		}
+
+		// Return an empty map if index is zero.
+		if(index == 0)
+			return MapToFactory.create(0, nUnique);
+
+		// Maps: code -> packKey(prefixCode, lastSymbolOfPhrase).
+		// Base symbols (0..nUnique-1) are implicit and not stored here.
+		final Map<Integer, Long> dict = new HashMap<>();
+
+		// Output mapping that will be reconstructed.
+		AMapToData out = MapToFactory.create(index, nUnique);
+		int outPos = 0; // Current write position in the output mapping.
+
+		// Decode the first code. The first code always expands to a valid phrase without needing
+		// any dictionary entries.
+		int old = codes[0];
+		int[] oldPhrase = unpack(old, nUnique, dict);
+
+		for(int v : oldPhrase) {
+			if(outPos == index)
+				break;
+			out.set(outPos++, v);
+		}
+
+		// Next free dictionary code. Codes 0..nUnique-1 are reserved for base symbols.
+		int nextCode = nUnique;
+
+		// Process the remaining codes.
+		for(int i = 1; i < codes.length; i++) {
+			// Stop before decoding further codes once the requested prefix is already complete
+			// (otherwise the writes below would run past the end of out).
+			if(outPos == index)
+				return out;
+
+			int key = codes[i];
+
+			int[] next;
+			if(key < nUnique || dict.containsKey(key)) {
+				// Normal case: the code is either a base symbol or already present in the dictionary.
+				next = unpack(key, nUnique, dict);
+			}
+			else {
+				// KwKwK special case: the current code refers to the phrase that is being defined right now.
+				// next = oldPhrase + first(oldPhrase).
+				int first = oldPhrase[0];
+				next = packint(oldPhrase, first);
+			}
+
+			// Append the reconstructed phrase to the output mapping.
+			for(int v : next) {
+				out.set(outPos++, v);
+				if(outPos == index)
+					// Stop immediately once done.
+					return out;
+			}
+
+			// Add the new phrase to the dictionary: nextCode -> (old, firstSymbol(next)).
+			final int first = next[0];
+			dict.put(nextCode++, packKey(old, first));
+
+			// Advance.
+			old = key;
+			oldPhrase = next;
+		}
+
+		// Safety check: the decoder must produce exactly `index` symbols.
+		if(outPos != index)
+			throw new IllegalStateException("Decompression length mismatch: got " + outPos + " expected " + index);
+
+		// Return the reconstructed mapping.
+		return out;
+	}
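+
+	/*
+	 * Editor's note on the decoder dictionary: each learned phrase is stored as a single
+	 * (prefixCode, lastSymbol) long rather than as a materialized array, so dictionary memory is
+	 * O(#codes), and a phrase of length L is reconstructed in O(L) by walking the prefix chain in
+	 * unpack(). Total decode work is therefore linear in the number of decoded symbols.
+	 */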
is zero"); + if(data.getUnique() != dict.getNumberOfValues(colIndexes.size())) + throw new DMLCompressionException( + "Invalid map to dict Map has:" + data.getUnique() + " while dict has " + + dict.getNumberOfValues(colIndexes.size())); + int[] c = getCounts(); + if(c.length != dict.getNumberOfValues(colIndexes.size())) + throw new DMLCompressionException("Invalid DDC Construction"); + data.verify(); + } + } + + // Read Constructor: Used when creating this group from a serialized form (e.g., reading a compressed matrix from disk/memory stream). (TODO) + private ColGroupDDCLZW(IColIndex colIndexes, IDictionary dict, int[] dataLZW, int nRows, int nUnique, + int[] cachedCounts) { + super(colIndexes, dict, cachedCounts); + + _dataLZW = dataLZW; + _nRows = nRows; + _nUnique = nUnique; + + if(CompressedMatrixBlock.debug) { + if(getNumValues() == 0) + throw new DMLCompressionException("Invalid construction with empty dictionary"); + if(_nRows <= 0) + throw new DMLCompressionException("Invalid length of the data. is zero"); + if(_nUnique != dict.getNumberOfValues(colIndexes.size())) + throw new DMLCompressionException("Invalid map to dict Map has:" + _nUnique + " while dict has " + + dict.getNumberOfValues(colIndexes.size())); + int[] c = getCounts(); + if(c.length != dict.getNumberOfValues(colIndexes.size())) + throw new DMLCompressionException("Invalid DDC Construction"); + } + } + + // Factory method for creating a column group. (AColGroup g = ColGroupDDCLZW.create(...);) + public static AColGroup create(IColIndex colIndexes, IDictionary dict, AMapToData data, int[] cachedCounts) { + if(dict == null) + return new ColGroupEmpty(colIndexes); + else if(data.getUnique() == 1) + return ColGroupConst.create(colIndexes, dict); + else + return new ColGroupDDCLZW(colIndexes, dict, data, cachedCounts); + } + + /* + * TODO: Operations with complex access patterns shall be uncompressed to ddc format. + * ... return ColGroupDDC.create(...,decompress(_dataLZW),...). We need to decide which methods are + * suitable for sequential and which arent. those who arent then we shall materialize and fall back to ddc + * */ + + public AColGroup convertToDDC() { + final AMapToData map = decompress(_dataLZW, _nUnique, _nRows, _nRows); + final int[] counts = getCounts(); // may be null depending on your group + return ColGroupDDC.create(_colIndexes, _dict, map, counts); + } + + public AColGroup convertToDDC(int index) { + final AMapToData map = decompress(_dataLZW, _nUnique, _nRows, index); + final int[] counts = getCounts(); // may be null depending on your group + return ColGroupDDC.create(_colIndexes, _dict, map, counts); + } + + // Deserialize ColGroupDDCLZW object in binary stream. + public static ColGroupDDCLZW read(DataInput in) throws IOException { + final IColIndex colIndexes = ColIndexFactory.read(in); + final IDictionary dict = DictionaryFactory.read(in); + + // Metadata for lzw mapping. + final int nRows = in.readInt(); + final int nUnique = in.readInt(); + + // Read compressed mapping array. + final int len = in.readInt(); + if(len < 0) + throw new IOException("Invalid LZW data length: " + len); + + final int[] dataLZW = new int[len]; + for(int i = 0; i < len; i++) + dataLZW[i] = in.readInt(); + + // cachedCounts currently not serialized (mirror ColGroupDDC.read which passes null) + return new ColGroupDDCLZW(colIndexes, dict, dataLZW, nRows, nUnique, null); + } + + // Serialize a ColGroupDDC-object into binary stream. 
+	// Serialize this ColGroupDDCLZW object to a binary stream; the field order matches read(DataInput) above.
+	@Override
+	public void write(DataOutput out) throws IOException {
+		_colIndexes.write(out);
+		_dict.write(out);
+		out.writeInt(_nRows);
+		out.writeInt(_nUnique);
+		out.writeInt(_dataLZW.length); // Length prefix consumed by read().
+		for(int i : _dataLZW)
+			out.writeInt(i);
+	}
+
+	@Override
+	public double getIdx(int r, int colIdx) {
+		// TODO: this should be fast; currently each random access decodes the mapping prefix of
+		// r+1 symbols, i.e. O(r) per lookup. Mirrors ColGroupDDC.getIdx once the prefix is materialized.
+		final AMapToData map = decompress(_dataLZW, _nUnique, _nRows, r + 1);
+		return _dict.getValue(map.getIndex(r), colIdx, _colIndexes.size());
+	}
+
+	@Override
+	public CompressionType getCompType() {
+		return CompressionType.DDCLZW;
+	}
+
+	@Override
+	protected ColGroupType getColGroupType() {
+		return ColGroupType.DDCLZW;
+	}
+
+	@Override
+	public boolean containsValue(double pattern) {
+		return _dict.containsValue(pattern);
+	}
+
+	@Override
+	public double getCost(ComputationCostEstimator e, int nRows) {
+		final int nVals = getNumValues();
+		final int nCols = getNumCols();
+		return e.getCost(nRows, nRows, nCols, nVals, _dict.getSparsity());
+	}
+
+	@Override
+	public ICLAScheme getCompressionScheme() {
+		// TODO: also not implemented in ColGroupDDCFor - should we add one? Content: nCols, as in DDC.
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected int numRowsToMultiply() {
+		return _nRows;
+	}
+
+	@Override
+	protected AColGroup copyAndSet(IColIndex colIndexes, IDictionary newDictionary) {
+		return new ColGroupDDCLZW(colIndexes, newDictionary, _dataLZW, _nRows, _nUnique, getCachedCounts());
+	}
+
+	@Override
+	public long getExactSizeOnDisk() {
+		long ret = super.getExactSizeOnDisk();
+		ret += 4; // _nRows
+		ret += 4; // _nUnique
+		ret += 4; // _dataLZW.length
+		ret += (long) _dataLZW.length * 4; // LZW codes
+		return ret;
+	}
+
+	@Override
+	public AMapToData getMapToData() {
+		// Alternatively, materialize via decompress(_dataLZW, _nUnique, _nRows, _nRows).
+		throw new NotImplementedException();
+	}
+
+	@Override
+	public boolean sameIndexStructure(AColGroupCompressed that) {
+		return that instanceof ColGroupDDCLZW && ((ColGroupDDCLZW) that)._dataLZW == _dataLZW;
+	}
+
+	@Override
+	protected double computeMxx(double c, Builtin builtin) {
+		return _dict.aggregate(c, builtin);
+	}
+
+	@Override
+	protected void computeColMxx(double[] c, Builtin builtin) {
+		_dict.aggregateCols(c, builtin, _colIndexes);
+	}
+
+	@Override
+	public AColGroup sliceRows(int rl, int ru) {
+		try {
+			AMapToData map = decompress(_dataLZW, _nUnique, _nRows, ru);
+			return ColGroupDDCLZW.create(_colIndexes, _dict, map.slice(rl, ru), null);
+		}
+		catch(Exception e) {
+			throw new DMLRuntimeException("Failed to slice out sub part DDCLZW: " + rl + ", " + ru, e);
+		}
+	}
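+
+	/*
+	 * The remaining overrides are unimplemented placeholders: decompression, matrix multiplication,
+	 * scalar/binary/unary operations, pre-aggregation and row aggregates still need either a
+	 * sequential LZW decoding path or an explicit materialization to DDC (see the TODO above
+	 * convertToDDC).
+	 */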
+
+	@Override
+	protected void decompressToDenseBlockTransposedSparseDictionary(DenseBlock db, int rl, int ru, SparseBlock dict) {
+
+	}
+
+	@Override
+	protected void decompressToDenseBlockTransposedDenseDictionary(DenseBlock db, int rl, int ru, double[] dict) {
+
+	}
+
+	@Override
+	protected void decompressToSparseBlockTransposedSparseDictionary(SparseBlockMCSR db, SparseBlock dict,
+		int nColOut) {
+
+	}
+
+	@Override
+	protected void decompressToSparseBlockTransposedDenseDictionary(SparseBlockMCSR db, double[] dict, int nColOut) {
+
+	}
+
+	@Override
+	protected void decompressToDenseBlockSparseDictionary(DenseBlock db, int rl, int ru, int offR, int offC,
+		SparseBlock sb) {
+
+	}
+
+	@Override
+	protected void decompressToDenseBlockDenseDictionary(DenseBlock db, int rl, int ru, int offR, int offC,
+		double[] values) {
+
+	}
+
+	@Override
+	protected void decompressToSparseBlockSparseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC,
+		SparseBlock sb) {
+
+	}
+
+	@Override
+	protected void decompressToSparseBlockDenseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC,
+		double[] values) {
+
+	}
+
+	@Override
+	public void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock result, int rl, int ru, int cl, int cu) {
+
+	}
+
+	@Override
+	public AColGroup scalarOperation(ScalarOperator op) {
+		return null;
+	}
+
+	@Override
+	public AColGroup binaryRowOpLeft(BinaryOperator op, double[] v, boolean isRowSafe) {
+		return null;
+	}
+
+	@Override
+	public AColGroup binaryRowOpRight(BinaryOperator op, double[] v, boolean isRowSafe) {
+		return null;
+	}
+
+	@Override
+	public AColGroup unaryOperation(UnaryOperator op) {
+		return null;
+	}
+
+	@Override
+	public AColGroup append(AColGroup g) {
+		return null;
+	}
+
+	@Override
+	protected AColGroup appendNInternal(AColGroup[] groups, int blen, int rlen) {
+		return null;
+	}
+
+	@Override
+	public AColGroup recompress() {
+		return null;
+	}
+
+	@Override
+	public CompressedSizeInfoColGroup getCompressionInfo(int nRow) {
+		return null;
+	}
+
+	@Override
+	protected AColGroup fixColIndexes(IColIndex newColIndex, int[] reordering) {
+		return null;
+	}
+
+	@Override
+	protected void sparseSelection(MatrixBlock selection, P[] points, MatrixBlock ret, int rl, int ru) {
+
+	}
+
+	@Override
+	protected void denseSelection(MatrixBlock selection, P[] points, MatrixBlock ret, int rl, int ru) {
+
+	}
+
+	@Override
+	public AColGroup[] splitReshape(int multiplier, int nRow, int nColOrg) {
+		return new AColGroup[0];
+	}
+
+	@Override
+	protected boolean allowShallowIdentityRightMult() {
+		return false;
+	}
+
+	@Override
+	protected AColGroup allocateRightMultiplication(MatrixBlock right, IColIndex colIndexes, IDictionary preAgg) {
+		return null;
+	}
+
+	@Override
+	public void preAggregateDense(MatrixBlock m, double[] preAgg, int rl, int ru, int cl, int cu) {
+
+	}
+
+	@Override
+	public void preAggregateSparse(SparseBlock sb, double[] preAgg, int rl, int ru, int cl, int cu) {
+
+	}
+
+	@Override
+	protected void preAggregateThatDDCStructure(ColGroupDDC that, Dictionary ret) {
+
+	}
+
+	@Override
+	protected void preAggregateThatSDCZerosStructure(ColGroupSDCZeros that, Dictionary ret) {
+
+	}
+
+	@Override
+	protected void preAggregateThatSDCSingleZerosStructure(ColGroupSDCSingleZeros that, Dictionary ret) {
+
+	}
+
+	@Override
+	protected void preAggregateThatRLEStructure(ColGroupRLE that, Dictionary ret) {
+
+	}
+
+	@Override
+	public void leftMMIdentityPreAggregateDense(MatrixBlock that, MatrixBlock ret, int rl, int ru, int cl, int cu) {
+
+	}
+
+	@Override
+	protected int[] getCounts(int[] out) {
+		// Returning an empty array instead of throwing keeps the current tests running;
+		// counts are not implemented yet.
+		return new int[0];
+	}
+
+	@Override
+	protected void computeRowSums(double[] c, int rl, int ru, double[] preAgg) {
+
+	}
+
+	@Override
+	protected void computeRowMxx(double[] c, Builtin builtin, int rl, int ru, double[] preAgg) {
+
+	}
+
+	@Override
+	protected void computeRowProduct(double[] c, int rl, int ru, double[] preAgg) {
+
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDDCLZWTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDDCLZWTest.java
new file mode 100644
index 00000000000..dfc83673a90
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDDCLZWTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.colgroup;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.EnumSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.CompressionSettings;
+import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
+import org.apache.sysds.runtime.compress.colgroup.AColGroup;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
+import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+import org.apache.sysds.runtime.compress.estim.ComEstExact;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.junit.Test;
+
+public class ColGroupDDCLZWTest {
+	protected static final Log LOG = LogFactory.getLog(ColGroupDDCLZWTest.class.getName());
+
+	// TODO: build the column group via the CompressionSettingsBuilder (csb) pipeline instead of
+	// direct create() calls, as drafted in the commented-out block below.
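+
+	// Editor's sketch: a minimal write/read round-trip through the write()/read(DataInput) pair
+	// defined on ColGroupDDCLZW itself (this bypasses ColGroupIO and any group-type byte).
+	// Fully qualified names are used for classes not imported above; the mapping values are
+	// arbitrary example data.
+	@Test
+	public void testSerializeRoundTripSketch() throws IOException {
+		IColIndex colIndexes = ColIndexFactory.create(1);
+		var dict = org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary
+			.create(new double[] {1.0, 2.0, 3.0});
+		int[] src = new int[] {0, 1, 0, 1, 2, 0, 1, 0, 1, 2, 2, 2};
+		var data = org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory.create(src.length, 3);
+		for(int i = 0; i < src.length; i++)
+			data.set(i, src[i]);
+
+		var ddc = (org.apache.sysds.runtime.compress.colgroup.ColGroupDDC) org.apache.sysds.runtime.compress.colgroup.ColGroupDDC
+			.create(colIndexes, dict, data, null);
+		AColGroup lzw = ddc.convertToDDCLZW();
+		assertTrue(lzw instanceof org.apache.sysds.runtime.compress.colgroup.ColGroupDDCLZW);
+
+		ByteArrayOutputStream bos = new ByteArrayOutputStream();
+		lzw.write(new DataOutputStream(bos));
+		var back = org.apache.sysds.runtime.compress.colgroup.ColGroupDDCLZW
+			.read(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
+
+		var rec = (org.apache.sysds.runtime.compress.colgroup.ColGroupDDC) back.convertToDDC();
+		var map = rec.getMapToData();
+		assertEquals(src.length, map.size());
+		for(int i = 0; i < src.length; i++)
+			assertEquals(src[i], map.getIndex(i));
+	}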
+	/*CompressionSettingsBuilder csb = new CompressionSettingsBuilder().setSamplingRatio(1.0)
+		.setValidCompressions(EnumSet.of(AColGroup.CompressionType.DDCLZW))
+		.setTransposeInput("false");
+	CompressionSettings cs = csb.create();
+
+	final CompressedSizeInfoColGroup cgi = new ComEstExact(mbt, cs).getColGroupInfo(colIndexes);
+	CompressedSizeInfo csi = new CompressedSizeInfo(cgi);
+	AColGroup cg = ColGroupFactory.compressColGroups(mbt, csi, cs, 1).get(0);*/
+
+}
diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDDCTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDDCTest.java
index 0f04cfc9c27..dd06226e093 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDDCTest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDDCTest.java
@@ -25,9 +25,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.sysds.runtime.compress.colgroup.AColGroup;
-import org.apache.sysds.runtime.compress.colgroup.ColGroupDDC;
-import org.apache.sysds.runtime.compress.colgroup.ColGroupDeltaDDC;
+import org.apache.sysds.runtime.compress.colgroup.*;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
 import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
 import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
@@ -40,6 +38,92 @@ public class ColGroupDDCTest {
 
 	protected static final Log LOG = LogFactory.getLog(ColGroupDDCTest.class.getName());
 
+	@Test
+	public void testConvertToDDCLZWBasic() {
+		// TODO: extract a dedicated helper method for this mapping comparison.
+		IColIndex colIndexes = ColIndexFactory.create(2);
+		double[] dictValues = new double[] {10.0, 20.0, 11.0, 21.0, 12.0, 22.0};
+		Dictionary dict = Dictionary.create(dictValues);
+
+		int[] src = new int[] {
+			// repeating base pattern
+			0, 0, 2, 0, 2, 1, 0, 2, 1, 0, 2, 2, 0, 2, 1, 0, 2, 1, 0, 2,
+
+			// variation / shifted pattern
+			1, 0, 1, 2, 0, 1, 2, 0, 1, 1, 0, 1, 2, 0, 1, 2, 0, 1,
+
+			// longer runs (good for phrase growth)
+			2, 2, 2, 2, 2, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1,
+
+			// mixed noise
+			2, 1, 0, 2, 1, 0, 2, 1, 0, 0, 2, 1, 0, 2, 1, 0, 2, 1, 1, 1, 1, 1, 1, 1,
+
+			// repeating tail (tests dictionary reuse)
+			2, 0, 2, 1, 0, 2, 1, 0, 2, 2, 0, 2, 1, 0, 2, 1, 0, 2, 0, 0, 0, 0, 0, 1};
+
+		final int nRows = src.length;
+		final int nUnique = 3;
+		AMapToData data = MapToFactory.create(nRows, nUnique);
+		for(int i = 0; i < nRows; i++)
+			data.set(i, src[i]);
+
+		ColGroupDDC ddc = (ColGroupDDC) ColGroupDDC.create(colIndexes, dict, data, null);
+		AColGroup result = ddc.convertToDDCLZW();
+
+		assertNotNull(result);
+		assertTrue(result instanceof ColGroupDDCLZW);
+
+		ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) result;
+		AColGroup ddclzwDecompressed = ddclzw.convertToDDC();
+
+		assertNotNull(ddclzwDecompressed);
+		assertTrue(ddclzwDecompressed instanceof ColGroupDDC);
+
+		ColGroupDDC ddc2 = (ColGroupDDC) ddclzwDecompressed;
+
+		AMapToData d1 = ddc.getMapToData();
+		AMapToData d2 = ddc2.getMapToData();
+
+		assertEquals(d1.size(), d2.size());
+		assertEquals(d1.getUnique(), d2.getUnique());
+		for(int i = 0; i < d1.size(); i++)
+			assertEquals("mapping mismatch at row " + i, d1.getIndex(i), d2.getIndex(i));
+
+		assertEquals(ddc.getColIndices(), ddc2.getColIndices());
+
+		// Test partial decompression:
+		// index is the number of symbols to decode (positions 0 to index-1).
+		int index = 10;
+		ColGroupDDC ddcIndex = (ColGroupDDC) ddclzw.convertToDDC(index);
+
+		AMapToData d3 = ddcIndex.getMapToData();
+		assertEquals(index, d3.size());
+		assertEquals(ddc.getColIndices(), ddcIndex.getColIndices());
+
+		for(int i = 0; i < index; i++) {
+			assertEquals(d1.getIndex(i), d3.getIndex(i));
+		}
+
+		// Test sliceRows:
+		int low = 3;
+		int high = 10;
+		AColGroup slice = ddclzw.sliceRows(low, high);
+		if(slice instanceof ColGroupDDCLZW ddclzwslice) {
+			ColGroupDDC ddcSlice = (ColGroupDDC) ddclzwslice.convertToDDC();
+			ColGroupDDC ddcSlice2 = (ColGroupDDC) ddc.sliceRows(low, high);
+
+			AMapToData d4 = ddcSlice.getMapToData();
+			AMapToData d5 = ddcSlice2.getMapToData();
+
+			assertEquals(d5.size(), d4.size());
+			assertEquals(d5.getUnique(), d4.getUnique());
+
+			for(int i = 0; i < d4.size(); i++)
+				assertEquals("mapping mismatch at row " + i, d4.getIndex(i), d5.getIndex(i));
+		}
+
+	}
+
 	@Test
 	public void testConvertToDeltaDDCBasic() {
 		IColIndex colIndexes = ColIndexFactory.create(2);