Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH#323: Fix syslogStream tests #423

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
<teragrep.pth_03.version>9.2.0</teragrep.pth_03.version>
<teragrep.pth_06.version>3.3.2</teragrep.pth_06.version>
<teragrep.rlp_01.version>4.0.1</teragrep.rlp_01.version>
<teragrep.rlp_03.version>1.7.6</teragrep.rlp_03.version>
<teragrep.rlp_03.version>9.0.0</teragrep.rlp_03.version>
</properties>
<dependencies>
<!--DPL-dependencies -->
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/teragrep/pth10/ast/StepList.java
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,12 @@ private void analyze() {
}
}

if (step.hasProperty(AbstractStep.CommandProperty.NO_PRECEDING_AGGREGATE)) {
if (aggregateCount > 0) {
throw new RuntimeException("Step '" + step + "' cannot be used after aggregations!");
}
}

if (step.hasProperty(AbstractStep.CommandProperty.SEQUENTIAL_ONLY)) {
LOGGER.info("[Analyze] Sequential only command: <{}>", step);
// set the breakpoint just once
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/teragrep/pth10/steps/AbstractStep.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public enum CommandProperty {
IGNORE_DEFAULT_SORTING, // Command applies a certain order to the rows
SEQUENTIAL_ONLY, // Works only in Sequential mode (forEachBatch)
AGGREGATE, // If there are multiple aggregate commands, switch to sequential mode is necessary
REQUIRE_PRECEDING_AGGREGATE // this command requires an aggregate command before it
REQUIRE_PRECEDING_AGGREGATE, // this command requires an aggregate command before it
NO_PRECEDING_AGGREGATE // command does not allow an aggregate command before it
}

protected final Set<CommandProperty> properties = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class TeragrepSyslogStep extends AbstractStep {
public TeragrepSyslogStep(String relpHost, int relpPort) {
this.relpHost = relpHost;
this.relpPort = relpPort;
this.properties.add(CommandProperty.NO_PRECEDING_AGGREGATE);
}

@Override
Expand Down
142 changes: 98 additions & 44 deletions src/test/java/com/teragrep/pth10/SyslogStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,15 @@
*/
package com.teragrep.pth10;

import com.teragrep.rlp_03.Server;
import com.teragrep.rlp_03.SyslogFrameProcessor;
import org.apache.spark.sql.streaming.StreamingQueryException;
import com.teragrep.net_01.channel.socket.PlainFactory;
import com.teragrep.net_01.eventloop.EventLoop;
import com.teragrep.net_01.eventloop.EventLoopFactory;
import com.teragrep.net_01.server.Server;
import com.teragrep.net_01.server.ServerFactory;
import com.teragrep.rlp_03.frame.FrameDelegationClockFactory;
import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate;
import com.teragrep.rlp_03.frame.delegate.FrameContext;
import com.teragrep.rlp_03.frame.delegate.FrameDelegate;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
Expand All @@ -57,9 +63,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* Tests for | teragrep exec syslog stream Uses streaming datasets
Expand All @@ -71,6 +83,14 @@ public class SyslogStreamTest {

private static final Logger LOGGER = LoggerFactory.getLogger(SyslogStreamTest.class);

private final List<String> messages = new ArrayList<>();
private final int listenPort = 9999;

private Server server;
private EventLoop eventLoop;
private Thread eventLoopThread;
private ExecutorService executorService;

private final String testFile = "src/test/resources/regexTransformationTest_data*.jsonl"; // * to make the path into a directory path
private final StructType testSchema = new StructType(new StructField[] {
new StructField("_time", DataTypes.TimestampType, false, new MetadataBuilder().build()),
Expand All @@ -95,74 +115,108 @@ void setEnv() {
@BeforeEach
void setUp() {
this.streamingTestUtil.setUp();
messages.clear();
serverSetup();
}

@AfterEach
void tearDown() {
this.streamingTestUtil.tearDown();
eventLoop.stop();
Assertions.assertDoesNotThrow(() -> eventLoopThread.join());
executorService.shutdown();
Assertions.assertDoesNotThrow(() -> server.close());
}

private void serverSetup() {
executorService = Executors.newFixedThreadPool(1);

Consumer<FrameContext> syslogConsumer = new Consumer<FrameContext>() {

// NOTE: synchronized because frameDelegateSupplier returns this instance for all the parallel connections
@Override
public synchronized void accept(FrameContext frameContext) {
messages.add(frameContext.relpFrame().payload().toString());
}
};

/*
* New instance of the frameDelegate is provided for every connection
*/
Supplier<FrameDelegate> frameDelegateSupplier = () -> new DefaultFrameDelegate(syslogConsumer);

/*
* EventLoop is used to notice any events from the connections
*/
EventLoopFactory eventLoopFactory = new EventLoopFactory();
try {
eventLoop = eventLoopFactory.create();
}
catch (IOException e) {
throw new RuntimeException(e);
}

eventLoopThread = new Thread(eventLoop);
/*
* eventLoopThread must run, otherwise nothing will be processed
*/
eventLoopThread.start();

/*
* ServerFactory is used to create server instances
*/
ServerFactory serverFactory = new ServerFactory(
eventLoop,
executorService,
new PlainFactory(),
new FrameDelegationClockFactory(frameDelegateSupplier)
);

try {
server = serverFactory.create(listenPort);
System.out.println("server started at port <" + listenPort + ">");
}
catch (IOException ioException) {
throw new UncheckedIOException(ioException);
}
}

// ----------------------------------------
// Tests
// ----------------------------------------

@Disabled(value = "RLP-03 has to be updated") /* FIXME: Update rlp_03 library to work with new rlp_01 version! */
@Test
@DisabledIfSystemProperty(
named = "skipSparkTest",
matches = "true"
) // teragrep exec syslog stream
public void syslogStreamSendingTest() {
final int expectedSyslogs = 10;
AtomicInteger numberOfSyslogMessagesSent = new AtomicInteger();
AtomicReferenceArray<String> arrayOfSyslogs = new AtomicReferenceArray<>(expectedSyslogs);

final Consumer<byte[]> cbFunction = (message) -> {
LOGGER.debug("Server received the following syslog message:\n <[{}]>\n-----", new String(message));
Assertions.assertTrue(numberOfSyslogMessagesSent.get() <= expectedSyslogs);
arrayOfSyslogs.set(numberOfSyslogMessagesSent.getAndIncrement(), new String(message));
};

final int port = 9999;
final Server server = new Server(port, new SyslogFrameProcessor(cbFunction));
Assertions.assertDoesNotThrow(server::start);

streamingTestUtil
.performDPLTest(
"index=index_A | teragrep exec syslog stream host 127.0.0.1 port " + port, testFile, ds -> {
LOGGER.debug("Syslog msgs = <{}>", numberOfSyslogMessagesSent.get());
Assertions.assertEquals(expectedSyslogs, numberOfSyslogMessagesSent.get());

for (int i = 0; i < expectedSyslogs; i++) {
String s = arrayOfSyslogs.get(i);
for (int j = 0; j < expectedSyslogs; j++) {
if (i == j)
continue;
Assertions.assertFalse(arrayOfSyslogs.compareAndSet(j, s, s));
}

}
Assertions.assertAll("stop server", server::stop);
"index=index_A | teragrep exec syslog stream host 127.0.0.1 port " + listenPort, testFile,
ds -> {
LOGGER.debug("Syslog msgs = <{}>", messages.size());
Assertions.assertEquals(10, messages.size());
Assertions.assertEquals(10, new HashSet<>(messages).size());
}
);
}

@Disabled(value = "RLP-03 has to be updated") // FIXME: update rlp_03
@Test
@DisabledIfSystemProperty(
named = "skipSparkTest",
matches = "true"
) // teragrep exec syslog stream, with preceding aggregation command
public void syslogStreamSendingFailureTest() {
Assertions
.assertThrows(
StreamingQueryException.class,
() -> streamingTestUtil
.performDPLTest(
"index=index_A | stats count(_raw) as craw | teragrep exec syslog stream host 127.0.0.1 port 9998",
testFile, ds -> {
}
)
RuntimeException rte = streamingTestUtil
.performThrowingDPLTest(
RuntimeException.class,
"index=index_A | stats count(_raw) as craw | teragrep exec syslog stream host 127.1.0.1 port 9998",
testFile, ds -> {
}
);

Assertions.assertNotNull(rte);
Assertions.assertTrue(rte.getMessage().contains("cannot be used after aggregations!"));
51-code marked this conversation as resolved.
Show resolved Hide resolved
}
}