Micro-services Using go-kit: Service Discovery
In an environment where the number of service instances and their network locations change dynamically, a client needs a discovery mechanism to determine the location of a service instance before it can send a request to it.
Overview
Nowadays, a micro-service based application usually runs in a containerized environment. The main reason for this approach is that each service needs to be scaled individually based on its resource needs. Containers make it easy to add instances and to change network locations. However, this dynamism has a consequence for our client code: we have to discover the service instance to which a request will be sent. This leads us to a service discovery mechanism.
Client Side Service Discovery
Basically, service discovery is a mechanism that lets a client find a service without knowing in advance where the service is located. Instead, the client queries a service registry to obtain the locations of the service instances. The client then sends the request to one of those instances.
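To make the flow concrete, here is a minimal sketch of the client-side pattern using only the standard library. The Registry type, its methods, and the addresses are hypothetical stand-ins for a real registry such as consul, not part of go-kit or the consul API.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Registry is a hypothetical in-memory stand-in for a service registry.
type Registry struct {
	mu        sync.RWMutex
	instances map[string][]string // service name -> "host:port" addresses
}

// NewRegistry returns an empty registry.
func NewRegistry() *Registry {
	return &Registry{instances: make(map[string][]string)}
}

// Register adds an instance address under a service name.
func (r *Registry) Register(service, addr string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.instances[service] = append(r.instances[service], addr)
}

// Lookup returns a copy of the known instance addresses for a service.
func (r *Registry) Lookup(service string) ([]string, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	addrs := r.instances[service]
	if len(addrs) == 0 {
		return nil, errors.New("no instances registered for " + service)
	}
	return append([]string(nil), addrs...), nil
}

func main() {
	reg := NewRegistry()
	reg.Register("lorem", "192.168.1.103:7002")
	reg.Register("lorem", "192.168.1.103:7003")

	// The client queries the registry first, then chooses an instance itself.
	addrs, err := reg.Lookup("lorem")
	if err != nil {
		fmt.Println("lookup failed:", err)
		return
	}
	fmt.Println("sending request to", addrs[0])
}
```

The important point is that the choice of instance happens in the client, which is why the client needs its own load-balancing logic.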
Server Side Service Discovery
In this mechanism, rather than querying the service registry itself, the client sends its requests through a router. The router then queries the service registry and forwards each request to a service instance.
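The router role can be sketched with the standard library's reverse proxy. This is only an illustration under simplifying assumptions: newRouter, demo, and the lookup callback are hypothetical names, and the "registry query" is a function that returns the address of an in-process test server.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/http/httputil"
	"net/url"
)

// newRouter returns a handler that resolves a target instance on every
// request and proxies the request to it. lookup stands in for the registry query.
func newRouter(lookup func() (string, error)) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		addr, err := lookup()
		if err != nil {
			http.Error(w, err.Error(), http.StatusServiceUnavailable)
			return
		}
		target, err := url.Parse(addr)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		httputil.NewSingleHostReverseProxy(target).ServeHTTP(w, r)
	})
}

// demo starts one backend and one router in-process, then sends a request
// through the router and returns the response body.
func demo() (string, error) {
	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		io.WriteString(w, "hello from lorem")
	}))
	defer backend.Close()

	router := httptest.NewServer(newRouter(func() (string, error) {
		return backend.URL, nil // a real router would ask the registry here
	}))
	defer router.Close()

	resp, err := http.Get(router.URL + "/lorem")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	body, err := demo()
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	fmt.Println(body)
}
```

Here the client only knows the router's address; the lookup and the instance selection live entirely on the router side.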
Consul
Briefly, consul is a tool for discovering and configuring services in your infrastructure. It provides several key features, such as service discovery, health checking, a key-value store, and multi-datacenter support.
For the purpose of this article, I will use two of consul's key features. The first one, of course, is service discovery. The other one is health checking, which checks whether a service is still healthy enough to process requests.
For more details about consul and its features, you can visit the consul website.
FYI, consul is not the only tool available for a service registry. There are several other tools on the market, such as etcd and Apache Zookeeper.
Use Case
This article will use the client-side service discovery mechanism. First, I will register Lorem Service in consul. Then I will create a client that invokes Lorem Service after querying the registry. The client itself will expose a service under the sd-lorem context path. Moreover, I will add a health check function to the existing service in order to tell consul whether the service is healthy or not.
Usually, Lorem Service is invoked through a URL without a payload and returns the result. The sd-lorem endpoint will be a little bit different: I will invoke it with the POST method and pass a payload. The payload will be in JSON format, with this structure:
```json
{
  "requestType": "word",
  "min": 10,
  "max": 10
}
```
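As a quick illustration, the payload maps onto a Go struct as follows. The field names follow the LoremRequest usage in Step 7; the JSON tags and the parseLoremRequest helper are assumptions made for this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// LoremRequest mirrors the JSON payload. The JSON tags are assumed from the payload keys.
type LoremRequest struct {
	RequestType string `json:"requestType"`
	Min         int    `json:"min"`
	Max         int    `json:"max"`
}

// parseLoremRequest decodes a JSON payload into a LoremRequest.
func parseLoremRequest(payload string) (LoremRequest, error) {
	var req LoremRequest
	err := json.NewDecoder(strings.NewReader(payload)).Decode(&req)
	return req, err
}

func main() {
	req, err := parseLoremRequest(`{"requestType": "word", "min": 10, "max": 10}`)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%+v\n", req)
}
```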
In addition, I will run consul via docker using the progrium/consul image. Since the purpose of this article is to show how service discovery works with consul, I only run a single consul server. However, this is not recommended for production, where you need a cluster setup. For more information, read the guidelines on their website.
Step by Step
First of all, copy the lorem-metrics folder to a new folder and name it lorem-consul. Then start a consul agent by issuing this command in a shell:
```shell
# execute docker for consul
docker run --rm -p 8400:8400 -p 8500:8500 -p 8600:53/udp -h node1 progrium/consul -server -bootstrap -ui-dir /ui
```
Step 1: service.go
Add a new function named HealthCheck to the Service interface, then implement it. The function takes no input and returns a boolean value.
```go
// declare new function
type Service interface {
	// existing function here....

	// health check
	HealthCheck() bool
}

// new implementation of HealthCheck function
// existing function here...
func (LoremService) HealthCheck() bool {
	// to make the health check always return true is a bad idea
	// however, I did this on purpose to make the sample run easier.
	// Ideally, it should check things such as amount of free disk space or free memory
	return true
}
```
Note: the HealthCheck implementation is very simple. It only returns true, so the response code will be 200 OK. However, always returning true is a bad idea in a production environment.
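For comparison, a slightly more realistic health check could aggregate several probes and report healthy only when all of them pass. The sketch below is one possible shape, not the article's implementation; Probe, healthy, alwaysOK, and alwaysFail are hypothetical names, and real probes would check things like free disk space or a database connection.

```go
package main

import (
	"errors"
	"fmt"
)

// Probe is a hypothetical check of one dependency or resource.
// It returns nil when the resource is fine.
type Probe func() error

// healthy reports true only if every probe succeeds.
func healthy(probes ...Probe) bool {
	for _, p := range probes {
		if err := p(); err != nil {
			return false
		}
	}
	return true
}

// alwaysOK and alwaysFail are placeholder probes for the example.
func alwaysOK() error   { return nil }
func alwaysFail() error { return errors.New("disk almost full") }

func main() {
	fmt.Println("all probes pass:", healthy(alwaysOK))
	fmt.Println("one probe fails:", healthy(alwaysOK, alwaysFail))
}
```

A HealthCheck method could then simply return the result of healthy with the probes relevant to the service.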
Step 2: logging.go and instrument.go
In the previous article, we embedded the Service interface in the loggingMiddleware struct and the metricsMiddleware struct, and implemented the service function for each struct. As a consequence, we have to implement the new HealthCheck function for each struct as well.
logging.go
```go
// implement logging feature in HealthCheck function
func (mw loggingMiddleware) HealthCheck() (output bool) {
	defer func(begin time.Time) {
		mw.logger.Log(
			"function", "HealthCheck",
			"result", output,
			"took", time.Since(begin),
		)
	}(time.Now())
	output = mw.Service.HealthCheck()
	return
}
```
instrument.go
```go
// implement metrics feature in HealthCheck function
func (mw metricsMiddleware) HealthCheck() (output bool) {
	defer func(begin time.Time) {
		lvs := []string{"method", "HealthCheck"}
		mw.requestCount.With(lvs...).Add(1)
		mw.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds())
	}(time.Now())
	output = mw.Service.HealthCheck()
	return
}
```
Step 3: endpoint.go
This step is straightforward: we need to expose a health endpoint so that consul can call it to determine the health status.
```go
// Health Request
type HealthRequest struct {
}

// Health Response
type HealthResponse struct {
	Status bool `json:"status"`
}

// creating health endpoint
func MakeHealthEndpoint(svc Service) endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		status := svc.HealthCheck()
		return HealthResponse{Status: status}, nil
	}
}
```
Step 4: transport.go
After creating the endpoint function, we need to add a new HTTP handler for the HealthCheck function. The function will be exposed under the /health context path. As usual, we first create a decode function for the health request.
```go
// decode health check
func decodeHealthRequest(_ context.Context, _ *http.Request) (interface{}, error) {
	return HealthRequest{}, nil
}

// Make Http Handler
func MakeHttpHandler(_ context.Context, endpoint Endpoints, logger log.Logger) http.Handler {
	// Existing handler here...

	// GET /health
	r.Methods("GET").Path("/health").Handler(httptransport.NewServer(
		endpoint.HealthEndpoint,
		decodeHealthRequest,
		EncodeResponse,
		options...,
	))
	return r
}
```
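One detail worth keeping in mind: a consul HTTP check judges health by the response status code (any 2xx is passing), not by the JSON body. If the response encoder always writes 200, a false status in the body would still look healthy to consul. The sketch below shows one way to map the boolean onto a status code with plain net/http. It is an assumption-laden illustration, not the article's go-kit handler, and healthHandler and statusCodeFor are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// HealthResponse matches the response struct from Step 3.
type HealthResponse struct {
	Status bool `json:"status"`
}

// healthHandler writes 200 when check reports healthy and 503 otherwise.
func healthHandler(check func() bool) http.HandlerFunc {
	return func(w http.ResponseWriter, _ *http.Request) {
		status := check()
		w.Header().Set("Content-Type", "application/json; charset=utf-8")
		if !status {
			w.WriteHeader(http.StatusServiceUnavailable)
		}
		json.NewEncoder(w).Encode(HealthResponse{Status: status})
	}
}

// statusCodeFor exercises the handler in-process and returns the HTTP status code.
func statusCodeFor(healthy bool) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/health", nil)
	healthHandler(func() bool { return healthy })(rec, req)
	return rec.Code
}

func main() {
	fmt.Println("healthy:", statusCodeFor(true))
	fmt.Println("unhealthy:", statusCodeFor(false))
}
```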
Step 5: registration.go
This is a new file that handles service registration with consul. In this file, we create a new function named Register. This function takes four input parameters:
- Consul Address. The IP address of consul agent.
- Consul Port. The port of consul agent.
- Advertised Address. Service instance IP address.
- Advertised Port. Service instance port.
The function returns the Registrar interface from the go-kit library.
Basically, the Register function uses the consul API to register the service as well as its health monitoring endpoint.
```go
// registration
func Register(consulAddress string,
	consulPort string,
	advertiseAddress string,
	advertisePort string) (registar sd.Registrar) {

	// Logging domain.
	var logger log.Logger
	{
		logger = log.NewLogfmtLogger(os.Stderr)
		logger = log.With(logger, "ts", log.DefaultTimestampUTC)
		logger = log.With(logger, "caller", log.DefaultCaller)
	}

	rand.Seed(time.Now().UTC().UnixNano())

	// Service discovery domain. In this example we use Consul.
	var client consulsd.Client
	{
		consulConfig := api.DefaultConfig()
		consulConfig.Address = consulAddress + ":" + consulPort
		consulClient, err := api.NewClient(consulConfig)
		if err != nil {
			logger.Log("err", err)
			os.Exit(1)
		}
		client = consulsd.NewClient(consulClient)
	}

	// consul calls this URL every 10s to determine the health status
	check := api.AgentServiceCheck{
		HTTP:     "http://" + advertiseAddress + ":" + advertisePort + "/health",
		Interval: "10s",
		Timeout:  "1s",
		Notes:    "Basic health checks",
	}

	// the advertised port must be numeric; stop early if it is not
	port, err := strconv.Atoi(advertisePort)
	if err != nil {
		logger.Log("err", err)
		os.Exit(1)
	}
	num := rand.Intn(100) // to make service ID unique
	asr := api.AgentServiceRegistration{
		ID:      "lorem" + strconv.Itoa(num), // unique service ID
		Name:    "lorem",
		Address: advertiseAddress,
		Port:    port,
		Tags:    []string{"lorem", "ru-rocker"},
		Check:   &check,
	}
	registar = consulsd.NewRegistrar(client, &asr, logger)
	return
}
```
Note: the Name and Tags fields of the registration identify the specific service. Consul uses the service name and tags to query the registry.
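The random suffix keeps the sample short, but with only 100 possible values two instances can end up with the same ID. One alternative, sketched here as an assumption rather than as the article's approach, is to derive the ID from values that are already unique per instance. The serviceID helper is a hypothetical name.

```go
package main

import "fmt"

// serviceID builds an ID from the service name, advertised address, and port.
// Two instances cannot share an address and port, so the ID is unique per instance.
func serviceID(name, addr string, port int) string {
	return fmt.Sprintf("%s-%s-%d", name, addr, port)
}

func main() {
	fmt.Println(serviceID("lorem", "192.168.1.103", 7002))
	fmt.Println(serviceID("lorem", "192.168.1.103", 7003))
}
```

A side effect of a deterministic ID is that a restarted instance registers under the same ID again instead of leaving a stale entry behind.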
Step 6: lorem-consul.d/main.go
In this file, we create the health endpoint and then obtain a registrar by calling the Register function. Next, we call registar.Register() before starting the HTTP server, and registar.Deregister() whenever the HTTP server terminates.
```go
// inside main function ...

// parse variable from input command
var (
	consulAddr    = flag.String("consul.addr", "", "consul address")
	consulPort    = flag.String("consul.port", "", "consul port")
	advertiseAddr = flag.String("advertise.addr", "", "advertise address")
	advertisePort = flag.String("advertise.port", "", "advertise port")
)
flag.Parse()

// existing implementation here...

// Make health endpoint
healthEndpoint := lorem_consul.MakeHealthEndpoint(svc)
endpoint := lorem_consul.Endpoints{
	LoremEndpoint:  e,
	HealthEndpoint: healthEndpoint,
}

// Register Service to Consul
registar := lorem_consul.Register(*consulAddr,
	*consulPort,
	*advertiseAddr,
	*advertisePort)

// HTTP transport
go func() {
	ilog.Println("Starting server at port", *advertisePort)
	// register service
	registar.Register()
	handler := r
	errChan <- http.ListenAndServe(":"+*advertisePort, handler)
}()

go func() {
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	errChan <- fmt.Errorf("%s", <-c)
}()

error := <-errChan
// deregister service
registar.Deregister()
ilog.Fatalln(error)
```
Note: registar.Register() and registar.Deregister() are the calls that register the service with consul and deregister it.
Step 7: discover.d/main.go
In this step, we create a folder named discover.d and a main.go file inside it. Because our client uses a different payload format compared to the existing Lorem Service, we first need to decode the request from our client and encode it for the Lorem Service.
```go
// decode request from discovery service
// parsing JSON into LoremRequest
func decodeConsulLoremRequest(_ context.Context, r *http.Request) (interface{}, error) {
	var request lorem_consul.LoremRequest
	if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
		return nil, err
	}
	return request, nil
}

// Encode request from LoremRequest into existing Lorem Service
// The encode will translate the LoremRequest into /lorem/{requestType}/{min}/{max}
func encodeLoremRequest(_ context.Context, req *http.Request, request interface{}) error {
	lr := request.(lorem_consul.LoremRequest)
	p := "/" + lr.RequestType + "/" + strconv.Itoa(lr.Min) + "/" + strconv.Itoa(lr.Max)
	req.URL.Path += p
	return nil
}
```
Next, we handle the response. Because the client returns the same payload as the existing Lorem Service, we do not need to implement a new encode function; we reuse the existing EncodeResponse function. So we only need to implement the decode function.
```go
// decode response from Lorem Service
func decodeLoremResponse(_ context.Context, resp *http.Response) (interface{}, error) {
	var response lorem_consul.LoremResponse
	var s map[string]interface{}

	if respCode := resp.StatusCode; respCode >= 400 {
		if err := json.NewDecoder(resp.Body).Decode(&s); err != nil {
			return nil, err
		}
		return nil, errors.New(s["error"].(string) + "\n")
	}

	if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
		return nil, err
	}

	return response, nil
}
```
Next, create a factory function that constructs the URL from the instance address returned by service discovery, sets the encode and decode functions, and wraps everything into an endpoint.Endpoint function.
```go
// factory function to parse URL from Consul to Endpoint
func loremFactory(_ context.Context, method, path string) sd.Factory {
	return func(instance string) (endpoint.Endpoint, io.Closer, error) {
		if !strings.HasPrefix(instance, "http") {
			instance = "http://" + instance
		}

		tgt, err := url.Parse(instance)
		if err != nil {
			return nil, nil, err
		}
		tgt.Path = path

		var (
			enc ht.EncodeRequestFunc
			dec ht.DecodeResponseFunc
		)
		enc, dec = encodeLoremRequest, decodeLoremResponse

		return ht.NewClient(method, tgt, enc, dec).Endpoint(), nil, nil
	}
}
```
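To see what the factory and the request encoder produce together, the URL handling can be isolated into two small functions. This is a standard-library sketch: targetURL and appendLoremPath are hypothetical helpers that mirror the logic above, and the instance address is just an example value.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"strings"
)

// targetURL turns a "host:port" instance string into a URL with the base path set,
// adding the scheme first because the registry returns the address without one.
func targetURL(instance, path string) (*url.URL, error) {
	if !strings.HasPrefix(instance, "http") {
		instance = "http://" + instance
	}
	u, err := url.Parse(instance)
	if err != nil {
		return nil, err
	}
	u.Path = path
	return u, nil
}

// appendLoremPath appends /{requestType}/{min}/{max} to the URL path,
// as the request encoder does.
func appendLoremPath(u *url.URL, requestType string, min, max int) {
	u.Path += "/" + requestType + "/" + strconv.Itoa(min) + "/" + strconv.Itoa(max)
}

func main() {
	u, err := targetURL("192.168.1.103:7002", "/lorem")
	if err != nil {
		fmt.Println("bad instance address:", err)
		return
	}
	appendLoremPath(u, "word", 10, 10)
	fmt.Println(u.String())
}
```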
Now go to the main function. In this function, we construct a consul client, subscribe to the consul agent, and then create an HTTP handler from the factory endpoint.
```go
// main function
func main() {

	var (
		consulAddr = flag.String("consul.addr", "", "consul address")
		consulPort = flag.String("consul.port", "", "consul port")
	)
	flag.Parse()

	// Logging domain.
	var logger log.Logger
	{
		logger = log.NewLogfmtLogger(os.Stderr)
		logger = log.With(logger, "ts", log.DefaultTimestampUTC)
		logger = log.With(logger, "caller", log.DefaultCaller)
	}

	// Service discovery domain. In this example we use Consul.
	var client consulsd.Client
	{
		consulConfig := api.DefaultConfig()
		consulConfig.Address = "http://" + *consulAddr + ":" + *consulPort
		consulClient, err := api.NewClient(consulConfig)
		if err != nil {
			logger.Log("err", err)
			os.Exit(1)
		}
		client = consulsd.NewClient(consulClient)
	}

	tags := []string{"lorem", "ru-rocker"}
	passingOnly := true
	duration := 500 * time.Millisecond
	var loremEndpoint endpoint.Endpoint

	ctx := context.Background()
	r := mux.NewRouter()

	factory := loremFactory(ctx, "POST", "/lorem")
	serviceName := "lorem"
	subscriber := consulsd.NewSubscriber(client, factory, logger, serviceName, tags, passingOnly)
	balancer := lb.NewRoundRobin(subscriber)
	retry := lb.Retry(1, duration, balancer)
	loremEndpoint = retry

	// POST /sd-lorem
	// Payload: {"requestType":"word", "min":10, "max":10}
	r.Methods("POST").Path("/sd-lorem").Handler(ht.NewServer(
		loremEndpoint,
		decodeConsulLoremRequest,
		lorem_consul.EncodeResponse, // use existing encode response since I did not change the logic on response
	))

	// Interrupt handler.
	errc := make(chan error)
	go func() {
		// signal.Notify does not block when sending, so the channel is buffered
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
		errc <- fmt.Errorf("%s", <-c)
	}()

	// HTTP transport.
	go func() {
		logger.Log("transport", "HTTP", "addr", "8080")
		errc <- http.ListenAndServe(":8080", r)
	}()

	// Run!
	logger.Log("exit", <-errc)
}
```
Note: the tags and serviceName values are the same tags and service name used in the Register function (Step 5).
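The balancer and retry wrappers come from go-kit, but the underlying idea is small enough to sketch with the standard library. The code below is a simplified illustration, not go-kit's implementation: roundRobin, callWithRetry, and errUnavailable are hypothetical names, and the timeout that lb.Retry also takes is left out.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errUnavailable is a placeholder error for a failed call.
var errUnavailable = errors.New("instance unavailable")

// roundRobin hands out instances in rotation.
type roundRobin struct {
	instances []string
	next      uint64
}

// pick returns the next instance, wrapping around at the end of the list.
func (rr *roundRobin) pick() (string, error) {
	if len(rr.instances) == 0 {
		return "", errors.New("no instances available")
	}
	n := atomic.AddUint64(&rr.next, 1)
	return rr.instances[(n-1)%uint64(len(rr.instances))], nil
}

// callWithRetry makes up to max attempts, moving to the next instance each time.
func callWithRetry(rr *roundRobin, max int, call func(addr string) error) error {
	lastErr := errors.New("no attempts made")
	for i := 0; i < max; i++ {
		addr, err := rr.pick()
		if err != nil {
			return err
		}
		if lastErr = call(addr); lastErr == nil {
			return nil
		}
	}
	return lastErr
}

func main() {
	rr := &roundRobin{instances: []string{"192.168.1.103:7002", "192.168.1.103:7003"}}
	err := callWithRetry(rr, 2, func(addr string) error {
		fmt.Println("calling", addr)
		if addr == "192.168.1.103:7002" {
			return errUnavailable // pretend the first instance is down
		}
		return nil
	})
	fmt.Println("result:", err)
}
```

With two attempts, a failure on one instance is absorbed by the next one in the rotation; with a single attempt, the error goes straight back to the caller.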
Step 8: Run the Sample
For this sample, I will run two instances and register them with consul. They run on ports 7002 and 7003 respectively.
```shell
# Running in separate console
cd $GOPATH/src/github.com/ru-rocker/gokit-playground

go run lorem-consul/lorem-consul.d/main.go \
  -consul.addr localhost -consul.port 8500 \
  -advertise.addr 192.168.1.103 -advertise.port 7002

go run lorem-consul/lorem-consul.d/main.go \
  -consul.addr localhost -consul.port 8500 \
  -advertise.addr 192.168.1.103 -advertise.port 7003
```
Note: remember that the consul agent is already running via docker.
After the services start, you will see output similar to this in the console:
```
# service on port 7002
2017/04/16 23:14:16 Starting server at port 7002
ts=2017-04-16T16:14:16.430222994Z caller=registrar.go:33 service=lorem tags="[lorem ru-rocker]" address=192.168.1.103 action=register
ts=2017-04-16T16:14:18.573538644Z caller=logging.go:71 function=HealthCheck result=true took=997ns

# service on port 7003
2017/04/16 23:14:42 Starting server at port 7003
ts=2017-04-16T16:14:42.87250799Z caller=registrar.go:33 service=lorem tags="[lorem ru-rocker]" address=192.168.1.103 action=register
ts=2017-04-16T16:14:43.310868906Z caller=logging.go:71 function=HealthCheck result=true took=1.169µs

# consul agent console
2017/04/16 16:14:16 [INFO] agent: Synced service 'lorem91'
2017/04/16 16:14:19 [INFO] agent: Synced check 'service:lorem91'
2017/04/16 16:14:42 [INFO] agent: Synced service 'lorem17'
2017/04/16 16:14:43 [INFO] agent: Synced check 'service:lorem17'
```
And in the consul UI:
And now run the client:
```shell
# client
cd $GOPATH/src/github.com/ru-rocker/gokit-playground
go run lorem-consul/discover.d/main.go \
  -consul.addr localhost -consul.port 8500
```
Then try to make a request via curl:
```
# send two request
$ curl -XPOST -d'{"requestType":"word", "min":10, "max":10}' http://localhost:8080/sd-lorem
$ {"message":"meminerunt"}
$ curl -XPOST -d'{"requestType":"word", "min":10, "max":10}' http://localhost:8080/sd-lorem
$ {"message":"quaecumque"}

# output in port 7002
ts=2017-04-16T16:24:38.924105072Z caller=logging.go:31 function=Word min=10 max=10 result=meminerunt took=20.607µs

# output in port 7003
ts=2017-04-16T16:24:39.958944257Z caller=logging.go:31 function=Word min=10 max=10 result=quaecumque took=4.217µs
```
Advanced Topic
In the previous article, we talked about API monitoring using Prometheus and Grafana. We still want to monitor our API, but now it is a little more complex, because we cannot list our scrape targets in a static config. Remember the dynamism of a micro-services architecture.
Luckily, Prometheus supports consul natively. Scraping all instances can be easily achieved with consul_sd_configs and a relabeling config. The setup is configured through prometheus.yml.
prometheus.yml
```yaml
global:
  scrape_interval: 15s # By default, scrape targets every 15 seconds.

  # Attach these labels to any time series or alerts when communicating with
  # external systems (federation, remote storage, Alertmanager).
  external_labels:
    monitor: 'rurocker-monitor'

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'prometheus'

    # Override the global default and scrape targets from this job every 5 seconds.
    scrape_interval: 5s

    static_configs:
      - targets: ['localhost:9090']
        labels:
          group: 'local'

  - job_name: 'ru-rocker'
    scrape_interval: 5s

    consul_sd_configs:
      - server: '192.168.1.103:8500'

    relabel_configs:
      - source_labels: [__meta_consul_tags]
        regex: .*,lorem,.*
        action: keep
      - source_labels: [__meta_consul_service]
        target_label: job
```
Note: the relabel rule on __meta_consul_tags keeps only the targets that carry the lorem tag.
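The commas in the regex matter. Prometheus joins the consul tags with a separator (a comma by default) and also puts the separator at the start and end of the label value, and relabel regexes are anchored at both ends. The Go sketch below imitates that matching to show why .*,lorem,.* matches the tag lorem but not a tag that merely contains it. The helpers joinTags and keep are hypothetical and only approximate Prometheus behavior.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// joinTags imitates how the __meta_consul_tags value is built: tags joined by the
// separator, with a leading and a trailing separator added.
func joinTags(tags []string, sep string) string {
	return sep + strings.Join(tags, sep) + sep
}

// keep reports whether the pattern matches the whole label value,
// imitating the anchoring of relabel regexes.
func keep(pattern, value string) bool {
	re := regexp.MustCompile("^(?:" + pattern + ")$")
	return re.MatchString(value)
}

func main() {
	pattern := ".*,lorem,.*"
	fmt.Println(keep(pattern, joinTags([]string{"lorem", "ru-rocker"}, ",")))
	fmt.Println(keep(pattern, joinTags([]string{"lorem-v2", "ru-rocker"}, ",")))
}
```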
Then run Prometheus and Grafana server to scrape our instances. After making several requests, you will see the metrics from two instances.
Note: see lorem-consul/README.md for running these applications. I apologize for the screenshots: they show port 7004 instead of 7003, because I ran the sample several times with different ports before capturing the results. No need to worry, though; the main point is to show metrics from multiple instances.
Summary
One of the advantages of a micro-services architecture is that each service can be scaled individually based on its resource needs. But this brings another consequence: the need for a service registry.
Consul provides a great tool to handle this through its service discovery functionality. Moreover, go-kit itself, as a micro-service framework, ships with a service discovery library under github.com/go-kit/kit/sd. Together they help us a lot in meeting the need for a service registry. So, thanks to everyone who provides these nice tools and libraries.
That’s all for today. Happy Passover. Happy Easter.
PS: complete code is on my github, under folder lorem-consul.
References
- Service Discovery Architecture (https://www.nginx.com/blog/service-discovery-in-a-microservices-architecture/)
- Client Side Service Discovery (http://microservices.io/patterns/client-side-discovery.html)
- Server Side Service Discovery (http://microservices.io/patterns/server-side-discovery.html)
- Consul (https://www.consul.io/)