Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Uxio0 committed Sep 11, 2023
1 parent 8c7b18b commit 57b927a
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 11 deletions.
94 changes: 92 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"amqplib": "^0.10.3",
"axios": "^1.4.0",
"cache-manager": "^5.2.1",
"ethers": "^6.7.1",
"express-formidable": "^1.2.0",
"express-session": "^1.17.3",
"pg": "^8.11.3",
Expand Down
59 changes: 55 additions & 4 deletions src/routes/events/events.controller.spec.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,69 @@
import { Test, TestingModule } from '@nestjs/testing';

Check warning on line 1 in src/routes/events/events.controller.spec.ts

View workflow job for this annotation

GitHub Actions / lint

'TestingModule' is defined but never used
import { EventsController } from './events.controller';
import { EventsService } from './events.service';
import { QueueProvider } from '../../datasources/queue/queue.provider';
import { WebhookService } from '../webhook/webhook.service';
import { firstValueFrom } from 'rxjs';
import { TxServiceEvent, TxServiceEventType } from './event.dto';
import { BadRequestException } from '@nestjs/common';

describe('EventsController', () => {
let controller: EventsController;
let service: EventsService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
const module = await Test.createTestingModule({
controllers: [EventsController],
}).compile();
providers: [EventsService, QueueProvider, WebhookService],
})
.overrideProvider(QueueProvider)
.useValue({})
.overrideProvider(WebhookService)
.useValue({})
.compile();

controller = module.get<EventsController>(EventsController);
service = module.get<EventsService>(EventsService);
});

it('should be defined', () => {
expect(controller).toBeDefined();
describe('SSE events', () => {
it('should require an EIP55 address', async () => {
const notValidAddress = '0x8618CE407F169ABB1388348A19632AaFA857CCB9';
const expectedError = new BadRequestException('Not valid EIP55 address', {
description: `${notValidAddress} is not a valid EIP55 Safe address`,
});
expect(() => {
controller.sse(notValidAddress);
}).toThrow(expectedError);
});
it('should receive for a Safe', async () => {
const relevantSafeAddress = '0x8618ce407F169ABB1388348A19632AaFA857CCB9';
const notRelevantSafeAddress =
'0x3F6E283068Ded118459B56fC669A27C3a65e587D';
const txServiceEvents: Array<TxServiceEvent> = [
{
chainId: '1',
type: 'SAFE_CREATED' as TxServiceEventType,
hero: 'Saitama',
address: notRelevantSafeAddress,
},
{
chainId: '100',
type: 'MESSAGE_CREATED' as TxServiceEventType,
hero: 'Atomic Samurai',
address: relevantSafeAddress,
},
];
const observable = controller.sse(relevantSafeAddress);
const firstValue = firstValueFrom(observable);
txServiceEvents.map((txServiceEvent) =>
service.pushEventToEventsObservable(txServiceEvent),
);

// Not relevant event must be ignored by Safe filter
const event = await firstValue;
expect(event.data).toEqual(txServiceEvents[1]);
expect(event.type).toEqual(txServiceEvents[1].type);
});
});
});
12 changes: 9 additions & 3 deletions src/routes/events/events.controller.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
import { Controller, Param, Sse } from '@nestjs/common';
import { BadRequestException, Controller, Param, Sse } from '@nestjs/common';
import { Observable } from 'rxjs';
import { EventsService } from './events.service';
import { getAddress, isAddress } from 'ethers';

@Controller('events')
export class EventsController {
constructor(private eventsService: EventsService) {}
constructor(private readonly eventsService: EventsService) {}

@Sse('/sse/:safe')
sse(@Param('safe') safe: string): Observable<MessageEvent> {
return interval(1000).pipe(map((_) => ({ data: { hello: 'world' } })));
if (isAddress(safe) && getAddress(safe) === safe)
return this.eventsService.getEventsObservableForSafe(safe);

throw new BadRequestException('Not valid EIP55 address', {
description: `${safe} is not a valid EIP55 Safe address`,
});
}
}
5 changes: 3 additions & 2 deletions src/routes/events/events.module.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Module } from '@nestjs/common';
import { EventsController } from './events.controller';
import { EventsService } from './events.service';
import { WebhookModule } from '../webhook/webhook.module';
import { QueueModule } from '../../datasources/queue/queue.module';
import { WebhookModule } from '../webhook/webhook.module';

