Skip to content

Commit

Permalink
Merge pull request #42634 from warunalakshitha/fix_perf_issue_master
Browse files Browse the repository at this point in the history
[master] Use daemon strand for non strand threads
  • Loading branch information
warunalakshitha authored Apr 29, 2024
2 parents de17421 + 5c7248c commit 44d9cdf
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@
package io.ballerina.runtime.internal;

import io.ballerina.runtime.api.Module;
import io.ballerina.runtime.api.async.StrandMetadata;
import io.ballerina.runtime.api.creators.ErrorCreator;
import io.ballerina.runtime.api.creators.TypeCreator;
import io.ballerina.runtime.api.flags.SymbolFlags;
import io.ballerina.runtime.api.types.Field;
import io.ballerina.runtime.api.types.FunctionType;
import io.ballerina.runtime.api.types.Type;
import io.ballerina.runtime.api.utils.StringUtils;
import io.ballerina.runtime.api.utils.TypeUtils;
Expand All @@ -35,12 +32,10 @@
import io.ballerina.runtime.api.values.BTypedesc;
import io.ballerina.runtime.api.values.BValue;
import io.ballerina.runtime.api.values.BXml;
import io.ballerina.runtime.internal.scheduling.AsyncFunctionCallback;
import io.ballerina.runtime.internal.scheduling.Scheduler;
import io.ballerina.runtime.internal.scheduling.State;
import io.ballerina.runtime.internal.scheduling.Strand;
import io.ballerina.runtime.internal.types.BRecordType;
import io.ballerina.runtime.internal.values.FutureValue;
import io.ballerina.runtime.internal.values.MapValue;
import io.ballerina.runtime.internal.values.MapValueImpl;
import io.ballerina.runtime.internal.values.TypedescValueImpl;
Expand All @@ -52,11 +47,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static io.ballerina.runtime.api.values.BError.ERROR_PRINT_PREFIX;

/**
* Class @{@link ValueUtils} provides utils to create Ballerina Values.
Expand Down Expand Up @@ -148,19 +138,12 @@ private static BMap<BString, Object> populateRecordDefaultValues(
BMap<BString, Object> recordValue, Map<String, BFunctionPointer<Object, ?>> defaultValues) {
Strand strand = Scheduler.getStrandNoException();
if (strand == null) {
try {
final CountDownLatch latch = new CountDownLatch(defaultValues.size());
populateInitialValuesWithNoStrand(recordValue, latch, defaultValues);
latch.await();
} catch (InterruptedException e) {
throw ErrorCreator.createError(
StringUtils.fromString("error occurred when populating default values"), e);
}
} else {
for (Map.Entry<String, BFunctionPointer<Object, ?>> field : defaultValues.entrySet()) {
recordValue.populateInitialValue(StringUtils.fromString(field.getKey()),
field.getValue().call(new Object[]{strand}));
}
// Create a dummy strand only for keep frames.
strand = new Strand();
}
for (Map.Entry<String, BFunctionPointer<Object, ?>> field : defaultValues.entrySet()) {
recordValue.populateInitialValue(StringUtils.fromString(field.getKey()),
field.getValue().call(new Object[]{strand}));
}
return recordValue;
}
Expand All @@ -185,76 +168,6 @@ private static BMap<BString, Object> populateRecordDefaultValues(
return result;
}

private static void populateInitialValuesWithNoStrand(BMap<BString, Object> recordValue, CountDownLatch latch,
Map<String, BFunctionPointer<Object, ?>> defaultValues) {
String[] fields = defaultValues.keySet().toArray(new String[0]);
invokeFPAsyncIterativelyWithNoStrand(recordValue, defaultValues, fields, "default",
Scheduler.getDaemonStrand().getMetadata(), defaultValues.size(), o -> {
}, Scheduler.getDaemonStrand().scheduler, latch);
}

public static void invokeFPAsyncIterativelyWithNoStrand(BMap<BString, Object> recordValue,
Map<String, BFunctionPointer<Object, ?>> defaultValues,
String[] fields, String strandName, StrandMetadata metadata,
int noOfIterations, Consumer<Object> futureResultConsumer,
Scheduler scheduler, CountDownLatch latch) {
if (noOfIterations <= 0) {
return;
}
AtomicInteger callCount = new AtomicInteger(0);
scheduleNextFunction(recordValue, defaultValues, fields, strandName, metadata, noOfIterations, callCount,
futureResultConsumer, scheduler, latch);
}

private static void scheduleNextFunction(BMap<BString, Object> recordValue,
Map<String, BFunctionPointer<Object, ?>> defaultValues, String[] fields,
String strandName, StrandMetadata metadata, int noOfIterations,
AtomicInteger callCount, Consumer<Object> futureResultConsumer,
Scheduler scheduler, CountDownLatch latch) {
BFunctionPointer<?, ?> func = defaultValues.get(fields[callCount.get()]);
Type retType = ((FunctionType) TypeUtils.getImpliedType(func.getType())).getReturnType();
FutureValue future = scheduler.createFuture(null, null, null, retType, strandName, metadata);
AsyncFunctionCallback callback = new AsyncFunctionCallback(null) {
@Override
public void notifySuccess(Object result) {
futureResultConsumer.accept(getFutureResult());
recordValue.populateInitialValue(StringUtils.fromString(fields[callCount.get()]), result);
int i = callCount.incrementAndGet();
latch.countDown();
if (i != noOfIterations) {
scheduleNextFunction(recordValue, defaultValues, fields, strandName, metadata, noOfIterations,
callCount, futureResultConsumer, scheduler, latch);
}
}

@Override
public void notifyFailure(BError error) {
errStream.println(ERROR_PRINT_PREFIX + error.getPrintableStackTrace());
}
};

invokeFunctionPointerAsync(func, retType, future, callback, scheduler, strandName, metadata);
}

private static void invokeFunctionPointerAsync(BFunctionPointer<?, ?> func, Type returnType, FutureValue future,
AsyncFunctionCallback callback, Scheduler scheduler,
String strandName, StrandMetadata metadata) {
future.callback = callback;
callback.setFuture(future);
AsyncFunctionCallback childCallback = new AsyncFunctionCallback(future.strand) {
@Override
public void notifySuccess(Object result) {
callback.notifySuccess(result);
}

@Override
public void notifyFailure(BError error) {
callback.notifyFailure(error);
}
};
scheduler.scheduleFunction(new Object[1], func, null, returnType, strandName, metadata, childCallback);
}

/**
* Create a record value that populates record fields using the given package ID, record type name and a map of
* field names and associated values for the fields.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ public class Strand {
public BMap<BString, Object> workerReceiveMap = null;
public int channelCount = 0;

public Strand() {
this.id = -1;
this.strandLock = null;
this.name = null;
this.metadata = null;
this.state = RUNNABLE;
}

public Strand(String name, StrandMetadata metadata, Scheduler scheduler, Strand parent,
Map<String, Object> properties) {
this.id = nextStrandId.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,11 @@ public class TypedescValueImpl implements TypedescValue {
public MapValue annotations;
private BTypedesc typedesc;

@Deprecated
public TypedescValueImpl(Type describingType) {
this.type = new BTypedescType(describingType);
this.describingType = describingType;
}

@Deprecated
public TypedescValueImpl(Type describingType, MapValue[] closures) {
this.type = new BTypedescType(describingType);
this.describingType = describingType;
Expand Down

0 comments on commit 44d9cdf

Please sign in to comment.