-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
184 lines (162 loc) · 5.1 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package main
import (
"context"
"fmt"
"github.com/cploutarchou/go-kafka-rest/hub"
"github.com/gofiber/websocket/v2"
"log"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/cploutarchou/go-kafka-rest/controllers"
"github.com/cploutarchou/go-kafka-rest/initializers"
middlewares "github.com/cploutarchou/go-kafka-rest/middleware"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors"
"github.com/gofiber/fiber/v2/middleware/logger"
)
// Package-level state shared between setupApp, setupRoutes and main.
// All of these are nil/empty until setupApp (or main, for app) assigns them.
var (
// controller is built in setupApp and handed to setupRoutes by main.
controller *controllers.Controller
// app is the root fiber application; main assigns it from setupApp's result.
app *fiber.App
// middleware is built in setupApp; setupRoutes uses its DeserializeUser handler.
middleware *middlewares.Middleware
// config is loaded in setupApp via initializers.LoadConfig.
config *initializers.Config
// brokers is derived from config.KafkaBrokers in setupApp and is passed to
// the controller and, in setupRoutes, to hub.NewHub.
brokers []string
)
// setupApp loads the configuration, connects to the database, builds the
// shared middleware and controller, and returns a fiber app with the request
// logger, CORS policy and timing/error middleware installed.
//
// As a side effect it assigns the package-level config, brokers, middleware
// and controller variables. It returns an error when the configuration cannot
// be loaded or when no Kafka broker address is configured.
func setupApp() (*fiber.App, error) {
	var err error

	// Load the configuration; the path "." is passed to LoadConfig unchanged.
	config, err = initializers.LoadConfig(".")
	if err != nil {
		return nil, fmt.Errorf("failed to load environment variables: %w", err)
	}

	// Validate the broker list before anything is built from it, so that a
	// missing setting fails fast instead of producing a controller wired to
	// an empty address. Entries are trimmed and blanks dropped so values such
	// as "host1:9092, host2:9092" or a trailing comma are handled.
	rawBrokers := strings.Split(config.KafkaBrokers, ",")
	brokerList := make([]string, 0, len(rawBrokers))
	for _, b := range rawBrokers {
		if b = strings.TrimSpace(b); b != "" {
			brokerList = append(brokerList, b)
		}
	}
	if len(brokerList) == 0 {
		return nil, fmt.Errorf("KAFKA_BROKER environment variable is not set")
	}
	brokers = brokerList

	// Connect to the database.
	initializers.ConnectDB(config)
	db := initializers.GetDB()

	// Initialize the middleware and controllers.
	middleware = middlewares.NewMiddleware(config, db)
	controller = controllers.NewController(db, brokers, int32(config.KafkaNumOfPartitions))

	// Named server (not app) to avoid shadowing the package-level app.
	server := fiber.New()

	// Request logging.
	server.Use(logger.New())

	// CORS policy built from the configured origins.
	server.Use(cors.New(cors.Config{
		AllowOrigins:     strings.Join(config.CorsAllowedOrigins, ","),
		AllowHeaders:     "Origin, Content-Type, Accept",
		AllowMethods:     "GET, POST",
		AllowCredentials: true,
	}))

	// Timing middleware: runs the rest of the chain, turns any returned error
	// into a JSON failure response, and logs method, path and duration for
	// requests that completed without error.
	server.Use(func(c *fiber.Ctx) error {
		start := time.Now()
		if err := c.Next(); err != nil {
			// Default to 500 unless the handler returned a *fiber.Error
			// carrying its own status code.
			code := fiber.StatusInternalServerError
			if e, ok := err.(*fiber.Error); ok {
				code = e.Code
			}
			return c.Status(code).JSON(fiber.Map{
				"status":  "fail",
				"message": err.Error(),
			})
		}
		log.Printf("[%s] %s", c.Method(), c.Path())
		log.Printf("execution time: %v", time.Since(start))
		return nil
	})

	return server, nil
}
// setupRoutes builds a fiber app containing every API route: the health
// check, the /auth endpoints, the /kafka endpoints, /users/me and a JSON 404
// fallback. The routes use the handlers on the given controller; protected
// routes are guarded by the package-level middleware's DeserializeUser.
//
// It reads the package-level config, middleware and brokers, so setupApp must
// have run first. When config.EnableWebsocket is true it also starts the hub
// in a background goroutine and exposes it on /kafka/ws.
func setupRoutes(controller *controllers.Controller) *fiber.App {
	api := fiber.New()

	// Health check endpoint.
	api.Get("/healthchecker", func(c *fiber.Ctx) error {
		return c.Status(fiber.StatusOK).JSON(fiber.Map{
			"status":  "success",
			"message": "JWT Authentication with Golang, Fiber, and GORM",
		})
	})

	// Authentication endpoints.
	api.Route("/auth", func(router fiber.Router) {
		router.Post("/register", controller.User.SignUpUser)
		router.Post("/login", controller.User.SignInUser)
		router.Get("/logout", middleware.DeserializeUser, controller.User.LogoutUser)
		router.Get("/refresh", middleware.DeserializeUser, controller.User.RefreshToken)
	})

	// Optional websocket hub. wsHandler stays nil when websockets are disabled.
	var wsHandler fiber.Handler
	if config.EnableWebsocket {
		hubLogger := log.New(os.Stdout, "logger: ", log.Lshortfile)
		wsHub := hub.NewHub(brokers, hubLogger, nil)

		// The hub must outlive this function, so its context cannot be
		// cancelled by a defer here: that would hand Run an already
		// cancelled context as soon as setupRoutes returns. Instead the
		// context is cancelled when the process receives SIGINT or SIGTERM,
		// the same signals main waits on before shutting down. The stop
		// function is intentionally not called; the signal registration is
		// meant to last until the process exits.
		hubCtx, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
		go wsHub.Run(hubCtx, hubLogger)

		wsHandler = websocket.New(func(c *websocket.Conn) {
			wsHub.UpgradeWebSocket(c, hubLogger)
		})
	}

	// Kafka endpoints: send-message is always available, ws only with the hub.
	api.Route("/kafka", func(router fiber.Router) {
		router.Post("/send-message", middleware.DeserializeUser, controller.User.SendMessage)
		if wsHandler != nil {
			router.Get("/ws", middleware.DeserializeUser, wsHandler)
		}
	})

	// User details endpoint.
	api.Get("/users/me", middleware.DeserializeUser, controller.User.GetMe)

	// Fallback route: any unmatched path gets a JSON 404.
	api.All("*", func(c *fiber.Ctx) error {
		return c.Status(fiber.StatusNotFound).JSON(fiber.Map{
			"status":  "fail",
			"message": fmt.Sprintf("path: %v does not exist on this server", c.Path()),
		})
	})

	return api
}
// main builds the app via setupApp, mounts the routes from setupRoutes under
// /api, starts listening on $PORT (default 8045) in a background goroutine,
// and then blocks until SIGINT or SIGTERM arrives, at which point it shuts
// the server down.
func main() {
	// Assign the package-level app rather than declaring a new local.
	var err error
	app, err = setupApp()
	if err != nil {
		// log.Fatalf exits the process, so no return is needed after it.
		log.Fatalf("failed to setup app: %v", err)
	}

	// Mount the routes on /api.
	app.Mount("/api", setupRoutes(controller))

	// Get the port from the environment variable or use the default port.
	port := os.Getenv("PORT")
	if port == "" {
		port = "8045"
	}

	// Start the server in the background so main can wait for signals.
	go func() {
		if err := app.Listen(":" + port); err != nil && err != http.ErrServerClosed {
			log.Fatalf("server listen: %s", err)
		}
	}()

	// Wait for a shutdown signal. The channel is buffered so a signal that
	// arrives before the receive below is not dropped.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit

	// Shut the server down gracefully.
	log.Print("shutting down server...")
	if err := app.Shutdown(); err != nil {
		// Fatalf rather than Fatal: Print-style formatting would glue the
		// error directly onto the label with no separating space.
		log.Fatalf("server shutdown: %v", err)
	}
	log.Print("server exited")
}