@Module({
imports: [QueueModule, WebhookModule],
// controllers: [Controller],
controllers: [EventsController],
providers: [EventsService],
})
export class EventsModule {}
40 changes: 40 additions & 0 deletions src/routes/events/events.service.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Observable, Subject, filter } from 'rxjs';
import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common';
import { WebhookService } from '../webhook/webhook.service';
import { QueueProvider } from '../../datasources/queue/queue.provider';
Expand All @@ -7,6 +8,7 @@ import { TxServiceEvent } from './event.dto';
@Injectable()
export class EventsService implements OnApplicationBootstrap {
private readonly logger = new Logger(EventsService.name);
private eventsSubject = new Subject<MessageEvent<TxServiceEvent>>();

constructor(
private readonly queueProvider: QueueProvider,
Expand All @@ -24,6 +26,43 @@ export class EventsService implements OnApplicationBootstrap {
);
}

/**
*
* @param safe
* @returns Events rx.js observable used by the Server Side Events endpoint
*/
getEventsObservableForSafe(
safe: string,
): Observable<MessageEvent<TxServiceEvent>> {
return this.eventsSubject.pipe(filter((ev) => ev.data.address === safe));
}

/**
* Push txServiceEvent to the events observable (used by the Server Side Events endpoint)
* @param txServiceEvent
* @returns Crafted MessageEvent from txServiceEvent
*/
pushEventToEventsObservable(
txServiceEvent: TxServiceEvent,
): MessageEvent<TxServiceEvent> {
const messageEvent: MessageEvent<TxServiceEvent> = new MessageEvent(
txServiceEvent.type,
{
data: txServiceEvent,
},
);
this.eventsSubject.next(messageEvent);
return messageEvent;
}

/**
* Complete event observable
* This function is useful for testing purposes
*/
completeEventsObservable() {
return this.eventsSubject.complete();
}

/**
*
* Event must have at least a `chainId` and `type`
Expand Down Expand Up @@ -55,6 +94,7 @@ export class EventsService implements OnApplicationBootstrap {
return Promise.resolve([undefined]);
}

this.pushEventToEventsObservable(txServiceEvent);
return this.webhookService.postEveryWebhook(txServiceEvent);
}
}
28 changes: 28 additions & 0 deletions test/app.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { INestApplication } from '@nestjs/common';
import * as request from 'supertest';
import { AppModule } from './../src/app.module';
import { QueueProvider } from '../src/datasources/queue/queue.provider';
import { EventsService } from '../src/routes/events/events.service';

/* eslint-disable */
const { version } = require('../package.json');
Expand All @@ -11,12 +12,14 @@ const { version } = require('../package.json');
describe('AppController (e2e)', () => {
let app: INestApplication;
let queueProvider: QueueProvider;
let eventsService: EventsService;

beforeEach(async () => {
const moduleFixture: TestingModule = await Test.createTestingModule({
imports: [AppModule],
}).compile();

eventsService = moduleFixture.get<EventsService>(EventsService);
queueProvider = moduleFixture.get<QueueProvider>(QueueProvider);
app = moduleFixture.createNestApplication();
await app.init();
Expand All @@ -34,4 +37,29 @@ describe('AppController (e2e)', () => {
.expect(200)
.expect(expected);
});

describe('/events/sse/:safe (GET)', () => {
it('should subscribe to server side events', () => {
const validSafeAddress = '0x8618ce407F169ABB1388348A19632AaFA857CCB9';
const url = `/events/sse/${validSafeAddress}`;
const expected = {};

const result = request(app.getHttpServer())
.get(url)
.expect(200)
.expect(expected);
eventsService.completeEventsObservable();
return result;
});
it('should return a 400 if safe address is not EIP55 valid', () => {
const notValidAddress = '0x8618CE407F169ABB1388348A19632AaFA857CCB9';
const url = `/events/sse/${notValidAddress}`;
const expected = {
statusCode: 400,
message: 'Not valid EIP55 address',
error: `${notValidAddress} is not a valid EIP55 Safe address`,
};
return request(app.getHttpServer()).get(url).expect(400).expect(expected);
});
});
});

0 comments on commit 57b927a

Please sign in to comment.