From 09e6fd0a6dfe26ba593ae73450871bee74abc378 Mon Sep 17 00:00:00 2001 From: Vi Vuong Date: Mon, 28 Jul 2025 17:09:28 +0200 Subject: [PATCH 1/2] FIXME in spark instruction --- .../hops/codegen/template/TemplateRow.java | 2 +- .../sysds/resource/cost/CPCostUtils.java | 1 + .../instructions/GPUInstructionParser.java | 1 + .../fed/CumulativeOffsetFEDInstruction.java | 2 + .../fed/UnaryMatrixFEDInstruction.java | 2 +- .../gpu/MatrixBuiltinGPUInstruction.java | 4 + .../CumulativeAggregateSPInstruction.java | 113 +++++++-- .../spark/CumulativeOffsetSPInstruction.java | 221 ++++++++++++++---- .../spark/UnaryMatrixSPInstruction.java | 197 +--------------- 9 files changed, 296 insertions(+), 247 deletions(-) diff --git a/src/main/java/org/apache/sysds/hops/codegen/template/TemplateRow.java b/src/main/java/org/apache/sysds/hops/codegen/template/TemplateRow.java index 955bf778b8b..fc430374599 100644 --- a/src/main/java/org/apache/sysds/hops/codegen/template/TemplateRow.java +++ b/src/main/java/org/apache/sysds/hops/codegen/template/TemplateRow.java @@ -71,7 +71,7 @@ public class TemplateRow extends TemplateBase private static final OpOp1[] SUPPORTED_VECT_UNARY = new OpOp1[]{ OpOp1.EXP, OpOp1.SQRT, OpOp1.LOG, OpOp1.ABS, OpOp1.ROUND, OpOp1.CEIL, OpOp1.FLOOR, OpOp1.SIGN, OpOp1.SIN, OpOp1.COS, OpOp1.TAN, OpOp1.ASIN, OpOp1.ACOS, OpOp1.ATAN, OpOp1.SINH, OpOp1.COSH, OpOp1.TANH, - OpOp1.CUMSUM, OpOp1.CUMMIN, OpOp1.CUMMAX, OpOp1.SPROP, OpOp1.SIGMOID}; + OpOp1.CUMSUM, OpOp1.ROWCUMSUM, OpOp1.CUMMIN, OpOp1.CUMMAX, OpOp1.SPROP, OpOp1.SIGMOID}; private static final OpOp2[] SUPPORTED_VECT_BINARY = new OpOp2[]{ OpOp2.MULT, OpOp2.DIV, OpOp2.MINUS, OpOp2.PLUS, OpOp2.POW, OpOp2.MIN, OpOp2.MAX, OpOp2.XOR, OpOp2.EQUAL, OpOp2.NOTEQUAL, OpOp2.LESS, OpOp2.LESSEQUAL, OpOp2.GREATER, OpOp2.GREATEREQUAL, diff --git a/src/main/java/org/apache/sysds/resource/cost/CPCostUtils.java b/src/main/java/org/apache/sysds/resource/cost/CPCostUtils.java index 0558e17c9cd..47261198c15 100644 --- a/src/main/java/org/apache/sysds/resource/cost/CPCostUtils.java +++ b/src/main/java/org/apache/sysds/resource/cost/CPCostUtils.java @@ -481,6 +481,7 @@ public static long getInstNFLOP( costs = 40; break; case "ucumk+": + case "urowcumk+": case "ucummin": case "ucummax": case "ucum*": diff --git a/src/main/java/org/apache/sysds/runtime/instructions/GPUInstructionParser.java b/src/main/java/org/apache/sysds/runtime/instructions/GPUInstructionParser.java index 4fca2ad0eae..cf767f3dc07 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/GPUInstructionParser.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/GPUInstructionParser.java @@ -145,6 +145,7 @@ public class GPUInstructionParser extends InstructionParser // Cumulative Ops String2GPUInstructionType.put( "ucumk+" , GPUINSTRUCTION_TYPE.BuiltinUnary); + String2GPUInstructionType.put( "urowcumk+", GPUINSTRUCTION_TYPE.BuiltinUnary); String2GPUInstructionType.put( "ucum*" , GPUINSTRUCTION_TYPE.BuiltinUnary); String2GPUInstructionType.put( "ucumk+*" , GPUINSTRUCTION_TYPE.BuiltinUnary); String2GPUInstructionType.put( "ucummin" , GPUINSTRUCTION_TYPE.BuiltinUnary); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/CumulativeOffsetFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/CumulativeOffsetFEDInstruction.java index 0e333285c0f..c4d1f308774 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/fed/CumulativeOffsetFEDInstruction.java +++ 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/CumulativeOffsetFEDInstruction.java @@ -51,6 +51,8 @@ private CumulativeOffsetFEDInstruction(Operator op, CPOperand in1, CPOperand in2 if ("bcumoffk+".equals(opcode)) _uop = new UnaryOperator(Builtin.getBuiltinFnObject("ucumk+")); + else if ("browcumoffk+".equals(opcode)) + _uop = new UnaryOperator(Builtin.getBuiltinFnObject("urowcumk+")); else if ("bcumoff*".equals(opcode)) _uop = new UnaryOperator(Builtin.getBuiltinFnObject("ucum*")); else if ("bcumoff+*".equals(opcode)) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/UnaryMatrixFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/UnaryMatrixFEDInstruction.java index f125c01d4b0..61f9c4e06d0 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/fed/UnaryMatrixFEDInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/UnaryMatrixFEDInstruction.java @@ -76,7 +76,7 @@ public static UnaryMatrixFEDInstruction parseInstruction(String str) { in.split(parts[1]); out.split(parts[2]); ValueFunction func = Builtin.getBuiltinFnObject(opcode); - if(Arrays.asList(new String[] {"ucumk+", "ucum*", "ucumk+*", "ucummin", "ucummax", "exp", "log", "sigmoid"}) + if(Arrays.asList(new String[] {"ucumk+", "urowcumk+", "ucum*", "ucumk+*", "ucummin", "ucummax", "exp", "log", "sigmoid"}) .contains(opcode)) { UnaryOperator op = new UnaryOperator(func, Integer.parseInt(parts[3]), Boolean.parseBoolean(parts[4])); return new UnaryMatrixFEDInstruction(op, in, out, opcode, str); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/gpu/MatrixBuiltinGPUInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/gpu/MatrixBuiltinGPUInstruction.java index 0557ccc2791..250b0bf843a 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/gpu/MatrixBuiltinGPUInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/gpu/MatrixBuiltinGPUInstruction.java @@ -92,6 +92,10 @@ public void processInstruction(ExecutionContext ec) { LibMatrixCUDA.cumulativeScan(ec, ec.getGPUContext(0), getExtendedOpcode(), "cumulative_sum", mat, _output.getName()); break; + case "urowcumk+": + LibMatrixCUDA.cumulativeScan(ec, ec.getGPUContext(0), getExtendedOpcode(), "row_cumulative_sum", mat, + _output.getName()); + break; case "ucum*": LibMatrixCUDA.cumulativeScan(ec, ec.getGPUContext(0), getExtendedOpcode(), "cumulative_prod", mat, _output.getName()); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeAggregateSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeAggregateSPInstruction.java index b511ac2e257..8c776abe56d 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeAggregateSPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeAggregateSPInstruction.java @@ -25,9 +25,11 @@ import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysds.runtime.functionobjects.Builtin; +import org.apache.sysds.runtime.functionobjects.KahanPlus; import org.apache.sysds.runtime.functionobjects.PlusMultiply; import org.apache.sysds.runtime.instructions.InstructionUtils; import org.apache.sysds.runtime.instructions.cp.CPOperand; +import org.apache.sysds.runtime.instructions.cp.KahanObject; import org.apache.sysds.runtime.instructions.spark.utils.RDDAggregateUtils; import 
org.apache.sysds.runtime.instructions.spark.utils.SparkUtils; import org.apache.sysds.runtime.matrix.data.MatrixBlock; @@ -52,62 +54,135 @@ public static CumulativeAggregateSPInstruction parseInstruction( String str ) { CPOperand in1 = new CPOperand(parts[1]); CPOperand out = new CPOperand(parts[2]); AggregateUnaryOperator aggun = InstructionUtils.parseCumulativeAggregateUnaryOperator(opcode); - return new CumulativeAggregateSPInstruction(aggun, in1, out, opcode, str); + return new CumulativeAggregateSPInstruction(aggun, in1, out, opcode, str); } - + @Override public void processInstruction(ExecutionContext ec) { SparkExecutionContext sec = (SparkExecutionContext)ec; DataCharacteristics mc = sec.getDataCharacteristics(input1.getName()); + + //get input + JavaPairRDD in = sec.getBinaryMatrixBlockRDDHandleForVariable( input1.getName() ); + + if ("urowcumk+".equals(getOpcode())) { + processRowCumsum(sec, in, mc); + } else { + processCumsum(sec, in, mc); + } + } + + private void processRowCumsum(SparkExecutionContext sec, JavaPairRDD in, DataCharacteristics mc) { + JavaPairRDD localRowCumsum = + in.mapToPair(new LocalRowCumsumFunction()); + + sec.setRDDHandleForVariable(output.getName(), localRowCumsum); + sec.addLineageRDD(output.getName(), input1.getName()); + sec.getDataCharacteristics(output.getName()).set(mc); + } + + public static Tuple2, JavaPairRDD> + processRowCumsumWithEndValues(JavaPairRDD in) { + JavaPairRDD localRowCumsum = + in.mapToPair(new LocalRowCumsumFunction()); + + JavaPairRDD endValues = + localRowCumsum.mapToPair(new ExtractEndValuesFunction()); + + return new Tuple2<>(localRowCumsum, endValues); + } + + private void processCumsum(SparkExecutionContext sec, JavaPairRDD in, DataCharacteristics mc) { DataCharacteristics mcOut = new MatrixCharacteristics(mc); long rlen = mc.getRows(); int blen = mc.getBlocksize(); mcOut.setRows((long)(Math.ceil((double)rlen/blen))); - - //get input - JavaPairRDD in = sec.getBinaryMatrixBlockRDDHandleForVariable( input1.getName() ); - + //execute unary aggregate (w/ implicit drop correction) AggregateUnaryOperator auop = (AggregateUnaryOperator) _optr; - JavaPairRDD out = - in.mapToPair(new RDDCumAggFunction(auop, rlen, blen)); + JavaPairRDD out = + in.mapToPair(new RDDCumAggFunction(auop, rlen, blen)); //merge partial aggregates, adjusting for correct number of partitions //as size can significant shrink (1K) but also grow (sparse-dense) int numParts = SparkUtils.getNumPreferredPartitions(mcOut); int minPar = (int)Math.min(SparkExecutionContext.getDefaultParallelism(true), mcOut.getNumBlocks()); out = RDDAggregateUtils.mergeByKey(out, Math.max(numParts, minPar), false); - + //put output handle in symbol table sec.setRDDHandleForVariable(output.getName(), out); sec.addLineageRDD(output.getName(), input1.getName()); sec.getDataCharacteristics(output.getName()).set(mcOut); } - private static class RDDCumAggFunction implements PairFunction, MatrixIndexes, MatrixBlock> + private static class LocalRowCumsumFunction implements PairFunction, MatrixIndexes, MatrixBlock> { + private static final long serialVersionUID = 123L; + + @Override + public Tuple2 call(Tuple2 kv) throws Exception { + MatrixIndexes idx = kv._1; + MatrixBlock inputBlock = kv._2; + MatrixBlock outBlock = new MatrixBlock(inputBlock.getNumRows(), inputBlock.getNumColumns(), false); + + for (int i = 0; i < inputBlock.getNumRows(); i++) { + KahanObject kbuff = new KahanObject(0, 0); + KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); + + for (int j = 0; j < 
inputBlock.getNumColumns(); j++) { + double val = inputBlock.get(i, j); + kplus.execute2(kbuff, val); + outBlock.set(i, j, kbuff._sum); + } + } + // original index, original matrix and local cumsum block + return new Tuple2<>(idx, outBlock); + } + } + + private static class ExtractEndValuesFunction implements PairFunction, MatrixIndexes, MatrixBlock> { + private static final long serialVersionUID = 123L; + + @Override + public Tuple2 call(Tuple2 kv) throws Exception { + MatrixIndexes idx = kv._1; + MatrixBlock cumsumBlock = kv._2; + + MatrixBlock endValuesBlock = new MatrixBlock(cumsumBlock.getNumRows(), 1, false); + for (int i = 0; i < cumsumBlock.getNumRows(); i++) { + if (cumsumBlock.getNumColumns() > 0) { + endValuesBlock.set(i, 0, cumsumBlock.get(i, cumsumBlock.getNumColumns() - 1)); + } else { + endValuesBlock.set(i, 0, 0.0); + } + } + return new Tuple2<>(idx, endValuesBlock); + } + } + + private static class RDDCumAggFunction implements PairFunction, MatrixIndexes, MatrixBlock> { private static final long serialVersionUID = 11324676268945117L; - + private final AggregateUnaryOperator _op; private UnaryOperator _uop = null; private final long _rlen; private final int _blen; - + public RDDCumAggFunction( AggregateUnaryOperator op, long rlen, int blen ) { _op = op; _rlen = rlen; _blen = blen; } - + @Override - public Tuple2 call( Tuple2 arg0 ) - throws Exception + public Tuple2 call( Tuple2 arg0 ) + throws Exception { MatrixIndexes ixIn = arg0._1(); MatrixBlock blkIn = arg0._2(); MatrixIndexes ixOut = new MatrixIndexes(); MatrixBlock blkOut = new MatrixBlock(); - + //process instruction AggregateUnaryOperator aop = _op; if( aop.aggOp.increOp.fn instanceof PlusMultiply ) { //cumsumprod @@ -125,19 +200,19 @@ public Tuple2 call( Tuple2(ixOut, blkOut2); } diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java index 61b61b15332..9b57713e7ac 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java @@ -40,10 +40,10 @@ import org.apache.sysds.runtime.meta.DataCharacteristics; import org.apache.sysds.runtime.util.DataConverter; import org.apache.sysds.runtime.util.UtilFunctions; +import scala.Serializable; import scala.Tuple2; -import java.util.ArrayList; -import java.util.Iterator; +import java.util.*; public class CumulativeOffsetSPInstruction extends BinarySPInstruction { private UnaryOperator _uop = null; @@ -92,12 +92,17 @@ public void processInstruction(ExecutionContext ec) { DataCharacteristics mc2 = sec.getDataCharacteristics(input2.getName()); long rlen = mc2.getRows(); int blen = mc2.getBlocksize(); - + + if (Opcodes.BROWCUMOFFKP.toString().equals(getOpcode())) { + processRowCumsumOffsets(sec, mc1, mc2); + return; + } + //get and join inputs JavaPairRDD inData = sec.getBinaryMatrixBlockRDDHandleForVariable(input1.getName()); JavaPairRDD> joined = null; boolean broadcast = _broadcast && !SparkUtils.isHashPartitioned(inData); - + if( broadcast ) { //broadcast offsets and broadcast join with data PartitionedBroadcast inAgg = sec.getBroadcastForVariable(input2.getName()); @@ -106,18 +111,18 @@ public void processInstruction(ExecutionContext ec) { else { //prepare aggregates (cumsplit of offsets) and repartition join with data joined = inData.join(sec - 
.getBinaryMatrixBlockRDDHandleForVariable(input2.getName()) - .flatMapToPair(new RDDCumSplitFunction(_initValue, rlen, blen))); + .getBinaryMatrixBlockRDDHandleForVariable(input2.getName()) + .flatMapToPair(new RDDCumSplitFunction(_initValue, rlen, blen))); } - + //execute cumulative offset (apply cumulative op w/ offsets) JavaPairRDD out = joined - .mapValues(new RDDCumOffsetFunction(_uop, _cumsumprod)); - + .mapValues(new RDDCumOffsetFunction(_uop, _cumsumprod)); + //put output handle in symbol table if( _cumsumprod ) sec.getDataCharacteristics(output.getName()) - .set(mc1.getRows(), 1, mc1.getBlocksize(), mc1.getBlocksize()); + .set(mc1.getRows(), 1, mc1.getBlocksize(), mc1.getBlocksize()); else //general case updateUnaryOutputDataCharacteristics(sec); sec.setRDDHandleForVariable(output.getName(), out); @@ -125,6 +130,94 @@ public void processInstruction(ExecutionContext ec) { sec.addLineage(output.getName(), input2.getName(), broadcast); } + public static JavaPairRDD processRowCumsumOffsetsDirectly( + JavaPairRDD localRowCumsum, + JavaPairRDD endValues) { + + // Collect end-values of every block of every row for offset calc by grouping by global row index + JavaPairRDD>> rowEndValues = endValues + .mapToPair(t -> { + // get index of block + MatrixIndexes idx = t._1; + // get cum matrix block + MatrixBlock endValuesBlock = t._2; + + // get row and column block index + long rowBlockIdx = idx.getRowIndex(); + long colBlockIdx = idx.getColumnIndex(); + + // Save end value of every row of every block (if block is empty save 0) + double[] lastValues = new double[endValuesBlock.getNumRows()]; + for (int i = 0; i < endValuesBlock.getNumRows(); i++) { + lastValues[i] = endValuesBlock.get(i, 0); + } + + return new Tuple2<>(rowBlockIdx, new Tuple3<>(rowBlockIdx, colBlockIdx, lastValues)); + }) + .groupByKey(); + + // compute offset for every block + List, double[]>> offsetList = rowEndValues + .flatMapToPair(t -> { + Long rowBlockIdx = t._1; + List> colValues = new ArrayList<>(); + for (Tuple3 cv : t._2) { + colValues.add(cv); + } + + // sort blocks from one row by column index + colValues.sort(Comparator.comparing(Tuple3::_2)); + + // get number of rows of a block by counting amount of end (row) values of said block + int numRows = 0; + if (!colValues.isEmpty()) { + double[] lastValuesArray = colValues.get(0)._3(); + numRows = lastValuesArray.length; + } + + List, double[]>> blockOffsets = new ArrayList<>(); + double[] cumulativeOffsets = new double[numRows]; + + for (Tuple3 colValue : colValues) { + Long colBlockIdx = colValue._2(); + double[] rowendValues = colValue._3(); + + // copy current offsets + double[] currentOffsets = cumulativeOffsets.clone(); + + // and save block indexes with its offsets + blockOffsets.add(new Tuple2<>(new Tuple2<>(rowBlockIdx, colBlockIdx), currentOffsets)); + + for (int i = 0; i < numRows && i < rowendValues.length; i++) { + cumulativeOffsets[i] += rowendValues[i]; + } + } + return blockOffsets.iterator(); + }) + .collect(); + + // convert list to map for easier access to offsets + Map, double[]> offsetMap = new HashMap<>(); + for (Tuple2, double[]> entry : offsetList) { + offsetMap.put(entry._1, entry._2); + } + return localRowCumsum.mapToPair(new FinalRowCumsumFunction(offsetMap)); + } + + private void processRowCumsumOffsets(SparkExecutionContext sec, DataCharacteristics mc1, DataCharacteristics mc2) { + JavaPairRDD localRowCumsum = + sec.getBinaryMatrixBlockRDDHandleForVariable(input1.getName()); + JavaPairRDD endValues = + 
sec.getBinaryMatrixBlockRDDHandleForVariable(input2.getName()); + + JavaPairRDD out = processRowCumsumOffsetsDirectly(localRowCumsum, endValues); + + sec.setRDDHandleForVariable(output.getName(), out); + sec.addLineageRDD(output.getName(), input1.getName()); + sec.addLineageRDD(output.getName(), input2.getName()); + sec.getDataCharacteristics(output.getName()).set(mc1); + } + public double getInitValue() { return _initValue; } @@ -133,36 +226,86 @@ public boolean getBroadcast() { return _broadcast; } - private static class RDDCumSplitFunction implements PairFlatMapFunction, MatrixIndexes, MatrixBlock> + private static class FinalRowCumsumFunction implements PairFunction, MatrixIndexes, MatrixBlock> { + private static final long serialVersionUID = 1L; + // map block indexes to the row offsets + private final Map, double[]> offsetMap; + + public FinalRowCumsumFunction(Map, double[]> offsetMap) { + this.offsetMap = offsetMap; + } + + @Override + public Tuple2 call(Tuple2 tuple) throws Exception { + MatrixIndexes idx = tuple._1; + MatrixBlock localRowCumsumBlock = tuple._2; + + // key to get the row offset for this block + Tuple2 blockKey = new Tuple2<>(idx.getRowIndex(), idx.getColumnIndex()); + double[] offsets = offsetMap.get(blockKey); + + MatrixBlock outBlock = new MatrixBlock(localRowCumsumBlock.getNumRows(), localRowCumsumBlock.getNumColumns(), false); + + for (int i = 0; i < localRowCumsumBlock.getNumRows(); i++) { + double rowOffset = (offsets != null && i < offsets.length) ? offsets[i] : 0.0; + for (int j = 0; j < localRowCumsumBlock.getNumColumns(); j++) { + double cumsumValue = localRowCumsumBlock.get(i, j); + outBlock.set(i, j, cumsumValue + rowOffset); + } + } + // block index and final cumsum block + return new Tuple2<>(idx, outBlock); + } + } + + // helper class + private static class Tuple3 implements Serializable { + private static final long serialVersionUID = 1L; + private final T1 _1; + private final T2 _2; + private final T3 _3; + + public Tuple3(T1 _1, T2 _2, T3 _3) { + this._1 = _1; + this._2 = _2; + this._3 = _3; + } + + public T1 _1() { return _1; } + public T2 _2() { return _2; } + public T3 _3() { return _3; } + } + + private static class RDDCumSplitFunction implements PairFlatMapFunction, MatrixIndexes, MatrixBlock> { private static final long serialVersionUID = -8407407527406576965L; - + private double _initValue = 0; private int _blen = -1; private long _lastRowBlockIndex; - + public RDDCumSplitFunction( double initValue, long rlen, int blen ) { _initValue = initValue; _blen = blen; _lastRowBlockIndex = (long)Math.ceil((double)rlen/blen); } - + @Override - public Iterator> call( Tuple2 arg0 ) - throws Exception + public Iterator> call( Tuple2 arg0 ) + throws Exception { ArrayList> ret = new ArrayList<>(); - + MatrixIndexes ixIn = arg0._1(); MatrixBlock blkIn = arg0._2(); - + long rixOffset = (ixIn.getRowIndex()-1)*_blen; boolean firstBlk = (ixIn.getRowIndex() == 1); boolean lastBlk = (ixIn.getRowIndex() == _lastRowBlockIndex ); - - //introduce offsets w/ init value for first row - if( firstBlk ) { + + //introduce offsets w/ init value for first row + if( firstBlk ) { MatrixIndexes tmpix = new MatrixIndexes(1, ixIn.getColumnIndex()); MatrixBlock tmpblk = new MatrixBlock(1, blkIn.getNumColumns(), blkIn.isInSparseFormat()); if( _initValue != 0 ){ @@ -171,7 +314,7 @@ public Iterator> call( Tuple2(tmpix, tmpblk)); } - + //output splitting (shift by one), preaggregated offset used by subsequent block for( int i=0; i> call( Tuple2(tmpix, tmpblk)); } - + return 
ret.iterator(); } } - - private static class RDDCumSplitLookupFunction implements PairFunction, MatrixIndexes, Tuple2> + + private static class RDDCumSplitLookupFunction implements PairFunction, MatrixIndexes, Tuple2> { private static final long serialVersionUID = -2785629043886477479L; - + private final PartitionedBroadcast _pbc; private final double _initValue; private final int _blen; - + public RDDCumSplitLookupFunction(PartitionedBroadcast pbc, double initValue, long rlen, int blen) { _pbc = pbc; _initValue = initValue; _blen = blen; } - + @Override public Tuple2> call(Tuple2 arg0) throws Exception { MatrixIndexes ixIn = arg0._1(); MatrixBlock blkIn = arg0._2(); - + //compute block and row indexes long brix = UtilFunctions.computeBlockIndex(ixIn.getRowIndex()-1, _blen); int rix = UtilFunctions.computeCellInBlock(ixIn.getRowIndex()-1, _blen); - + //lookup offset row and return joined output MatrixBlock off = (ixIn.getRowIndex() == 1) ? new MatrixBlock(1, blkIn.getNumColumns(), _initValue) : - _pbc.getBlock((int)brix, (int)ixIn.getColumnIndex()).slice(rix, rix); + _pbc.getBlock((int)brix, (int)ixIn.getColumnIndex()).slice(rix, rix); return new Tuple2<>(ixIn, new Tuple2<>(blkIn,off)); } } - private static class RDDCumOffsetFunction implements Function, MatrixBlock> + private static class RDDCumOffsetFunction implements Function, MatrixBlock> { private static final long serialVersionUID = -5804080263258064743L; private final UnaryOperator _uop; private final boolean _cumsumprod; - + public RDDCumOffsetFunction(UnaryOperator uop, boolean cumsumprod) { _uop = uop; _cumsumprod = cumsumprod; @@ -231,17 +374,17 @@ public RDDCumOffsetFunction(UnaryOperator uop, boolean cumsumprod) { @Override public MatrixBlock call(Tuple2 arg0) throws Exception { //prepare inputs and outputs - MatrixBlock dblkIn = arg0._1(); //original data + MatrixBlock dblkIn = arg0._1(); //original data MatrixBlock oblkIn = arg0._2(); //offset row vector - + //allocate output block MatrixBlock blkOut = new MatrixBlock(dblkIn.getNumRows(), - _cumsumprod ? 1 : dblkIn.getNumColumns(), false); - + _cumsumprod ? 
1 : dblkIn.getNumColumns(), false); + //blockwise cumagg computation, incl offset aggregation return LibMatrixAgg.cumaggregateUnaryMatrix(dblkIn, blkOut, _uop, - DataConverter.convertToDoubleVector(oblkIn, false, - ((Builtin)_uop.fn).bFunc == BuiltinCode.CUMSUM)); + DataConverter.convertToDoubleVector(oblkIn, false, + ((Builtin)_uop.fn).bFunc == BuiltinCode.CUMSUM)); } } } diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java index eebeef0b2ec..6181d1c1d1a 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java @@ -21,28 +21,18 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFunction; import org.apache.sysds.common.Types.DataType; import org.apache.sysds.common.Types.ValueType; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext; -import org.apache.sysds.runtime.functionobjects.KahanPlus; import org.apache.sysds.runtime.instructions.InstructionUtils; import org.apache.sysds.runtime.instructions.cp.CPOperand; -import org.apache.sysds.runtime.instructions.cp.KahanObject; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.matrix.operators.Operator; import org.apache.sysds.runtime.matrix.operators.UnaryOperator; -import scala.Serializable; import scala.Tuple2; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - public class UnaryMatrixSPInstruction extends UnarySPInstruction { protected UnaryMatrixSPInstruction(Operator op, CPOperand in, CPOperand out, String opcode, String instr) { @@ -73,189 +63,22 @@ public void processInstruction(ExecutionContext ec) { sec.setRDDHandleForVariable(output.getName(), out); sec.addLineageRDD(output.getName(), input1.getName()); - //FIXME: implement similar to cumsum through + //FIXME: implement similar to cumsum through // CumulativeAggregateSPInstruction (Spark) // UnaryMatrixCPInstruction (local cumsum on aggregates) - // CumulativeOffsetSPInstruction (Spark) - if ( "urowcumk+".equals(getOpcode()) ) { - - JavaPairRDD< MatrixIndexes, Tuple2 > localRowcumsum = in.mapToPair( new LocalRowCumsumFunction() ); - - // Collect end-values of every block of every row for offset calc by grouping by global row index - JavaPairRDD< Long, Iterable> > rowEndValues = localRowcumsum - .mapToPair( tuple2 -> { - // get index of block - MatrixIndexes indexes = tuple2._1; - // get cum matrix block - MatrixBlock localRowcumsumBlock = tuple2._2._2; - - // get row and column block index - long rowBlockIndex = indexes.getRowIndex(); - long colBlockIndex = indexes.getColumnIndex(); - - // Save end value of every row of every block (if block is empty save 0) - double[] endValues = new double[ localRowcumsumBlock.getNumRows() ]; - - for ( int i = 0; i < localRowcumsumBlock.getNumRows(); i ++ ) { - if (localRowcumsumBlock.getNumColumns() > 0) - endValues[i] = localRowcumsumBlock.get(i, localRowcumsumBlock.getNumColumns() - 1); - else - endValues[i] = 0.0 ; - } - return new Tuple2<>(rowBlockIndex, new Tuple3<>(rowBlockIndex, colBlockIndex, 
endValues)); - } - ).groupByKey(); - - // compute offset for every block - List< Tuple2 , double[]> > offsetList = rowEndValues - .flatMapToPair(tuple2 -> { - Long rowBlockIndex = tuple2._1; - List< Tuple3 > colValues = new ArrayList<>(); - for ( Tuple3 cv : tuple2._2 ) - colValues.add(cv); - - // sort blocks from one row by column index - colValues.sort(Comparator.comparing(Tuple3::_2)); - - // get number of rows of a block by counting amount of end (row) values of said block - int numberOfRows = 0; - if ( !colValues.isEmpty() ) { - Tuple3 firstTuple = colValues.get(0); - double[] lastValuesArray = firstTuple._3(); - numberOfRows = lastValuesArray.length; - } - - List, double[]>> blockOffsets = new ArrayList<>(); - double[] cumulativeOffsets = new double[numberOfRows]; - for (Tuple3 colValue : colValues) { - Long colBlockIndex = colValue._2(); - double[] endValues = colValue._3(); - - // copy current offsets - double[] currentOffsets = cumulativeOffsets.clone(); - - // and save block indexes with its offsets - blockOffsets.add( new Tuple2<>(new Tuple2<>(rowBlockIndex, colBlockIndex), currentOffsets) ); - - for ( int i = 0; i < numberOfRows && i < endValues.length; i++ ) { - cumulativeOffsets[i] += endValues[i]; - } - } - return blockOffsets.iterator(); - } - ).collect(); - - // convert list to map for easier access to offsets - Map< Tuple2, double[] > offsetMap = new HashMap<>(); - for (Tuple2, double[]> offset : offsetList) { - offsetMap.put(offset._1, offset._2); - } - - out = localRowcumsum.mapToPair( new FinalRowCumsumFunction(offsetMap)) ; - - updateUnaryOutputDataCharacteristics(sec); - sec.setRDDHandleForVariable(output.getName(), out); - sec.addLineageRDD(output.getName(), input1.getName()); - } - } - - - - private static class LocalRowCumsumFunction implements PairFunction< Tuple2, MatrixIndexes, Tuple2 > { - private static final long serialVersionUID = 2388003441846068046L; - - @Override - public Tuple2< MatrixIndexes, Tuple2 > call(Tuple2 tuple2) { - + // CumulativeOffsetSPInstruction (Spark) - MatrixBlock inputBlock = tuple2._2; - MatrixBlock cumsumBlock = new MatrixBlock( inputBlock.getNumRows(), inputBlock.getNumColumns(), false ); - - - for ( int i = 0; i < inputBlock.getNumRows(); i++ ) { - - KahanObject kbuff = new KahanObject(0, 0); - KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); - - for ( int j = 0; j < inputBlock.getNumColumns(); j++ ) { - - double val = inputBlock.get(i, j); - kplus.execute2(kbuff, val); - cumsumBlock.set(i, j, kbuff._sum); - } - } - // original index, original matrix and local cumsum block - return new Tuple2<>( tuple2._1, new Tuple2<>(inputBlock, cumsumBlock) ); - } - } + // rowcumsum processing + JavaPairRDD localRowcumsum = sec.getBinaryMatrixBlockRDDHandleForVariable(input1.getName()); + Tuple2, JavaPairRDD> results = + CumulativeAggregateSPInstruction.processRowCumsumWithEndValues(localRowcumsum); + JavaPairRDD rowEndValues = CumulativeOffsetSPInstruction.processRowCumsumOffsetsDirectly(results._1, results._2); - - private static class FinalRowCumsumFunction implements PairFunction >, MatrixIndexes, MatrixBlock> { - private static final long serialVersionUID = -6738155890298916270L; - // map block indexes to the row offsets - private final Map< Tuple2, double[] > offsetMap; - - public FinalRowCumsumFunction(Map, double[]> offsetMap) { - this.offsetMap = offsetMap; - } - - - @Override - public Tuple2 call( Tuple2< MatrixIndexes, Tuple2 > tuple ) { - - MatrixIndexes indexes = tuple._1; - MatrixBlock inputBlock = tuple._2._1; - MatrixBlock 
localRowCumsumBlock = tuple._2._2; - - // key to get the row offset for this block - Tuple2 blockKey = new Tuple2<>( indexes.getRowIndex(), indexes.getColumnIndex()) ; - double[] offsets = offsetMap.get(blockKey); - - MatrixBlock cumsumBlock = new MatrixBlock( inputBlock.getNumRows(), inputBlock.getNumColumns(), false ); - - - for ( int i = 0; i < inputBlock.getNumRows(); i++ ) { - - double rowOffset = 0.0; - if ( offsets != null && i < offsets.length ) { - rowOffset = offsets[i]; - } - - for ( int j = 0; j < inputBlock.getNumColumns(); j++ ) { - double cumsumValue = localRowCumsumBlock.get(i, j); - cumsumBlock.set(i, j, cumsumValue + rowOffset); - } - } - - // block index and final cumsum block - return new Tuple2<>(indexes, cumsumBlock); - } - } - - - - // helper class - private static class Tuple3 implements Serializable { - - private static final long serialVersionUID = 123; - private final Type2 _2; - private final Type3 _3; - - - public Tuple3( Type1 _1, Type2 _2, Type3 _3 ) { - this._2 = _2; - this._3 = _3; - } - - public Type2 _2() { - return _2; - } - - public Type3 _3() { - return _3; - } + sec.setRDDHandleForVariable(output.getName(), rowEndValues); + sec.addLineageRDD(output.getName(), input1.getName()); + updateUnaryOutputDataCharacteristics(sec); } private static class RDDMatrixBuiltinUnaryOp implements Function From 092eedea754b719066274e9d4fd742f9e4408635 Mon Sep 17 00:00:00 2001 From: SamarHeydari Date: Sun, 18 Jan 2026 22:48:30 +0100 Subject: [PATCH 2/2] Add large rowCumsum tests and update Spark cumulative instructions --- .../runtime/functionobjects/Builtin.java | 1 + .../CumulativeAggregateSPInstruction.java | 159 ++++++++------ .../spark/CumulativeOffsetSPInstruction.java | 203 ++++++++---------- .../spark/UnaryMatrixSPInstruction.java | 73 +++---- .../RowCumsumLarge01_WideDense_100x200.dml | 45 ++++ .../RowCumsumLarge02_WideAltSign_50x500.dml | 41 ++++ .../matrix/RowCumsumLarge03_Tall_2000x30.dml | 44 ++++ ...owCumsumLarge04_SparseImpulses_100x400.dml | 53 +++++ .../RowCumsumLarge05_NegHeavy_300x250.dml | 44 ++++ .../RowCumsumLarge06_FloatTiny_150x600.dml | 33 +++ .../RowCumsumLarge07_BlockCarry_64x1024.dml | 45 ++++ ...umsumLarge08_DeterministicHash_128x256.dml | 46 ++++ .../RowCumsumLarge09_SpotChecks_10x1000.dml | 45 ++++ .../RowCumsumLarge10_Regression_256x256.dml | 43 ++++ 14 files changed, 661 insertions(+), 214 deletions(-) create mode 100644 src/test/scripts/functions/unary/matrix/RowCumsumLarge01_WideDense_100x200.dml create mode 100644 src/test/scripts/functions/unary/matrix/RowCumsumLarge02_WideAltSign_50x500.dml create mode 100644 src/test/scripts/functions/unary/matrix/RowCumsumLarge03_Tall_2000x30.dml create mode 100644 src/test/scripts/functions/unary/matrix/RowCumsumLarge04_SparseImpulses_100x400.dml create mode 100644 src/test/scripts/functions/unary/matrix/RowCumsumLarge05_NegHeavy_300x250.dml create mode 100644 src/test/scripts/functions/unary/matrix/RowCumsumLarge06_FloatTiny_150x600.dml create mode 100644 src/test/scripts/functions/unary/matrix/RowCumsumLarge07_BlockCarry_64x1024.dml create mode 100644 src/test/scripts/functions/unary/matrix/RowCumsumLarge08_DeterministicHash_128x256.dml create mode 100644 src/test/scripts/functions/unary/matrix/RowCumsumLarge09_SpotChecks_10x1000.dml create mode 100644 src/test/scripts/functions/unary/matrix/RowCumsumLarge10_Regression_256x256.dml diff --git a/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java b/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java index 
8e9aef94660..324b1acbcfc 100644 --- a/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java +++ b/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java @@ -96,6 +96,7 @@ public enum BuiltinCode { AUTODIFF, SIN, COS, TAN, SINH, COSH, TANH, ASIN, ACOS, String2BuiltinCode.put( "floor" , BuiltinCode.FLOOR); String2BuiltinCode.put( "ucumk+" , BuiltinCode.CUMSUM); String2BuiltinCode.put( "urowcumk+" , BuiltinCode.ROWCUMSUM); + String2BuiltinCode.put("rowCumsum", BuiltinCode.ROWCUMSUM); String2BuiltinCode.put( "ucum*" , BuiltinCode.CUMPROD); String2BuiltinCode.put( "ucumk+*", BuiltinCode.CUMSUMPROD); String2BuiltinCode.put( "ucummin", BuiltinCode.CUMMIN); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeAggregateSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeAggregateSPInstruction.java index 8c776abe56d..be13ba98432 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeAggregateSPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeAggregateSPInstruction.java @@ -19,17 +19,14 @@ package org.apache.sysds.runtime.instructions.spark; - import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.PairFunction; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysds.runtime.functionobjects.Builtin; -import org.apache.sysds.runtime.functionobjects.KahanPlus; import org.apache.sysds.runtime.functionobjects.PlusMultiply; import org.apache.sysds.runtime.instructions.InstructionUtils; import org.apache.sysds.runtime.instructions.cp.CPOperand; -import org.apache.sysds.runtime.instructions.cp.KahanObject; import org.apache.sysds.runtime.instructions.spark.utils.RDDAggregateUtils; import org.apache.sysds.runtime.instructions.spark.utils.SparkUtils; import org.apache.sysds.runtime.matrix.data.MatrixBlock; @@ -47,42 +44,60 @@ private CumulativeAggregateSPInstruction(AggregateUnaryOperator op, CPOperand in super(SPType.CumsumAggregate, op, null, in1, out, null, opcode, istr); } - public static CumulativeAggregateSPInstruction parseInstruction( String str ) { - String[] parts = InstructionUtils.getInstructionPartsWithValueType( str ); - InstructionUtils.checkNumFields ( parts, 2 ); + public static CumulativeAggregateSPInstruction parseInstruction(String str) { + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); + // parts: opcode, in, out => 3 fields + InstructionUtils.checkNumFields(parts, 3); + String opcode = parts[0]; CPOperand in1 = new CPOperand(parts[1]); CPOperand out = new CPOperand(parts[2]); + AggregateUnaryOperator aggun = InstructionUtils.parseCumulativeAggregateUnaryOperator(opcode); return new CumulativeAggregateSPInstruction(aggun, in1, out, opcode, str); } @Override public void processInstruction(ExecutionContext ec) { - SparkExecutionContext sec = (SparkExecutionContext)ec; + SparkExecutionContext sec = (SparkExecutionContext) ec; DataCharacteristics mc = sec.getDataCharacteristics(input1.getName()); - //get input - JavaPairRDD in = sec.getBinaryMatrixBlockRDDHandleForVariable( input1.getName() ); + // get input + JavaPairRDD in = + sec.getBinaryMatrixBlockRDDHandleForVariable(input1.getName()); if ("urowcumk+".equals(getOpcode())) { - processRowCumsum(sec, in, mc); - } else { + // rowcumsum: aggregate phase should output carry/end-values per block + 
processRowCumsumAggregate(sec, in, mc); + } + else { + // regular cumsum aggregate phase processCumsum(sec, in, mc); } } - private void processRowCumsum(SparkExecutionContext sec, JavaPairRDD in, DataCharacteristics mc) { - JavaPairRDD localRowCumsum = - in.mapToPair(new LocalRowCumsumFunction()); + private void processRowCumsumAggregate(SparkExecutionContext sec, JavaPairRDD in, DataCharacteristics mc) { + Tuple2, JavaPairRDD> res = + processRowCumsumWithEndValues(in); - sec.setRDDHandleForVariable(output.getName(), localRowCumsum); + JavaPairRDD endValues = res._2; + + sec.setRDDHandleForVariable(output.getName(), endValues); sec.addLineageRDD(output.getName(), input1.getName()); - sec.getDataCharacteristics(output.getName()).set(mc); + + // output characteristics: same rows as input, but 1 column (per-row carry) + MatrixCharacteristics mcOut = new MatrixCharacteristics(mc); + mcOut.setCols(1); + sec.getDataCharacteristics(output.getName()).set(mcOut); } + /** + * Helper for rowcumsum: + * returns (localRowCumsumBlocks, endValuesBlocks). + */ public static Tuple2, JavaPairRDD> - processRowCumsumWithEndValues(JavaPairRDD in) { + processRowCumsumWithEndValues(JavaPairRDD in) { + JavaPairRDD localRowCumsum = in.mapToPair(new LocalRowCumsumFunction()); @@ -92,74 +107,85 @@ private void processRowCumsum(SparkExecutionContext sec, JavaPairRDD(localRowCumsum, endValues); } - private void processCumsum(SparkExecutionContext sec, JavaPairRDD in, DataCharacteristics mc) { + /** + * Original cumsum aggregate phase (keep intact). + */ + private void processCumsum(SparkExecutionContext sec, JavaPairRDD in, DataCharacteristics mc) { DataCharacteristics mcOut = new MatrixCharacteristics(mc); long rlen = mc.getRows(); int blen = mc.getBlocksize(); - mcOut.setRows((long)(Math.ceil((double)rlen/blen))); + mcOut.setRows((long) (Math.ceil((double) rlen / blen))); - //execute unary aggregate (w/ implicit drop correction) + // execute unary aggregate (w/ implicit drop correction) AggregateUnaryOperator auop = (AggregateUnaryOperator) _optr; - JavaPairRDD out = + JavaPairRDD out = in.mapToPair(new RDDCumAggFunction(auop, rlen, blen)); - //merge partial aggregates, adjusting for correct number of partitions - //as size can significant shrink (1K) but also grow (sparse-dense) + + // merge partial aggregates, adjusting for correct number of partitions int numParts = SparkUtils.getNumPreferredPartitions(mcOut); - int minPar = (int)Math.min(SparkExecutionContext.getDefaultParallelism(true), mcOut.getNumBlocks()); + int minPar = (int) Math.min(SparkExecutionContext.getDefaultParallelism(true), mcOut.getNumBlocks()); out = RDDAggregateUtils.mergeByKey(out, Math.max(numParts, minPar), false); - //put output handle in symbol table + // put output handle in symbol table sec.setRDDHandleForVariable(output.getName(), out); sec.addLineageRDD(output.getName(), input1.getName()); sec.getDataCharacteristics(output.getName()).set(mcOut); } - private static class LocalRowCumsumFunction implements PairFunction, MatrixIndexes, MatrixBlock> { + + private static class LocalRowCumsumFunction + implements PairFunction, MatrixIndexes, MatrixBlock> { + private static final long serialVersionUID = 123L; + private static final UnaryOperator ROWCUMSUM_OP = + new UnaryOperator(Builtin.getBuiltinFnObject("urowcumk+")); + @Override - public Tuple2 call(Tuple2 kv) throws Exception { + public Tuple2 call(Tuple2 kv) { MatrixIndexes idx = kv._1; MatrixBlock inputBlock = kv._2; - MatrixBlock outBlock = new MatrixBlock(inputBlock.getNumRows(), 
inputBlock.getNumColumns(), false); - - for (int i = 0; i < inputBlock.getNumRows(); i++) { - KahanObject kbuff = new KahanObject(0, 0); - KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); - for (int j = 0; j < inputBlock.getNumColumns(); j++) { - double val = inputBlock.get(i, j); - kplus.execute2(kbuff, val); - outBlock.set(i, j, kbuff._sum); - } - } - // original index, original matrix and local cumsum block + MatrixBlock outBlock = inputBlock.unaryOperations(ROWCUMSUM_OP, new MatrixBlock()); return new Tuple2<>(idx, outBlock); } } - private static class ExtractEndValuesFunction implements PairFunction, MatrixIndexes, MatrixBlock> { + + private static class ExtractEndValuesFunction + implements PairFunction, MatrixIndexes, MatrixBlock> { + private static final long serialVersionUID = 123L; @Override - public Tuple2 call(Tuple2 kv) throws Exception { + public Tuple2 call(Tuple2 kv) { MatrixIndexes idx = kv._1; MatrixBlock cumsumBlock = kv._2; - MatrixBlock endValuesBlock = new MatrixBlock(cumsumBlock.getNumRows(), 1, false); - for (int i = 0; i < cumsumBlock.getNumRows(); i++) { - if (cumsumBlock.getNumColumns() > 0) { - endValuesBlock.set(i, 0, cumsumBlock.get(i, cumsumBlock.getNumColumns() - 1)); - } else { + int r = cumsumBlock.getNumRows(); + int c = cumsumBlock.getNumColumns(); + MatrixBlock endValuesBlock = new MatrixBlock(r, 1, false); + + if (c > 0) { + int lastCol = c - 1; + for (int i = 0; i < r; i++) { + endValuesBlock.set(i, 0, cumsumBlock.get(i, lastCol)); + } + } + else { + // degenerate case: empty block + for (int i = 0; i < r; i++) { endValuesBlock.set(i, 0, 0.0); } } + return new Tuple2<>(idx, endValuesBlock); } } - private static class RDDCumAggFunction implements PairFunction, MatrixIndexes, MatrixBlock> - { + private static class RDDCumAggFunction + implements PairFunction, MatrixIndexes, MatrixBlock> { + private static final long serialVersionUID = 11324676268945117L; private final AggregateUnaryOperator _op; @@ -167,53 +193,50 @@ private static class RDDCumAggFunction implements PairFunction call( Tuple2 arg0 ) - throws Exception - { + public Tuple2 call(Tuple2 arg0) throws Exception { MatrixIndexes ixIn = arg0._1(); MatrixBlock blkIn = arg0._2(); MatrixIndexes ixOut = new MatrixIndexes(); MatrixBlock blkOut = new MatrixBlock(); - //process instruction + // process instruction AggregateUnaryOperator aop = _op; - if( aop.aggOp.increOp.fn instanceof PlusMultiply ) { //cumsumprod + if (aop.aggOp.increOp.fn instanceof PlusMultiply) { // cumsumprod aop.indexFn.execute(ixIn, ixOut); - if( _uop == null ) + if (_uop == null) _uop = new UnaryOperator(Builtin.getBuiltinFnObject("ucumk+*")); MatrixBlock t1 = blkIn.unaryOperations(_uop, new MatrixBlock()); - MatrixBlock t2 = blkIn.slice(0, blkIn.getNumRows()-1, 1, 1, new MatrixBlock()); + MatrixBlock t2 = blkIn.slice(0, blkIn.getNumRows() - 1, 1, 1, new MatrixBlock()); blkOut.reset(1, 2); - blkOut.set(0, 0, t1.get(t1.getNumRows()-1, 0)); + blkOut.set(0, 0, t1.get(t1.getNumRows() - 1, 0)); blkOut.set(0, 1, t2.prod()); } - else { //general case - OperationsOnMatrixValues.performAggregateUnary( ixIn, blkIn, ixOut, blkOut, aop, _blen); - if( aop.aggOp.existsCorrection() ) + else { // general case + OperationsOnMatrixValues.performAggregateUnary(ixIn, blkIn, ixOut, blkOut, aop, _blen); + if (aop.aggOp.existsCorrection()) blkOut.dropLastRowsOrColumns(aop.aggOp.correction); } - //cumsum expand partial aggregates - long rlenOut = (long)Math.ceil((double)_rlen/_blen); - long rixOut = (long)Math.ceil((double)ixIn.getRowIndex()/_blen); 
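// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): a minimal, Spark-free model of
// the block decomposition that LocalRowCumsumFunction and ExtractEndValuesFunction
// implement. The block width, values, and class name below are assumptions chosen
// for illustration; the real code operates on MatrixBlock and additionally uses
// Kahan-compensated accumulation.
// For one row [1, 2, 3, 4] split into column blocks of width 2:
//   local scans [1, 3] and [3, 7]; per-row carries (end values) 3 and 7;
//   the exclusive prefix sum of the carries gives offsets 0 and 3;
//   adding the offsets yields [1, 3] and [6, 10], i.e. the full row scan [1, 3, 6, 10].
class RowCumsumBlockSketch {
	public static void main(String[] args) {
		double[] row = {1, 2, 3, 4};
		int blen = 2; // assumed block width
		int nBlocks = (row.length + blen - 1) / blen;
		double[][] local = new double[nBlocks][];
		double[] carry = new double[nBlocks];
		// phase 1: local row scan per column block, remembering each block's last value
		for (int b = 0; b < nBlocks; b++) {
			int off = b * blen, len = Math.min(blen, row.length - off);
			local[b] = new double[len];
			double s = 0;
			for (int j = 0; j < len; j++)
				local[b][j] = (s += row[off + j]);
			carry[b] = s;
		}
		// phase 2: add the exclusive prefix sum of the carries to each block
		double offset = 0;
		for (int b = 0; b < nBlocks; b++) {
			for (int j = 0; j < local[b].length; j++)
				System.out.print((local[b][j] + offset) + " "); // prints 1.0 3.0 6.0 10.0
			offset += carry[b];
		}
		System.out.println();
	}
}
// ---------------------------------------------------------------------------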
- int rlenBlk = (int) Math.min(rlenOut-(rixOut-1)*_blen, _blen); + // cumsum expand partial aggregates + long rlenOut = (long) Math.ceil((double) _rlen / _blen); + long rixOut = (long) Math.ceil((double) ixIn.getRowIndex() / _blen); + int rlenBlk = (int) Math.min(rlenOut - (rixOut - 1) * _blen, _blen); int clenBlk = blkOut.getNumColumns(); - int posBlk = (int) ((ixIn.getRowIndex()-1) % _blen); + int posBlk = (int) ((ixIn.getRowIndex() - 1) % _blen); - //construct sparse output blocks (single row in target block size) + // construct sparse output blocks (single row in target block size) MatrixBlock blkOut2 = new MatrixBlock(rlenBlk, clenBlk, true); - blkOut2.copy(posBlk, posBlk, 0, clenBlk-1, blkOut, true); + blkOut2.copy(posBlk, posBlk, 0, clenBlk - 1, blkOut, true); ixOut.setIndexes(rixOut, ixOut.getColumnIndex()); - //output new tuple return new Tuple2<>(ixOut, blkOut2); } } diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java index 9b57713e7ac..f7418ca44ad 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java @@ -19,7 +19,13 @@ package org.apache.sysds.runtime.instructions.spark; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.Optional; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; @@ -40,10 +46,8 @@ import org.apache.sysds.runtime.meta.DataCharacteristics; import org.apache.sysds.runtime.util.DataConverter; import org.apache.sysds.runtime.util.UtilFunctions; -import scala.Serializable; -import scala.Tuple2; -import java.util.*; +import scala.Tuple2; public class CumulativeOffsetSPInstruction extends BinarySPInstruction { private UnaryOperator _uop = null; @@ -51,7 +55,8 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction { private final double _initValue ; private final boolean _broadcast; - private CumulativeOffsetSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, double init, boolean broadcast, String opcode, String istr) { + private CumulativeOffsetSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, + double init, boolean broadcast, String opcode, String istr) { super(SPType.CumsumOffset, op, in1, in2, out, opcode, istr); if (Opcodes.BCUMOFFKP.toString().equals(opcode)) @@ -73,15 +78,18 @@ else if (Opcodes.BCUMOFFMAX.toString().equals(opcode)) _broadcast = broadcast; } - public static CumulativeOffsetSPInstruction parseInstruction ( String str ) { - String[] parts = InstructionUtils.getInstructionPartsWithValueType( str ); - InstructionUtils.checkNumFields(parts, 5); + public static CumulativeOffsetSPInstruction parseInstruction(String str) { + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); + // parts: opcode, in1, in2, out, init, broadcast => 6 fields + InstructionUtils.checkNumFields(parts, 6); + String opcode = parts[0]; CPOperand in1 = new CPOperand(parts[1]); CPOperand in2 = new CPOperand(parts[2]); CPOperand out = new CPOperand(parts[3]); double init = Double.parseDouble(parts[4]); boolean broadcast = 
Boolean.parseBoolean(parts[5]); + return new CumulativeOffsetSPInstruction(null, in1, in2, out, init, broadcast, opcode, str); } @@ -130,78 +138,101 @@ public void processInstruction(ExecutionContext ec) { sec.addLineage(output.getName(), input2.getName(), broadcast); } + /** + * Distributed rowcumsum offset application: + * - endValues: for each (rowBlock, colBlock), a (rowsInBlock x 1) column vector with the last value of each row + * - compute per-(rowBlock,colBlock) offsets via prefix-scan across colBlocks (within each rowBlock) + * - join offsets with localRowCumsum blocks and add offsets row-wise + * + * This matches the paper’s two-phase scan: local scan + carry propagation. + */ public static JavaPairRDD processRowCumsumOffsetsDirectly( JavaPairRDD localRowCumsum, JavaPairRDD endValues) { - // Collect end-values of every block of every row for offset calc by grouping by global row index - JavaPairRDD>> rowEndValues = endValues - .mapToPair(t -> { - // get index of block - MatrixIndexes idx = t._1; - // get cum matrix block - MatrixBlock endValuesBlock = t._2; - - // get row and column block index - long rowBlockIdx = idx.getRowIndex(); - long colBlockIdx = idx.getColumnIndex(); - - // Save end value of every row of every block (if block is empty save 0) - double[] lastValues = new double[endValuesBlock.getNumRows()]; - for (int i = 0; i < endValuesBlock.getNumRows(); i++) { - lastValues[i] = endValuesBlock.get(i, 0); - } + // Group end-values by row-block, then sort by col-block and compute prefix offsets + JavaPairRDD>> groupedByRowBlock = endValues + .mapToPair(new PairFunction, Long, Tuple2>() { + private static final long serialVersionUID = 1L; - return new Tuple2<>(rowBlockIdx, new Tuple3<>(rowBlockIdx, colBlockIdx, lastValues)); + @Override + public Tuple2> call(Tuple2 t) { + long rowBlock = t._1.getRowIndex(); + long colBlock = t._1.getColumnIndex(); + return new Tuple2<>(rowBlock, new Tuple2<>(colBlock, t._2)); + } }) .groupByKey(); - // compute offset for every block - List, double[]>> offsetList = rowEndValues - .flatMapToPair(t -> { - Long rowBlockIdx = t._1; - List> colValues = new ArrayList<>(); - for (Tuple3 cv : t._2) { - colValues.add(cv); - } + // Produce offsets per (rowBlock, colBlock) as a MatrixBlock (rowsInBlock x 1) + JavaPairRDD offsetsByBlock = groupedByRowBlock + .flatMapToPair(new PairFlatMapFunction>>, MatrixIndexes, MatrixBlock>() { + private static final long serialVersionUID = 1L; - // sort blocks from one row by column index - colValues.sort(Comparator.comparing(Tuple3::_2)); + @Override + public Iterator> call(Tuple2>> t) { + long rowBlock = t._1; - // get number of rows of a block by counting amount of end (row) values of said block - int numRows = 0; - if (!colValues.isEmpty()) { - double[] lastValuesArray = colValues.get(0)._3(); - numRows = lastValuesArray.length; - } + List> cols = new ArrayList<>(); + for (Tuple2 x : t._2) + cols.add(x); + + cols.sort(Comparator.comparingLong(Tuple2::_1)); - List, double[]>> blockOffsets = new ArrayList<>(); - double[] cumulativeOffsets = new double[numRows]; + int numRows = 0; + if (!cols.isEmpty()) + numRows = cols.get(0)._2.getNumRows(); - for (Tuple3 colValue : colValues) { - Long colBlockIdx = colValue._2(); - double[] rowendValues = colValue._3(); + double[] cumulative = new double[numRows]; + List> out = new ArrayList<>(cols.size()); - // copy current offsets - double[] currentOffsets = cumulativeOffsets.clone(); + for (Tuple2 cb : cols) { + long colBlock = cb._1; + MatrixBlock endBlock = cb._2; // 
(numRows x 1) - // and save block indexes with its offsets - blockOffsets.add(new Tuple2<>(new Tuple2<>(rowBlockIdx, colBlockIdx), currentOffsets)); + // offsets for THIS block = cumulative sum of all previous blocks + MatrixBlock offsetBlock = new MatrixBlock(numRows, 1, false); + for (int i = 0; i < numRows; i++) + offsetBlock.set(i, 0, cumulative[i]); - for (int i = 0; i < numRows && i < rowendValues.length; i++) { - cumulativeOffsets[i] += rowendValues[i]; + out.add(new Tuple2<>(new MatrixIndexes(rowBlock, colBlock), offsetBlock)); + + // update cumulative by adding this block’s end-values + for (int i = 0; i < numRows; i++) + cumulative[i] += endBlock.get(i, 0); } + + return out.iterator(); } - return blockOffsets.iterator(); - }) - .collect(); + }); + + // Join local rowcumsum with offsets and add offsets row-wise + return localRowCumsum + .leftOuterJoin(offsetsByBlock) + .mapToPair(new PairFunction>>, MatrixIndexes, MatrixBlock>() { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2 call( + Tuple2>> t) { + + MatrixIndexes ix = t._1; + MatrixBlock local = t._2._1; + MatrixBlock off = t._2._2.isPresent() ? t._2._2.get() : null; + + int r = local.getNumRows(); + int c = local.getNumColumns(); + MatrixBlock out = new MatrixBlock(r, c, false); + + for (int i = 0; i < r; i++) { + double rowOffset = (off != null) ? off.get(i, 0) : 0.0; + for (int j = 0; j < c; j++) + out.set(i, j, local.get(i, j) + rowOffset); + } - // convert list to map for easier access to offsets - Map, double[]> offsetMap = new HashMap<>(); - for (Tuple2, double[]> entry : offsetList) { - offsetMap.put(entry._1, entry._2); - } - return localRowCumsum.mapToPair(new FinalRowCumsumFunction(offsetMap)); + return new Tuple2<>(ix, out); + } + }); } private void processRowCumsumOffsets(SparkExecutionContext sec, DataCharacteristics mc1, DataCharacteristics mc2) { @@ -226,55 +257,7 @@ public boolean getBroadcast() { return _broadcast; } - private static class FinalRowCumsumFunction implements PairFunction, MatrixIndexes, MatrixBlock> { - private static final long serialVersionUID = 1L; - // map block indexes to the row offsets - private final Map, double[]> offsetMap; - - public FinalRowCumsumFunction(Map, double[]> offsetMap) { - this.offsetMap = offsetMap; - } - - @Override - public Tuple2 call(Tuple2 tuple) throws Exception { - MatrixIndexes idx = tuple._1; - MatrixBlock localRowCumsumBlock = tuple._2; - - // key to get the row offset for this block - Tuple2 blockKey = new Tuple2<>(idx.getRowIndex(), idx.getColumnIndex()); - double[] offsets = offsetMap.get(blockKey); - - MatrixBlock outBlock = new MatrixBlock(localRowCumsumBlock.getNumRows(), localRowCumsumBlock.getNumColumns(), false); - - for (int i = 0; i < localRowCumsumBlock.getNumRows(); i++) { - double rowOffset = (offsets != null && i < offsets.length) ? 
offsets[i] : 0.0; - for (int j = 0; j < localRowCumsumBlock.getNumColumns(); j++) { - double cumsumValue = localRowCumsumBlock.get(i, j); - outBlock.set(i, j, cumsumValue + rowOffset); - } - } - // block index and final cumsum block - return new Tuple2<>(idx, outBlock); - } - } - - // helper class - private static class Tuple3 implements Serializable { - private static final long serialVersionUID = 1L; - private final T1 _1; - private final T2 _2; - private final T3 _3; - - public Tuple3(T1 _1, T2 _2, T3 _3) { - this._1 = _1; - this._2 = _2; - this._3 = _3; - } - - public T1 _1() { return _1; } - public T2 _2() { return _2; } - public T3 _3() { return _3; } - } + // --- existing generic cumsum offset machinery below (unchanged) --- private static class RDDCumSplitFunction implements PairFlatMapFunction, MatrixIndexes, MatrixBlock> { diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java index 6181d1c1d1a..e316d848100 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java @@ -38,55 +38,57 @@ public class UnaryMatrixSPInstruction extends UnarySPInstruction { protected UnaryMatrixSPInstruction(Operator op, CPOperand in, CPOperand out, String opcode, String instr) { super(SPType.Unary, op, in, out, opcode, instr); } - + public static UnarySPInstruction parseInstruction ( String str ) { CPOperand in = new CPOperand("", ValueType.UNKNOWN, DataType.UNKNOWN); CPOperand out = new CPOperand("", ValueType.UNKNOWN, DataType.UNKNOWN); String opcode = parseUnaryInstruction(str, in, out); return new UnaryMatrixSPInstruction( - InstructionUtils.parseUnaryOperator(opcode), in, out, opcode, str); + InstructionUtils.parseUnaryOperator(opcode), in, out, opcode, str); } - @Override + @Override public void processInstruction(ExecutionContext ec) { SparkExecutionContext sec = (SparkExecutionContext)ec; - - //get input - JavaPairRDD in = sec.getBinaryMatrixBlockRDDHandleForVariable( input1.getName() ); - - //execute unary builtin operation - UnaryOperator uop = (UnaryOperator) _optr; - JavaPairRDD out = in.mapValues(new RDDMatrixBuiltinUnaryOp(uop)); - - //set output RDD - updateUnaryOutputDataCharacteristics(sec); - sec.setRDDHandleForVariable(output.getName(), out); - sec.addLineageRDD(output.getName(), input1.getName()); - - //FIXME: implement similar to cumsum through - // CumulativeAggregateSPInstruction (Spark) - // UnaryMatrixCPInstruction (local cumsum on aggregates) - // CumulativeOffsetSPInstruction (Spark) - - // rowcumsum processing - JavaPairRDD localRowcumsum = sec.getBinaryMatrixBlockRDDHandleForVariable(input1.getName()); - - Tuple2, JavaPairRDD> results = - CumulativeAggregateSPInstruction.processRowCumsumWithEndValues(localRowcumsum); - - JavaPairRDD rowEndValues = CumulativeOffsetSPInstruction.processRowCumsumOffsetsDirectly(results._1, results._2); - - sec.setRDDHandleForVariable(output.getName(), rowEndValues); - sec.addLineageRDD(output.getName(), input1.getName()); - updateUnaryOutputDataCharacteristics(sec); + + // get input + JavaPairRDD in = + sec.getBinaryMatrixBlockRDDHandleForVariable(input1.getName()); + + // Only do distributed rowcumsum logic for the rowcumsum opcode. + // Otherwise do the default unary builtin blockwise operation. 
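// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): a minimal, Spark-free model of
// the per-row-block offset computation in processRowCumsumOffsetsDirectly.
// Within one row block, the end-value (carry) vectors of its column blocks are
// visited in column order and turned into an exclusive prefix sum; those sums
// are the row offsets later joined onto the locally scanned blocks. Class and
// variable names here are assumptions for illustration only.
import java.util.Map;
import java.util.TreeMap;

class RowBlockOffsetSketch {
	// colBlockIndex -> per-row end values (carries) of that column block
	static TreeMap<Long, double[]> offsets(Map<Long, double[]> endValuesByColBlock) {
		TreeMap<Long, double[]> ordered = new TreeMap<>(endValuesByColBlock); // order by column-block index
		TreeMap<Long, double[]> out = new TreeMap<>();
		double[] running = null;
		for (Map.Entry<Long, double[]> e : ordered.entrySet()) {
			if (running == null)
				running = new double[e.getValue().length]; // first column block gets all-zero offsets
			out.put(e.getKey(), running.clone());          // exclusive prefix: exclude the current block
			for (int i = 0; i < running.length; i++)
				running[i] += e.getValue()[i];             // accumulate this block's carries
		}
		return out;
	}

	public static void main(String[] args) {
		TreeMap<Long, double[]> ev = new TreeMap<>();
		ev.put(1L, new double[]{3, -1}); // carries of column block 1 for two rows
		ev.put(2L, new double[]{7, 2});  // carries of column block 2
		ev.put(3L, new double[]{1, 5});  // carries of column block 3
		// expected: 1 -> [0.0, 0.0], 2 -> [3.0, -1.0], 3 -> [10.0, 1.0]
		offsets(ev).forEach((k, v) -> System.out.println(k + " -> " + java.util.Arrays.toString(v)));
	}
}
// ---------------------------------------------------------------------------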
+ if ("urowcumk+".equals(getOpcode())) { + + // rowcumsum processing (distributed: aggregate + offsets) + Tuple2, JavaPairRDD> results = + CumulativeAggregateSPInstruction.processRowCumsumWithEndValues(in); + + JavaPairRDD rowEndValues = + CumulativeOffsetSPInstruction.processRowCumsumOffsetsDirectly(results._1, results._2); + + updateUnaryOutputDataCharacteristics(sec); + sec.setRDDHandleForVariable(output.getName(), rowEndValues); + sec.addLineageRDD(output.getName(), input1.getName()); + } + else { + // execute unary builtin operation (blockwise) + UnaryOperator uop = (UnaryOperator) _optr; + JavaPairRDD out = + in.mapValues(new RDDMatrixBuiltinUnaryOp(uop)); + + // set output RDD + updateUnaryOutputDataCharacteristics(sec); + sec.setRDDHandleForVariable(output.getName(), out); + sec.addLineageRDD(output.getName(), input1.getName()); + } } - private static class RDDMatrixBuiltinUnaryOp implements Function + private static class RDDMatrixBuiltinUnaryOp implements Function { private static final long serialVersionUID = -3128192099832877491L; - + private UnaryOperator _op = null; - + public RDDMatrixBuiltinUnaryOp(UnaryOperator u_op) { _op = u_op; } @@ -97,4 +99,3 @@ public MatrixBlock call(MatrixBlock arg0) throws Exception { } } } - diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge01_WideDense_100x200.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge01_WideDense_100x200.dml new file mode 100644 index 00000000000..4a2f18f2ff3 --- /dev/null +++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge01_WideDense_100x200.dml @@ -0,0 +1,45 @@ +# Test-specific focus: +# Wide dense matrix spanning multiple column blocks. +# +# Why this matters: +# Row-wise cumulative sum in Spark can fail when carries are not propagated +# correctly across column blocks/partitions. This test checks continuity of +# cumulative sums across the full row width. +# +# Matrix construction: +# X[i,j] = i + 0.01*j +# +# Expected result (closed-form, no loops): +# E[i,j] = sum_{k=1..j} (i + 0.01*k) +# = j*i + 0.01 * j*(j+1)/2 +# +# Output: +# Writes TRUE/FALSE to $1 +# +# NOTE: +# - Do NOT call setExecMode() here. That helper exists only in the JUnit test harness. +# - Exec mode is controlled by CLI: -exec spark +# +# IMPORTANT: +# - The builtin function name is "rowcumsum" (lowercase), not "rowCumsum". +# - Do NOT source scripts/builtin/rowCumsum.dml because it does not exist. + +n = 100; m = 200; + +ri = matrix(seq(1, n), n, 1); +cj = matrix(seq(1, m), 1, m); + +# X[i,j] = i + 0.01*j +X = ri %*% matrix(1, 1, m) + 0.01 * (matrix(1, n, 1) %*% cj); + +# Compute row-wise cumulative sum (native builtin) +Y = rowcumsum(X); + +# Expected: E[i,j] = j*i + 0.01 * j*(j+1)/2 +J = matrix(1, n, 1) %*% cj; # j replicated down +E = J * (ri %*% matrix(1, 1, m)) + 0.01 * (J * (J + 1)) / 2; # closed form + +err = max(abs(Y - E)); +pass = (err < 1e-9); + +write(pass, $1); diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge02_WideAltSign_50x500.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge02_WideAltSign_50x500.dml new file mode 100644 index 00000000000..074ee33574a --- /dev/null +++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge02_WideAltSign_50x500.dml @@ -0,0 +1,41 @@ +# Test-specific focus: +# Wide matrix with alternating positive and negative values. +# +# Why this matters: +# Carry propagation errors often surface when signs alternate, because +# missing or duplicated carries cannot cancel out silently. 
+#
+# Matrix construction:
+# X[i,j] alternates sign by column and increases in magnitude with (i + j).
+#
+# Expected result:
+# Computed via an explicit sequential loop over columns, serving as a
+# trusted reference implementation.
+#
+# Output:
+# Writes TRUE/FALSE to $1
+#
+# NOTE:
+# - Do NOT call setExecMode() here. Exec mode is controlled by CLI: -exec spark
+
+n = 50; m = 500;
+ri = matrix(seq(1, n), n, 1);
+cj = matrix(seq(1, m), 1, m);
+
+# X[i,j] = (j%2==0 ? -1 : 1) * (i + j)
+S = (cj %% 2) * 2 - 1;  # j odd => +1, j even => -1
+X = (ri %*% matrix(1, 1, m) + matrix(1, n, 1) %*% cj) * (matrix(1, n, 1) %*% S);
+
+Y = rowcumsum(X);
+
+# Expected via explicit sequential loop over columns
+E = matrix(0, n, m);
+E[,1] = X[,1];
+for (j in 2:m) {
+  E[,j] = E[,j-1] + X[,j];
+}
+
+err = max(abs(Y - E));
+pass = (err < 1e-9);
+
+write(pass, $1);
diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge03_Tall_2000x30.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge03_Tall_2000x30.dml
new file mode 100644
index 00000000000..b78ade4680a
--- /dev/null
+++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge03_Tall_2000x30.dml
@@ -0,0 +1,44 @@
+# Test-specific focus:
+# Very tall matrix with moderate width.
+#
+# Why this matters:
+# Ensures correctness when:
+# - Many row blocks are involved
+# - Row-wise operations are distributed across Spark partitions
+#
+# This test verifies that:
+# - Row-wise independence is preserved
+# - No cross-row interference occurs during carry propagation
+#
+# Expected result:
+# Computed via a sequential column-wise loop (trusted reference).
+#
+# Output:
+# Writes TRUE/FALSE to $1
+#
+# NOTE:
+# - Do NOT call setExecMode() here. Exec mode is controlled by CLI: -exec spark
+
+n = 2000; m = 30;
+ri = matrix(seq(1, n), n, 1);
+cj = matrix(seq(1, m), 1, m);
+
+# X[i,j] = sin(i) + j
+# - sin(i) varies per row (helps detect cross-row interference)
+# - + j ensures a predictable column-wise growth pattern
+X = sin(ri %*% matrix(1, 1, m)) + (matrix(1, n, 1) %*% cj);
+
+# Row-wise cumulative sum (native builtin)
+Y = rowcumsum(X);
+
+# Expected via explicit sequential loop over columns
+E = matrix(0, n, m);
+E[,1] = X[,1];
+for (j in 2:m) {
+  E[,j] = E[,j-1] + X[,j];
+}
+
+err = max(abs(Y - E));
+pass = (err < 1e-9);
+
+write(pass, $1);
diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge04_SparseImpulses_100x400.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge04_SparseImpulses_100x400.dml
new file mode 100644
index 00000000000..9ad65ca64aa
--- /dev/null
+++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge04_SparseImpulses_100x400.dml
@@ -0,0 +1,53 @@
+# Test-specific focus:
+# Sparse-like structure with isolated non-zero impulses.
+#
+# Why this matters:
+# In sparse or near-sparse matrices, incorrect carry handling often
+# manifests as:
+# - Sudden jumps
+# - Missing propagation after zero blocks
+#
+# Matrix construction:
+# - Mostly zeros
+# - Deterministic impulses at known columns and rows
+#
+# Expected behavior:
+# Cumulative sum should only change at impulse locations and remain
+# constant elsewhere.
+#
+# Output:
+# Writes TRUE/FALSE to $1
+#
+# NOTE:
+# - Do NOT call setExecMode() here. Exec mode is controlled by CLI: -exec spark
+
+n = 100; m = 400;
+X = matrix(0, n, m);
+
+# Put impulses in a deterministic way
+# Row 1: spikes of +10 every 50 columns
+for (j in 1:m) {
+  if (j %% 50 == 0) {
+    X[1, j] = 10;
+  }
+}
+
+# Row 50: ramp in first 100 columns (1..100)
+for (j in 1:100) {
+  X[50, j] = j;
+}
+
+# Row-wise cumulative sum (native builtin)
+Y = rowcumsum(X);
+
+# Expected via explicit sequential loop over columns
+E = matrix(0, n, m);
+E[, 1] = X[, 1];
+for (j in 2:m) {
+  E[, j] = E[, j-1] + X[, j];
+}
+
+err = max(abs(Y - E));
+pass = (err < 1e-9);
+
+write(pass, $1);
diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge05_NegHeavy_300x250.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge05_NegHeavy_300x250.dml
new file mode 100644
index 00000000000..99c6da8daaf
--- /dev/null
+++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge05_NegHeavy_300x250.dml
@@ -0,0 +1,44 @@
+# Test-specific focus:
+# Negative-dominated values with increasing magnitude.
+#
+# Why this matters:
+# Ensures numerical correctness when cumulative sums are decreasing
+# and values span multiple column blocks.
+#
+# This test stresses:
+# - Sign correctness (cumsum should keep decreasing)
+# - Carry propagation across column blocks
+# - No accidental abs() / truncation / overflow-ish behavior
+#
+# Expected result:
+# Verified via a sequential loop reference (per-row prefix sum).
+#
+# Output:
+# Writes TRUE/FALSE to $1
+#
+# NOTE:
+# - Do NOT call setExecMode() here. Exec mode is controlled by CLI: -exec spark
+
+n = 300; m = 250;
+ri = matrix(seq(1, n), n, 1);
+cj = matrix(seq(1, m), 1, m);
+
+# X[i,j] = - (i*j) / 1000
+# Construct i replicated across columns and j replicated across rows, then multiply.
+I = ri %*% matrix(1, 1, m);
+J = matrix(1, n, 1) %*% cj;
+X = - (I * J) / 1000;
+
+Y = rowcumsum(X);
+
+# Expected via explicit sequential loop over columns
+E = matrix(0, n, m);
+E[, 1] = X[, 1];
+for (j in 2:m) {
+  E[, j] = E[, j-1] + X[, j];
+}
+
+err = max(abs(Y - E));
+pass = (err < 1e-9);
+
+write(pass, $1);
diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge06_FloatTiny_150x600.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge06_FloatTiny_150x600.dml
new file mode 100644
index 00000000000..21f93a36adc
--- /dev/null
+++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge06_FloatTiny_150x600.dml
@@ -0,0 +1,33 @@
+# Test-specific focus:
+# Floating-point stability with very small values.
+#
+# Why this matters:
+# Distributed prefix sums can amplify floating-point error if
+# offsets are applied incorrectly or in the wrong order.
+#
+# Matrix construction:
+# Values on the order of 1e-6 with mixed signs.
+#
+# Expected behavior:
+# Numerical error remains within tight tolerance (1e-10).
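+#
+# Output:
+# Writes TRUE/FALSE to $1
+#
+# NOTE:
+# - Do NOT call setExecMode() here. Exec mode is controlled by CLI: -exec spark
+# - Rough error budget: |X[i,j]| stays below about 6e-4 and each row has at most 600
+#   terms, so partial sums stay below roughly 0.4 in magnitude; in double precision the
+#   accumulated rounding error is then many orders of magnitude below the 1e-10
+#   tolerance, provided each block offset is applied exactly once and in order.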
+
+n=150; m=600;
+ri = matrix(seq(1,n), n, 1);
+cj = matrix(seq(1,m), 1, m);
+
+# X[i,j] = 1e-6 * (i - j)
+X = 1e-6 * ( (ri %*% matrix(1,1,m)) - (matrix(1,n,1) %*% cj) );
+
+# Row-wise cumulative sum (native builtin, lower-case name)
+Y = rowcumsum(X);
+
+E = matrix(0, n, m);
+E[,1] = X[,1];
+for(j in 2:m){
+  E[,j] = E[,j-1] + X[,j];
+}
+
+err = max(abs(Y - E));
+pass = (err < 1e-10);  # slightly stricter because values are tiny
+write(pass, $1);
\ No newline at end of file
diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge07_BlockCarry_64x1024.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge07_BlockCarry_64x1024.dml
new file mode 100644
index 00000000000..42fd0c9e461
--- /dev/null
+++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge07_BlockCarry_64x1024.dml
@@ -0,0 +1,45 @@
+# Test-specific focus:
+# Stress test for carry propagation across many column blocks.
+#
+# Why this matters:
+# Spark row-wise prefix sums commonly break at block boundaries when
+# carries are not propagated correctly. This test uses a very simple
+# data pattern with a closed-form expected result, so any block-boundary
+# error shows up immediately.
+#
+# Matrix construction:
+# X[i,j] = i (constant across each row)
+#
+# Expected result (closed-form, no loops):
+# E[i,j] = sum_{k=1..j} i = j * i
+#
+# Output:
+# Writes TRUE/FALSE to $1
+#
+# NOTE:
+# - Do NOT call setExecMode() here. That helper exists only in the JUnit harness.
+# - Exec mode is controlled by CLI: -exec spark
+
+n = 64; m = 1024;
+
+# Row index column vector i = 1..n
+ri = matrix(seq(1, n), n, 1);
+
+# Construct X so every row i is constant: X[i,j] = i
+X = ri %*% matrix(1, 1, m);
+
+# Compute row-wise cumulative sum
+Y = rowcumsum(X);
+
+# Column index row vector j = 1..m
+cj = matrix(seq(1, m), 1, m);
+J = matrix(1, n, 1) %*% cj;  # replicate j down the rows
+R = ri %*% matrix(1, 1, m);  # replicate i across columns
+
+# Expected: E[i,j] = j * i
+E = J * R;
+
+err = max(abs(Y - E));
+pass = (err < 1e-9);
+
+write(pass, $1);
diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge08_DeterministicHash_128x256.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge08_DeterministicHash_128x256.dml
new file mode 100644
index 00000000000..b03474e1479
--- /dev/null
+++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge08_DeterministicHash_128x256.dml
@@ -0,0 +1,46 @@
+# Test-specific focus:
+# Pseudo-random but deterministic data pattern.
+#
+# Why this matters:
+# Captures realistic, irregular data distributions while remaining
+# fully reproducible.
+#
+# This test ensures:
+# - No assumptions are made about monotonicity or smoothness
+# - Carry propagation works for arbitrary values
+#
+# Expected result:
+# Computed via trusted sequential loop.
+#
+# Output:
+# Writes TRUE/FALSE to $1
+#
+# NOTE:
+# - Do NOT call setExecMode() here. That helper exists only in the JUnit harness.
+# - Exec mode is controlled by CLI: -exec spark
+
+n = 128; m = 256;
+
+ri = matrix(seq(1, n), n, 1);
+cj = matrix(seq(1, m), 1, m);
+
+# deterministic pseudo-random pattern:
+# X[i,j] = ((i*1103515245 + j*12345) %% 97) - 48
+A = (ri %*% matrix(1, 1, m)) * 1103515245;
+B = (matrix(1, n, 1) %*% cj) * 12345;
+X = ((A + B) %% 97) - 48;
+
+# row-wise cumsum (builtin name is lower-case in DML)
+Y = rowcumsum(X);
+
+# Expected result via sequential loop
+E = matrix(0, n, m);
+E[, 1] = X[, 1];
+for (j in 2:m) {
+  E[, j] = E[, j-1] + X[, j];
+}
+
+err = max(abs(Y - E));
+pass = (err < 1e-9);
+
+write(pass, $1);
diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge09_SpotChecks_10x1000.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge09_SpotChecks_10x1000.dml
new file mode 100644
index 00000000000..3cd29cd3900
--- /dev/null
+++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge09_SpotChecks_10x1000.dml
@@ -0,0 +1,45 @@
+# Test-specific focus:
+# Explicit spot-check validation with known exact values.
+#
+# Why this matters:
+# Provides human-readable correctness checks that can be discussed
+# and verified manually.
+#
+# This test:
+# - Checks global correctness via max error
+# - Verifies specific entries with analytically known results
+#
+# Useful for debugging and supervisor review.
+#
+# Output:
+# Writes TRUE/FALSE to $1
+#
+# NOTE:
+# - Do NOT call setExecMode() here. Exec mode is controlled by CLI: -exec spark
+
+n = 10; m = 1000;
+
+ri = matrix(seq(1, n), n, 1);
+cj = matrix(seq(1, m), 1, m);
+
+# X[i,j] = i + j
+X = ri %*% matrix(1, 1, m) + matrix(1, n, 1) %*% cj;
+
+# Row-wise cumulative sum (builtin is lower-case in DML)
+Y = rowcumsum(X);
+
+# Closed-form expected:
+# E[i,j] = sum_{k=1..j} (i+k) = j*i + j*(j+1)/2
+J = matrix(1, n, 1) %*% cj;
+E = J * (ri %*% matrix(1, 1, m)) + (J * (J + 1)) / 2;
+
+err = max(abs(Y - E));
+
+# Spot checks (exact integers, extracted as scalars)
+s1 = as.scalar(Y[1, 1]);     # should be 1+1 = 2
+s2 = as.scalar(Y[1, 10]);    # sum_{k=1..10}(1+k) = 10*1 + 55 = 65
+s3 = as.scalar(Y[3, 1000]);  # 1000*3 + 1000*1001/2 = 3000 + 500500 = 503500
+
+pass = (err < 1e-9) & (s1 == 2) & (s2 == 65) & (s3 == 503500);
+
+write(pass, $1);
diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge10_Regression_256x256.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge10_Regression_256x256.dml
new file mode 100644
index 00000000000..6687a78c64f
--- /dev/null
+++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge10_Regression_256x256.dml
@@ -0,0 +1,43 @@
+# Test-specific focus:
+# Regression-style test combining multiple patterns.
+#
+# Why this matters:
+# Designed to catch subtle bugs introduced by future refactoring.
+#
+# Matrix characteristics:
+# - Mixed signs
+# - Row-dependent offsets
+# - Moderate size across both dimensions
+#
+# Expected result:
+# Sequential loop reference.
+#
+# Output:
+# Writes TRUE/FALSE to $1
+#
+# NOTE:
+# - Do NOT call setExecMode() here. Exec mode is controlled by CLI: -exec spark
+
+n = 256; m = 256;
+
+# Create structured matrix with mixed sign and magnitude
+ri = matrix(seq(1, n), n, 1);
+cj = matrix(seq(1, m), 1, m);
+
+X = ((ri %*% matrix(1, 1, m)) - (matrix(1, n, 1) %*% cj)) / 10;
+X = X + ((ri %% 7) %*% matrix(1, 1, m));  # add row-dependent offsets
+
+# Row-wise cumulative sum (builtin is lower-case in DML)
+Y = rowcumsum(X);
+
+# Reference via loop
+E = matrix(0, n, m);
+E[, 1] = X[, 1];
+for (j in 2:m) {
+  E[, j] = E[, j-1] + X[, j];
+}
+
+err = max(abs(Y - E));
+pass = (err < 1e-9);
+
+write(pass, $1);
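A quick cross-check that can be run alongside any of these scripts (a sketch, not one of the test files above; it assumes only the existing column-wise cumsum builtin): a row-wise prefix sum must satisfy rowcumsum(X) == t(cumsum(t(X))).

# optional cross-check via the existing column-wise cumsum builtin
Yref = t(cumsum(t(X)));                       # row-wise prefix sum by double transpose
ok = (max(abs(rowcumsum(X) - Yref)) < 1e-9);  # should agree with the loop-based references
print(ok);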