Ahmed Ashraf


Understand Golang channels and how to monitor with Grafana (Part 2/2)

In the first part, we talked about what Go channels are, how they work, and how to implement them in a scalable way.

In this part, I'll show you how to monitor them and collect useful metrics like processing time per worker, how many jobs run per minute, and more.

Stack

Let's take a brief look at the tools we will use to do the job.

  • Prometheus: an open-source time-series metrics system. It also supports alerting, but that's not our topic for today.

  • Grafana: an open-source analytics & monitoring solution that supports many data sources. We will use it with Prometheus to display the metrics we emit from our Go app.

Prometheus

First, we need to define the metrics in our app and register them with the Prometheus package. But let's first go over the metric types and the purpose of each one.

  • Counter: a number that can only increase or be reset to zero, like how many emails you have sent. It can't be 5 at some point and then become 3; it can only increase.

  • Gauge: represents a single numerical value that can arbitrarily go up and down, like memory usage or concurrent users. In our case that's running workers and running jobs, because these numbers go up and down.

  • Histogram: is just an observer. Based on the samples it records, you can compute things like the average of response time, response size, or request duration.

  • Summary: similar to a histogram; it tracks the count and sum of all observed values and can also report configurable quantiles calculated on the client side.

I'm not sure I understand Summary that well, but we don't need it in this article anyway.
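
For completeness, here is a minimal sketch of how a Summary could be created with client_golang; the email_send_duration_seconds metric and its objectives are purely illustrative and are not used anywhere else in this article.

// Illustrative only: a Summary reporting the 50th/90th/99th percentiles client-side.
emailSendDuration := prometheus.NewSummary(prometheus.SummaryOpts{
    Namespace:  "worker",
    Subsystem:  "jobs",
    Name:       "email_send_duration_seconds",
    Help:       "Duration of email sends",
    Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
})
prometheus.MustRegister(emailSendDuration)

// Observing works the same way as with a Histogram.
emailSendDuration.Observe(0.42)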

// prometheus.go

package queue

import (
    "github.com/prometheus/client_golang/prometheus"
)

var (
    JobsProcessed  *prometheus.CounterVec
    RunningJobs    *prometheus.GaugeVec
    ProcessingTime *prometheus.HistogramVec
    RunningWorkers *prometheus.GaugeVec
)

var collectorContainer []prometheus.Collector

// InitPrometheus ... initializes Prometheus by registering all pushed collectors
func InitPrometheus() {
    prometheus.MustRegister(collectorContainer...)
}

// PushRegister ... pushes collectors to Prometheus before initializing
func PushRegister(c ...prometheus.Collector) {
    collectorContainer = append(collectorContainer, c...)
}

func InitMetrics() {
    JobsProcessed = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Namespace: "worker",
            Subsystem: "jobs",
            Name:      "processed_total",
            Help:      "Total number of jobs processed by the workers",
        },
        []string{"worker_id", "type"},
    )

    RunningJobs = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Namespace: "worker",
            Subsystem: "jobs",
            Name:      "running",
            Help:      "Number of jobs inflight",
        },
        []string{"type"},
    )

    RunningWorkers = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Namespace: "worker",
            Subsystem: "workers",
            Name:      "running",
            Help:      "Number of workers inflight",
        },
        []string{"type"},
    )

    ProcessingTime = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Namespace: "worker",
            Subsystem: "jobs",
            Name:      "process_time_seconds",
            Help:      "Amount of time spent processing jobs",
        },
        []string{"worker_id", "type"},
    )

    PushRegister(ProcessingTime, RunningJobs, JobsProcessed, RunningWorkers)
}

After defining the metrics, we need to expose an endpoint that Prometheus will scrape to collect the metrics data:

r.Handle("GET", "/metrics", gin.WrapH(promhttp.Handler()))
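
To put it together, here is a minimal sketch of the wiring, assuming the metrics code above lives in a queue package of this repo; the import path and port are assumptions.

package main

import (
    "github.com/gin-gonic/gin"
    "github.com/prometheus/client_golang/prometheus/promhttp"

    "github.com/ahmedash95/go-channels-demo/queue" // assumed import path
)

func main() {
    // create the metric vectors and register them with Prometheus
    queue.InitMetrics()
    queue.InitPrometheus()

    r := gin.Default()
    // expose the endpoint Prometheus scrapes
    r.Handle("GET", "/metrics", gin.WrapH(promhttp.Handler()))
    r.Run(":8080") // assumed port
}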

Now let's make some calls:

JobsProcessed.WithLabelValues("Worker-1", "ahmedash.com").Inc()
JobsProcessed.WithLabelValues("Worker-1", "ahmedash.com").Inc()

JobsProcessed.WithLabelValues("Worker-2", "ahmedash.com").Inc()
JobsProcessed.WithLabelValues("Worker-2", "ahmedash.com").Inc()
JobsProcessed.WithLabelValues("Worker-2", "ahmedash.com").Inc()

The result when you hit your metrics endpoint at http://localhost/metrics:
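
Among the default Go metrics, you should see something roughly like this for our counter (exact values depend on your calls):

# HELP worker_jobs_processed_total Total number of jobs processed by the workers
# TYPE worker_jobs_processed_total counter
worker_jobs_processed_total{type="ahmedash.com",worker_id="Worker-1"} 2
worker_jobs_processed_total{type="ahmedash.com",worker_id="Worker-2"} 3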

And if you open the Prometheus dashboard, you will find a chart like the one below.

This is how you can work with Prometheus: send data to it and check its dashboard for simple visualization.

But as you can see, the Prometheus dashboard is not that fancy. We need a nicer dashboard so it's easy for anyone to understand what is going on with the system, and Grafana tackles this job very well.

Push the metrics from Go to Prometheus

Starting from the code in the first part, we will modify it to emit the data we need for the metrics.

Follow the arrows ⬅️

func (d *Dispatcher) Run() {
    for i := 0; i < d.maxWorkers; i++ {

        // increase the number of running workers
        RunningWorkers.WithLabelValues("Emails").Inc() ⬅️⬅️⬅️⬅️⬅️⬅️⬅️⬅️

        worker := NewWorker(d.WorkerPool)
        worker.Start()
        d.Workers = append(d.Workers, worker)
    }

    go d.dispatch()
}
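
If your dispatcher from part 1 also stops workers, the gauge should be decremented there too, otherwise worker_workers_running will drift. A minimal sketch, assuming the Worker type exposes a Stop method like the one in part 1:

// Sketch only: assumes worker.Stop() exists as in part 1.
func (d *Dispatcher) Stop() {
    for _, worker := range d.Workers {
        worker.Stop()

        // decrease the running workers gauge so it mirrors reality
        RunningWorkers.WithLabelValues("Emails").Dec()
    }
}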

With every job dispatched, we need to increase the running-jobs gauge:


func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:

            // Increase running jobs Gauge
            RunningJobs.WithLabelValues("Emails").Inc() ⬅️⬅️⬅️⬅️⬅️⬅️⬅️

            go func(job Queuable) {
                jobChannel := <-d.WorkerPool
                jobChannel <- job
            }(job)
        }
    }
}

  • First metric: stores the number of jobs processed per worker
  • Second metric: decreases the number of currently running jobs that we increased in the previous snippet
  • Third metric: stores the processing time for each job

func (w Worker) Start() {
    go func() {
        for {
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                startTime := time.Now()

                // track the total number of jobs processed by the worker
                JobsProcessed.WithLabelValues(w.Name, "Emails").Inc() ⬅️⬅️⬅️⬅️⬅️⬅️⬅️
                if err := job.Handle(); err != nil {
                    log.Printf("Error in job: %s", err.Error())
                }
                // Decrease the number of running jobs once we finish
                RunningJobs.WithLabelValues("Emails").Dec() ⬅️⬅️⬅️⬅️⬅️⬅️⬅️

                // ⬇️ Register the processing time in the Histogram ⬇️
                ProcessingTime.WithLabelValues(w.Name, "Emails").Observe(time.Since(startTime).Seconds()) ⬅️⬅️⬅️⬅️⬅️⬅️⬅️
            }
        }
    }()
}
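
As a side note, client_golang also ships prometheus.NewTimer as a small helper for timing observations. The snippet below is an alternative sketch (not the code used in this article) for the body of the `case job := <-w.JobChannel:` branch, replacing the manual startTime / time.Since approach:

// Alternative: let a Timer record the elapsed time into the Histogram.
timer := prometheus.NewTimer(ProcessingTime.WithLabelValues(w.Name, "Emails"))

JobsProcessed.WithLabelValues(w.Name, "Emails").Inc()
if err := job.Handle(); err != nil {
    log.Printf("Error in job: %s", err.Error())
}
RunningJobs.WithLabelValues("Emails").Dec()

timer.ObserveDuration() // observes the duration since the timer was created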

Grafana

Now we have everything ready and our app is working as expected. The last step is to build a beautiful dashboard with Grafana.

From the Grafana dashboard, we need to add Prometheus as a data source.

Then from HOME in the top bar, create a New dashboard.

Then we need to add a Singlestat panel for the counter & gauge metrics.

Now:

  1. Choose Prometheus from the data sources
  2. Then search for the metric name
  3. Make sure you have the full, correct name of the metric

Then from the Options tab, choose whether to show the current or total value. In our case it's current.

Finally, from the General tab, rename the component.

Also, you can control the size of each component in the panel

Let's make another one for the running workers using the worker_workers_running metric.

Then let's make a graph panel to show how many jobs are processed per minute, using a query like the sketch below.
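
A PromQL query along these lines should do it (the 1m window is an assumption; adjust it to your scrape interval):

sum(increase(worker_jobs_processed_total[1m])) by (worker_id)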

Now you can make some requests to the app and see the data in real time. Just make sure to enable auto-refresh.

Now we have the first part of our dashboard

The second part is just about adding a graph to show the processing duration for each worker

For this metric, the value would be worker_jobs_process_time_seconds_sum / worker_jobs_process_time_seconds_count.

And then make sure we display the values in milliseconds.
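
In PromQL, a rate-based form of that division gives the average processing time per worker over a window (the 5m window is an assumption):

rate(worker_jobs_process_time_seconds_sum[5m]) / rate(worker_jobs_process_time_seconds_count[5m])

Grafana can then render the result in milliseconds by setting the panel unit, or you can multiply the expression by 1000.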

And the final result is:

Now we have a dashboard that shows us the running workers, how many jobs are in the queue, and the job processing duration. We can also easily see whether there are any spikes in our jobs or the system is working as we expect.

Conclusion

In this article, across its two parts, we learned how to:

  • create channels
  • implement the dispatcher/worker pattern
  • create & kill workers
  • push metrics and display them in a very nice dashboard

Things we didn't cover

  • We don't deal with failed jobs, so if an error happens in any job we have no way to retry it or even store it for later.
  • No data persistence, so if the app restarts we lose all jobs because they are kept in memory.
  • We didn't show how to make this work with a third-party messaging/queuing system like Redis or RabbitMQ.

Source code

This repository contains all the code we went through. It should be easy to run as it's dockerized.

https://github.com/ahmedash95/go-channels-demo

References

These two articles helped me a lot to understand how to build powerful apps that support async jobs and how to monitor them; my article combines knowledge from both.
