Skip to content

Commit

Permalink
Merge pull request #42578 from HindujaB/2201.9.x-fix-strands
Browse files Browse the repository at this point in the history
Strand fix test PR
  • Loading branch information
TharmiganK authored Apr 17, 2024
2 parents 48f1f43 + e22ba4d commit 45f24f1
Showing 1 changed file with 11 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package io.ballerina.runtime.internal;

import io.ballerina.runtime.api.Module;
import io.ballerina.runtime.api.async.StrandMetadata;
import io.ballerina.runtime.api.creators.ErrorCreator;
import io.ballerina.runtime.api.creators.TypeCreator;
import io.ballerina.runtime.api.flags.SymbolFlags;
Expand Down Expand Up @@ -188,32 +187,24 @@ private static BMap<BString, Object> populateRecordDefaultValues(
private static void populateInitialValuesWithNoStrand(BMap<BString, Object> recordValue, CountDownLatch latch,
Map<String, BFunctionPointer<Object, ?>> defaultValues) {
String[] fields = defaultValues.keySet().toArray(new String[0]);
invokeFPAsyncIterativelyWithNoStrand(recordValue, defaultValues, fields, "default",
Scheduler.getDaemonStrand().getMetadata(), defaultValues.size(), o -> {
}, Scheduler.getDaemonStrand().scheduler, latch);
}

public static void invokeFPAsyncIterativelyWithNoStrand(BMap<BString, Object> recordValue,
Map<String, BFunctionPointer<Object, ?>> defaultValues,
String[] fields, String strandName, StrandMetadata metadata,
int noOfIterations, Consumer<Object> futureResultConsumer,
Scheduler scheduler, CountDownLatch latch) {
int noOfIterations = defaultValues.size();
if (noOfIterations <= 0) {
return;
}
AtomicInteger callCount = new AtomicInteger(0);
scheduleNextFunction(recordValue, defaultValues, fields, strandName, metadata, noOfIterations, callCount,
futureResultConsumer, scheduler, latch);
scheduleNextFunction(recordValue, defaultValues, fields, "default", noOfIterations, callCount,
o -> { }, latch, Scheduler.getDaemonStrand());
}

private static void scheduleNextFunction(BMap<BString, Object> recordValue,
Map<String, BFunctionPointer<Object, ?>> defaultValues, String[] fields,
String strandName, StrandMetadata metadata, int noOfIterations,
AtomicInteger callCount, Consumer<Object> futureResultConsumer,
Scheduler scheduler, CountDownLatch latch) {
String strandName, int noOfIterations, AtomicInteger callCount,
Consumer<Object> futureResultConsumer, CountDownLatch latch,
Strand parent) {
BFunctionPointer<?, ?> func = defaultValues.get(fields[callCount.get()]);
Type retType = ((FunctionType) TypeUtils.getImpliedType(func.getType())).getReturnType();
FutureValue future = scheduler.createFuture(null, null, null, retType, strandName, metadata);
FutureValue future = parent.scheduler.createFuture(Scheduler.getDaemonStrand(), null, null, retType,
strandName, parent.getMetadata());
AsyncFunctionCallback callback = new AsyncFunctionCallback(null) {
@Override
public void notifySuccess(Object result) {
Expand All @@ -222,8 +213,8 @@ public void notifySuccess(Object result) {
int i = callCount.incrementAndGet();
latch.countDown();
if (i != noOfIterations) {
scheduleNextFunction(recordValue, defaultValues, fields, strandName, metadata, noOfIterations,
callCount, futureResultConsumer, scheduler, latch);
scheduleNextFunction(recordValue, defaultValues, fields, strandName, noOfIterations,
callCount, futureResultConsumer, latch, parent);
}
}

Expand All @@ -232,27 +223,9 @@ public void notifyFailure(BError error) {
errStream.println(ERROR_PRINT_PREFIX + error.getPrintableStackTrace());
}
};

invokeFunctionPointerAsync(func, retType, future, callback, scheduler, strandName, metadata);
}

private static void invokeFunctionPointerAsync(BFunctionPointer<?, ?> func, Type returnType, FutureValue future,
AsyncFunctionCallback callback, Scheduler scheduler,
String strandName, StrandMetadata metadata) {
future.callback = callback;
callback.setFuture(future);
AsyncFunctionCallback childCallback = new AsyncFunctionCallback(future.strand) {
@Override
public void notifySuccess(Object result) {
callback.notifySuccess(result);
}

@Override
public void notifyFailure(BError error) {
callback.notifyFailure(error);
}
};
scheduler.scheduleFunction(new Object[1], func, null, returnType, strandName, metadata, childCallback);
parent.scheduler.scheduleLocal(new Object[1], func, Scheduler.getDaemonStrand(), future);
}

/**
Expand Down

0 comments on commit 45f24f1

Please sign in to comment.