Skip to content

Commit

Permalink
setup cors, improved sse BE and implemented on FE, fixed major deadlo…
Browse files Browse the repository at this point in the history
…ck bug, added static indexes to scraper, added buffers to hub
  • Loading branch information
smeggmann99 committed Nov 17, 2024
1 parent fa1fa5f commit 0618dfe
Show file tree
Hide file tree
Showing 35 changed files with 804 additions and 77 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ go.work.sum
.vscode
app/db
app/dist
build
build
web/prod/basement_floor.jpg
web/prod/first_floor.jpg
web/prod/ground_floor.jpg
web/prod/second_floor.jpg
21 changes: 12 additions & 9 deletions api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,31 @@ import (
"smuggr.xyz/optivum-bsf/common/config"
"smuggr.xyz/optivum-bsf/common/models"

"github.com/gin-contrib/cors"
"github.com/gin-contrib/gzip"
"github.com/gin-gonic/gin"
)

var DefaultRouter *gin.Engine
var Config *config.APIConfig

func Initialize(scheduleChannels *models.ScheduleChannels) (chan error) {
func Initialize(scheduleChannels *models.ScheduleChannels) chan error {
fmt.Println("initializing api/v1")

Config = &config.Global.API
gin.SetMode(os.Getenv("GIN_MODE"))

DefaultRouter = gin.Default()
DefaultRouter.Use(func(c *gin.Context) {
c.Writer.Header().Set("Access-Control-Allow-Origin", "http://localhost:3000")
c.Writer.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
c.Writer.Header().Set("Access-Control-Allow-Headers", "Origin, Content-Type, X-Auth-Token")
c.Next()
})
DefaultRouter.Use(gzip.Gzip(gzip.DefaultCompression))

DefaultRouter.Use(cors.New(cors.Config{
AllowOrigins: []string{"http://localhost:3000, http://localhost:3002, https://zsem.smuggr.xyz/"},
AllowMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
AllowHeaders: []string{"Origin", "Content-Type", "X-Auth-Token"},
ExposeHeaders: []string{"Content-Length"},
AllowCredentials: true,
}))

DefaultRouter.Use(gzip.Gzip(gzip.DefaultCompression))
routes.Initialize(DefaultRouter, scheduleChannels)

errCh := make(chan error)
Expand All @@ -41,4 +44,4 @@ func Initialize(scheduleChannels *models.ScheduleChannels) (chan error) {
}()

return errCh
}
}
8 changes: 4 additions & 4 deletions api/v1/routes/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ package routes
import (
"fmt"

"smuggr.xyz/optivum-bsf/common/models"
"smuggr.xyz/optivum-bsf/api/v1/handlers"
"smuggr.xyz/optivum-bsf/common/models"
"smuggr.xyz/optivum-bsf/core/sse"

"github.com/gin-gonic/gin"
Expand All @@ -17,9 +17,9 @@ func SetupGenericRoutes(router *gin.Engine, rootGroup *gin.RouterGroup, schedule
healthGroup.GET("/ping", handlers.PingHandler)
}

var DivisionsHub = sse.NewHub()
var TeachersHub = sse.NewHub()
var RoomsHub = sse.NewHub()
var DivisionsHub = sse.NewHub(Config.MaxSSEClients)
var TeachersHub = sse.NewHub(Config.MaxSSEClients)
var RoomsHub = sse.NewHub(Config.MaxSSEClients)

go DivisionsHub.Run()
go TeachersHub.Run()
Expand Down
5 changes: 5 additions & 0 deletions api/v1/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package routes
import (
"os"

"smuggr.xyz/optivum-bsf/common/config"
"smuggr.xyz/optivum-bsf/common/models"
"smuggr.xyz/optivum-bsf/api/v1/handlers"

Expand All @@ -13,7 +14,11 @@ import (
"github.com/gin-gonic/gin"
)

var Config config.APIConfig

func Initialize(defaultRouter *gin.Engine, scheduleChannels *models.ScheduleChannels) {
Config = config.Global.API

//defaultLimiter := tollbooth.NewLimiter(0.5, nil)
defaultRouter.Use(static.Serve("/", static.LocalFile(os.Getenv("DIST_PATH"), false)))

Expand Down
6 changes: 6 additions & 0 deletions app/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@
"teacher": 5,
"room": 5
}
},
"static_indexes": {
"divisions": [],
"teachers": [],
"rooms": [43]
}
},
"api": {
"port": 3001,
"max_sse_clients": 100,
"open_weather": {
"base_url": "https://api.openweathermap.org/data/2.5/",
"endpoints": {
Expand Down
10 changes: 8 additions & 2 deletions app/_config → app/test_config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"scraper": {
"base_url": "http://localhost:3000/",
"base_url": "http://localhost:3002/",
"endpoints": {
"division": "o%d.html",
"teacher": "n%d.html",
Expand All @@ -15,15 +15,21 @@
"teacher": 10,
"room": 10
}
},
"static_indexes": {
"divisions": [],
"teachers": [],
"rooms": []
}
},
"api": {
"port": 3001,
"max_sse_clients": 100,
"open_weather": {
"base_url": "https://api.openweathermap.org/data/2.5/",
"endpoints": {
"current_weather": "weather?lat=%f&lon=%f&appid=%s&lang=%s&units=%s",
"forecast_weather": "forecast?lat=%f&lon=%f&appid=%s&lang=%s&units=%s",
"forecast_weather": "forecast?lat=%f&lon=%f&appid=%s&lang=%s&units=%s&cnt=%d",
"current_air_pollution": "air_pollution?lat=%f&lon=%f&appid=%s"
},
"lat": 49.60982192707506,
Expand Down
9 changes: 9 additions & 0 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ func Initialize() error {
viper.AddConfigPath(os.Getenv("CONFIG_PATH"))
viper.SetConfigType(os.Getenv("CONFIG_TYPE"))

configPurpose := os.Getenv("CONFIG_PURPOSE")
if configPurpose == "test" {
viper.SetConfigName("test_config")
} else if configPurpose == "prod" {
viper.SetConfigName("config")
} else {
return fmt.Errorf("invalid CONFIG_PURPOSE: %s", configPurpose)
}

if err := loadConfig(&Global); err != nil {
return err
}
Expand Down
18 changes: 13 additions & 5 deletions common/config/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,17 @@ type scraperQuantities struct {
Workers quantitiesWorkers `mapstructure:"workers"`
}

type scraperStaticIndexes struct {
Divisions []int64 `mapstructure:"divisions"`
Teachers []int64 `mapstructure:"teachers"`
Rooms []int64 `mapstructure:"rooms"`
}

type ScraperConfig struct {
BaseUrl string `mapstructure:"base_url"`
Endpoints scraperEndpoints `mapstructure:"endpoints"`
Quantities scraperQuantities `mapstructure:"quantities"`
BaseUrl string `mapstructure:"base_url"`
Endpoints scraperEndpoints `mapstructure:"endpoints"`
Quantities scraperQuantities `mapstructure:"quantities"`
StaticIndexes scraperStaticIndexes `mapstructure:"static_indexes"`
}

type openWeatherEndpoints struct {
Expand All @@ -40,8 +47,9 @@ type openWeatherConfig struct {
}

type APIConfig struct {
Port int16 `mapstructure:"port"`
OpenWeather openWeatherConfig `mapstructure:"open_weather"`
Port int16 `mapstructure:"port"`
OpenWeather openWeatherConfig `mapstructure:"open_weather"`
MaxSSEClients int16 `mapstructure:"max_sse_clients"`
}

type GlobalConfig struct {
Expand Down
16 changes: 12 additions & 4 deletions core/hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,21 @@ func (h *Hub) worker(id int64) {
select {
case o := <-h.tasksCh:
fmt.Printf("worker %d: processing observer with URL: %s\n", id, o.URL)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)

// It's safe to defer cancel here because it's called before the next iteration
// However, to avoid multiple defers in a loop, I'll call cancel manually
changed := o.CompareHashWithClient(ctx, h.client)
cancel()

if changed {
go o.Callback()
if o.Callback != nil {
go o.Callback()
} else {
fmt.Printf("worker %d: no callback for observer with URL: %s\n", id, o.URL)
}
}

case <-h.quitCh:
fmt.Printf("stopping worker of id %d\n", id)
return
Expand Down Expand Up @@ -164,7 +168,11 @@ func (h *Hub) scheduler() {
o.Mu.Lock()
if o.NextRun.Before(now) || o.NextRun.Equal(now) {
o.NextRun = now.Add(o.Interval)
h.tasksCh <- o
select {
case h.tasksCh <- o:
case <-time.After(10 * time.Second):
fmt.Printf("timed out sending observer with URL: %s to tasksCh\n", o.URL)
}
}
o.Mu.Unlock()
}
Expand Down
27 changes: 20 additions & 7 deletions core/scraper/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,26 @@ func waitForFirstRefresh() {
roomObservers := len(RoomsScraperResource.Hub.GetAllObservers(true))

totalObservers := divisionObservers + teacherObservers + roomObservers
wg.Add(totalObservers)

if totalObservers > 0 {
wg.Add(totalObservers)
} else {
fmt.Println("no observers to wait for")
return
}

waitForRefresh := func(ch <-chan int64, count int) {
for i := 0; i <= count; i++ {
fmt.Println("waiting for refresh:", count)
for i := 0; i < count; i++ {
go func() {
<-ch
wg.Done()
select {
case <-ch:
wg.Done()
case <-time.After(20 * time.Second):
fmt.Println("timed out waiting for refresh")
wg.Done()
}
}()
}
}
Expand All @@ -56,7 +69,7 @@ func waitForFirstRefresh() {
close(done)
}()

// Some observers may not refresh so we
// Some observers might not refresh so we
// need to wait for a certain amount of time
select {
case <-done:
Expand Down Expand Up @@ -352,7 +365,7 @@ func newDivisionObserver(index int64, refreshChan *chan int64) *observer.Observe
}

url := fmt.Sprintf(Config.BaseUrl + Config.Endpoints.Division, index)
interval := time.Duration((index + 1) / 10 + 15) * time.Second
interval := time.Duration((index + 1) / 10 + 5) * time.Second

return observer.NewObserver(index, url, interval, extractFunc, callbackFunc)
}
Expand Down Expand Up @@ -389,7 +402,7 @@ func newTeacherObserver(index int64, refreshChan *chan int64) *observer.Observer
}

url := fmt.Sprintf(Config.BaseUrl + Config.Endpoints.Teacher, index)
interval := time.Duration((index + 1) / 10 + 15) * time.Second
interval := time.Duration((index + 1) / 10 + 5) * time.Second

return observer.NewObserver(index, url, interval, extractFunc, callbackFunc)
}
Expand Down Expand Up @@ -428,7 +441,7 @@ func newRoomObserver(index int64, refreshChan *chan int64) *observer.Observer {
}

url := fmt.Sprintf(Config.BaseUrl + Config.Endpoints.Room, index)
interval := time.Duration((index + 1) / 10 + 15) * time.Second
interval := time.Duration((index + 1) / 10 + 5) * time.Second

return observer.NewObserver(index, url, interval, extractFunc, callbackFunc)
}
4 changes: 4 additions & 0 deletions core/scraper/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,8 @@ func ScrapeDivisionsIndexes() ([]int64, error) {
}

indexes := []int64{}
indexes = append(indexes, Config.StaticIndexes.Divisions...)

doc.Find("a").Each(func(index int, element *goquery.Selection) {
href, exists := element.Attr("href")
if exists {
Expand Down Expand Up @@ -408,6 +410,7 @@ func ScrapeTeachersIndexes() ([]int64, error) {
}

indexes := []int64{}
indexes = append(indexes, Config.StaticIndexes.Teachers...)

doc.Find("a").Each(func(index int, element *goquery.Selection) {
href, exists := element.Attr("href")
Expand Down Expand Up @@ -441,6 +444,7 @@ func ScrapeRoomsIndexes() ([]int64, error) {
}

indexes := []int64{}
indexes = append(indexes, Config.StaticIndexes.Rooms...)

doc.Find("a").Each(func(index int, element *goquery.Selection) {
href, exists := element.Attr("href")
Expand Down
Loading

0 comments on commit 0618dfe

Please sign in to comment.