Go언어로 나만의 Query Exporter 만들어보기!

Overview

안녕하세요. 무더운 7월 잘 지내고 계시죠.?

오늘은 조금 특이한 주제를 가지고 이야기를 해보고자 합니다. 바로 go로 나만의 Exporter를 만들어보는 것입니다. 특정 쿼리를 등록을 해놓으면, 이 쿼리 결과를 Exporter 결과로 보여주는 간단한 프로그램입니다. 아직 Expoter가 무엇인지 생소하신 분들이 있을 수 있겠는데요. 오늘 차근차근 설명을 하면서, 머릿속에 살짝 인스톨해드리도록 하겠습니다. 🙂

Exporter?

Exporter란, Prometheus같은 시계열 데이터베이스에서 데이터를 끌어가기 위한 하나의 HTTP 서버라고 생각하면 되겠습니다. Prometheus에서는 정해진 주기에 따라 exporter의 특정 URL을 호출하고, 그 결과값을 시계열로 데이터를 저장합니다.

prometheus & exporter

세상에는 수많은 Exporter들이 존재하죠. 대표적으로는 Prometheus의 Offcial프로젝트들인 mysqld_expoter가 있고, Percona에서는 이를 Fork 해서 자기들이 추가로 배포하는 mysqld_expoter도 있습니다. 이것 외에도 Linux 노드를 모니터링을 위한 node_expoter 뿐만 아니라, memcached_expoter 등등.. 아주 다양한 exporter들이 존재하죠.
오늘 이 자리에서 할 내용은, 이 다양한 Exporter중에 나만의 새로운 Exporter 하나를 더 추가해보는 과정입니다.

Go 프로젝트 생성하기

Exporter는 다양한 언어로 구현을 할 수 있습니다만, 오늘은 golang으로 구현을 해보도록 하죠. 아무래도, 배포 및 호환성 측면에서 golang 만큼 편리(?)한 것은 없다고 개인적으로 생각하고 있습니다. 여기서 go 설치 및 환경 구성에 대한 것은 생략하도록 하겠습니다.

$ cd ~/go/src

$ mkdir -p query-exporter-simple

$ cd query-exporter-simple

$ go mod init
go: creating new go.mod: module query-exporter-simple

$ ls -al
total 8
drwxr-xr-x   3 chan  staff   96  7 12 13:33 .
drwxr-xr-x  12 chan  staff  384  7 12 13:33 ..
-rw-r--r--   1 chan  staff   38  7 12 13:33 go.mod

$ cat go.mod
module query-exporter-simple

go 1.16

비록 깡통(?) 프로젝트이기는 하지만, 이제 나만의 exporter를 만들기 위한 모든 준비는 완료하였습니다. 이제부터 패키지 관리는 go mod로 관리합니다.

깡통 Exporter 맛보기

자, 이제부터 본격적(?)으로 Exporter를 만들어보도록 하겠습니다. 먼저 맛보기로, 아~무 기능 없는.. 단순히 go version 정도만 출력을 해주는 깡통 Exporter를 만들어보도록 해보죠.

package main

import (
    "flag"
)

func main() {
    // =====================
    // Get OS parameter
    // =====================
    var bind string
    flag.StringVar(&bind, "bind", "0.0.0.0:9104", "bind")
    flag.Parse()
}

flag를 활용하여 OS 파라메터를 읽어오는 내용입니다. Exporter가 구동 시 뜰 서버 바인딩 정보입니다.

package main

import (
    "flag"
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "github.com/prometheus/common/version"
    log "github.com/sirupsen/logrus"
)

func main() {
    // =====================
    // Get OS parameter
    // =====================
    var bind string
    flag.StringVar(&bind, "bind", "0.0.0.0:9104", "bind")
    flag.Parse()

    // ========================
    // Regist handler
    // ========================
    prometheus.Register(version.NewCollector("query_exporter"))

    // Regist http handler
    http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        h := promhttp.HandlerFor(prometheus.Gatherers{
            prometheus.DefaultGatherer,
        }, promhttp.HandlerOpts{})
        h.ServeHTTP(w, r)
    })

    // start server
    log.Infof("Starting http server - %s", bind)
    if err := http.ListenAndServe(bind, nil); err != nil {
        log.Errorf("Failed to start http server: %s", err)
    }
}

수집할 Collector를 등록하고, HTTP server로 Exporter를 구동합니다. Collector는 정보를 수집해주는 하나의 쓰레드(?) 개념으로, Prometheus의 Collector 인터페이스를 구현한 구조체라고 생각하면 쉽게 이해가 되겠습니다.

$ go mod vendor
go: finding module for package github.com/prometheus/common/version
go: finding module for package github.com/prometheus/client_golang/prometheus
go: finding module for package github.com/sirupsen/logrus
go: finding module for package github.com/prometheus/client_golang/prometheus/promhttp
go: found github.com/prometheus/client_golang/prometheus in github.com/prometheus/client_golang v1.11.0
go: found github.com/prometheus/client_golang/prometheus/promhttp in github.com/prometheus/client_golang v1.11.0
go: found github.com/prometheus/common/version in github.com/prometheus/common v0.29.0
go: found github.com/sirupsen/logrus in github.com/sirupsen/logrus v1.8.1

$ ls -al
total 112
drwxr-xr-x   6 chan  staff    192  7 13 10:26 .
drwxr-xr-x  12 chan  staff    384  7 12 13:33 ..
-rw-r--r--   1 chan  staff    169  7 13 10:26 go.mod
-rw-r--r--   1 chan  staff  45722  7 13 10:26 go.sum
-rw-r--r--   1 chan  staff   1163  7 13 10:34 main.go
drwxr-xr-x   6 chan  staff    192  7 13 10:26 vendor

아직 go가 사용하는 패키지들이 프로젝트에 존재하지 않기 때문에, 수많은 에러가 발생할 것입니다. 그래서 위와 같이 go mod vendor 를 통하여 관련 패키지를 받아옵니다. 관련 패키지는 vendor 디렉토리 하단에 위치하게 됩니다.

$ go run .
INFO[0000] Regist version collector - query_exporter
INFO[0000] HTTP handler path - /metrics
INFO[0000] Starting http server - 0.0.0.0:9104

Exporter 서버를 구동시켜보면, 이제 9104 포트(flag에서 기본값으로 지정한 포트)로 서버가 구동될 것입니다.

$ go run . --bind=0.0.0.0:9105
INFO[0000] Regist version collector - query_exporter
INFO[0000] HTTP handler path - /metrics
INFO[0000] Starting http server - 0.0.0.0:9105

만약 포트 변경을 하고 싶다면, 위와 같이 바인딩 정보를 주면 해당 포트로 서버가 구동하겠죠.

$ curl 127.0.0.1:9104/metrics
# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0
go_gc_duration_seconds{quantile="0.25"} 0

.. 중략 ..

# HELP go_threads Number of OS threads created.
# TYPE go_threads gauge
go_threads 7
# HELP query_exporter_build_info A metric with a constant '1' value labeled by version, revision, branch, and goversion from which query_exporter was built.
# TYPE query_exporter_build_info gauge
query_exporter_build_info{branch="",goversion="go1.16.5",revision="",version=""} 1

비록 깡통 Exporter임에도 불구하고.. 아주 많은 정보들이 Exporter를 통해 추출되는 것을 확인할 수 있습니다. (대부분의 정보는 go 자체에 대한 정보이기는 합자만..)

가장 하단을 보시면, query_exporter_build_info 메트릭이 들어있는데, 이것이 바로 앞선 부분에서 추가했던 그 Collector가 수집한 정보입니다. 깡통 Exporter를 만들어낸 순간입니다! 감격. ㅠ_ㅠ

본격적으로 Exporter 만들기

조금 전에, 버전 정도만 명시해주는 깡통 Exporter를 만들어보았습니다. 참 쉽죠? ㅎㅎ
이제부터는 우리가 정말로 필요한 정보들을 수집해서, 그 결과를 HTTP GET 메쏘드 호출 결과로 뿌려주는 Collector를 구현해볼 생각입니다. query exporter

Configuration format (YAML)

앞서 이야기한 것처럼, 등록한 쿼리의 결과를 Exporter 결과 매트릭으로 뿌리는 것을 만들고자 합니다. 그러기 위해서는 타겟 인스턴스에 대한 정보도 알아야할 것이고, 실제 실행할 쿼리에 대해서도 알고 있어야겠죠.

dsn: test:test123@tcp(127.0.0.1:3306)/information_schema
metrics:
  process_count_by_host:
    query: "select user, substring_index(host, ':', 1) host, count(*) sessions from information_schema.processlist group by 1,2 "
    type: gauge
    description: "process count by host"
    labels: ["user","host"]
    value: sessions
  process_count_by_user:
    query: "select user, count(*) sessions from information_schema.processlist group by 1 "
    type: gauge
    description: "process count by user"
    labels: ["user"]
    value: sessions

위와 같은 포멧으로 설정을 해보도록 하겠습니다. MySQL 접속 정보와, 실제로 수행할 쿼리입니다. "호스트 별 커넥션 수""유저별 커넥션 수" 두 가지 정보를 결과로 보여줄 것입니다.

type Config struct {
    DSN     string
    Metrics map[string]struct {
        Query       string
        Type        string
        Description string
        Labels      []string
        Value       string
        metricDesc  *prometheus.Desc
    }
}

위 yaml 을 go 구조체로 정의를 해보았습니다. 여기서 metricDesc *prometheus.Desc는 (뒤에서 설명하겠지만) Prometheus 메트릭에서 사용하는 스펙 명세서(?)라고 이해를 해보면 되겠네요. 이 안에는 어떤 Label과 Counter/Gauge같은 메트릭 타입에 대한것도 같이 명시되어 있습니다.

var b []byte
var config Config
if b, err = ioutil.ReadFile("config.yml"); err != nil {
    log.Errorf("Failed to read config file: %s", err)
    os.Exit(1)
}

// Load yaml
if err := yaml.Unmarshal(b, &config); err != nil {
    log.Errorf("Failed to load config: %s", err)
    os.Exit(1)
}

YAML파일을 위와 같이 읽어서, 최종적으로 앞서 정의한 구조체에 설정 정보를 로딩해봅니다.

package main

import (
    "flag"
    "io/ioutil"
    "net/http"
    "os"

    "github.com/ghodss/yaml"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "github.com/prometheus/common/version"
    log "github.com/sirupsen/logrus"
)

var config Config

func main() {
    var err error
    var configFile, bind string
    // =====================
    // Get OS parameter
    // =====================
    flag.StringVar(&configFile, "config", "config.yml", "configuration file")
    flag.StringVar(&bind, "bind", "0.0.0.0:9104", "bind")
    flag.Parse()

    // =====================
    // Load config & yaml
    // =====================
    var b []byte
    if b, err = ioutil.ReadFile(configFile); err != nil {
        log.Errorf("Failed to read config file: %s", err)
        os.Exit(1)
    }

    // Load yaml
    if err := yaml.Unmarshal(b, &config); err != nil {
        log.Errorf("Failed to load config: %s", err)
        os.Exit(1)
    }

    // ========================
    // Regist handler
    // ========================
    log.Infof("Regist version collector - %s", "query_exporter")
    prometheus.Register(version.NewCollector("query_exporter"))

    // Regist http handler
    log.Infof("HTTP handler path - %s", "/metrics")
    http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        h := promhttp.HandlerFor(prometheus.Gatherers{
            prometheus.DefaultGatherer,
        }, promhttp.HandlerOpts{})
        h.ServeHTTP(w, r)
    })

    // start server
    log.Infof("Starting http server - %s", bind)
    if err := http.ListenAndServe(bind, nil); err != nil {
        log.Errorf("Failed to start http server: %s", err)
    }
}

// =============================
// Config config structure
// =============================
type Config struct {
    DSN     string
    Metrics map[string]struct {
        Query       string
        Type        string
        Description string
        Labels      []string
        Value       string
        metricDesc  *prometheus.Desc
    }
}

이렇게 하면, 이제 필요한 정보를 Config 구조체에 담아서, 이를 활용하여 원하는 구현을 해볼 수 있겠습니다.

Collector 구현해보기

오늘 포스팅의 하이라이트.. 바로 원하는 정보를 수집해보기 위한 Collector를 구현해보는 과정입니다.
지금까지 모든 과정은, 직접 구현한 Collector가 수집한 정보를 HTTP 결과로 보여주기 위한 것이었습니다. Collector에서는 실제로 DB에 접속을 해서 정해진 쿼리를 수행한 결과를 바탕으로 지정한 metric 결과를 전달하는 과정을 품습니다.

type QueryCollector struct{}

// Describe prometheus describe
func (e *QueryCollector) Describe(ch chan<- *prometheus.Desc) {
}

// Collect prometheus collect
func (e *QueryCollector) Collect(ch chan<- prometheus.Metric) {
}

앞서 이야기한 것 처럼, Collector는 정보를 수집해주는 하나의 쓰레드(?) 개념으로, Prometheus의 Collector 인터페이스를 구현한 구조체입니다. 즉, 이 이야기는 만약 나만의 또다른 Collector를 생성하기 위해서는 prometheus.Collector 인터페이스가 정의한 Describe와 Collect 두 가지 정도는 반드시 구현을 해야한다는 것입니다.

func main(){
    .. skip ..
    // ========================
    // Regist handler
    // ========================
    log.Infof("Regist version collector - %s", "query_exporter")
    prometheus.Register(version.NewCollector("query_exporter"))
    prometheus.Register(&QueryCollector{})
    .. skip ..
}

위에서 정의한 Collector를 위와 같이 등록을 해줍니다. 앞서 생성한 깡통 Exporter에 추가했던 Version Collector와 이번에 새롭게 추가한 QueryCollector가 등록됩니다. "/metric"으로 http 요청이 들어오면, 최종적으로는 위 두개의 Collector가 각각의 쓰레드로 수행됩니다.

1. Describe 함수 만들기

각각 메트릭들의 스펙을 정의하는 부분입니다. 사실 반드시 여기에서 메트릭의 스펙을 정의할 필요는 없지만, 여러개의 Collector를 만들어서 운영하는 경우를 생각해본다면, 유용합니다. prometheus.Register 로 Collector가 등록될 시 단 한번 수행되는 메쏘드입니다.

func (e *QueryCollector) Describe(ch chan<- *prometheus.Desc) {
    for metricName, metric := range config.Metrics {
        metric.metricDesc = prometheus.NewDesc(
            prometheus.BuildFQName("query_exporter", "", metricName),
            metric.Description,
            metric.Labels, nil,
        )
        config.Metrics[metricName] = metric
        log.Infof("metric description for \"%s\" registerd", metricName)
    }
}

저는 여기서 앞서 읽어들인 설정 정보에서 Query 관련된 정보로 메트릭의 스펙을 정의하였습니다.

  • prometheus.BuildFQName: 메트릭 명
  • metric.Description: 설명
  • metric.Labels: 라벨명 배열, 이 순서로 라벨값들이 추후 맵핑되어야 함

설정 정보를 보면, 아래와 같이 각각 맵핑이 되겠네요.

metrics:
  # metricName
  process_count_by_user:
    ## metric.Description
    description: "process count by user"
    ## metric.Labels
    labels: ["user"]

2. Collect 함수 만들기

DB에 접속해서, 원하는 SQL 을 실행한 이후 이를 metric으로 만들어주는 부분입니다. 각 쿼리의 실행 결과들은 아래 그림과 같이 지정된 이름의 메트릭명으로 결과로 보여지게 됩니다.

metric results

func (e *QueryCollector) Collect(ch chan<- prometheus.Metric) {

    // Connect to database
    db, err := sql.Open("mysql", config.DSN)
    if err != nil {
        log.Errorf("Connect to database failed: %s", err)
        return
    }
    defer db.Close()

    // Execute each queries in metrics
    for name, metric := range config.Metrics {

        // Execute query
        rows, err := db.Query(metric.Query)
        if err != nil {
            log.Errorf("Failed to execute query: %s", err)
            continue
        }

        // Get column info
        cols, err := rows.Columns()
        if err != nil {
            log.Errorf("Failed to get column meta: %s", err)
            continue
        }

        des := make([]interface{}, len(cols))
        res := make([][]byte, len(cols))
        for i := range cols {
            des[i] = &res[i]
        }

        // fetch database
        for rows.Next() {
            rows.Scan(des...)
            data := make(map[string]string)
            for i, bytes := range res {
                data[cols[i]] = string(bytes)
            }

            // Metric labels
            labelVals := []string{}
            for _, label := range metric.Labels {
                labelVals = append(labelVals, data[label])
            }

            // Metric value
            val, _ := strconv.ParseFloat(data[metric.Value], 64)

            // Add metric
            switch strings.ToLower(metric.Type) {
            case "counter":
                ch <- prometheus.MustNewConstMetric(metric.metricDesc, prometheus.CounterValue, val, labelVals...)
            case "gauge":
                ch <- prometheus.MustNewConstMetric(metric.metricDesc, prometheus.GaugeValue, val, labelVals...)
            default:
                log.Errorf("Fail to add metric for %s: %s is not valid type", name, metric.Type)
                continue
            }
        }
    }
}

labelVals 값에서 볼 수 있듯이, 앞서 Describe에서 정의한 스펙의 Labels 순으로 라벨 값 순서로 전달을 해야합니다. 여기서 counter와 gauge 두 개의 메트릭 타입이 있습니다. 각각의 타입은 아래와 같이 의미를 갖습니다.

  • COUNTER: 증가만 하는 값, prometheus에서는 rate/irate 와 같은 변화량 계산 함수로 지표를 보여줌
    ch <- prometheus.MustNewConstMetric(metric.metricDesc, prometheus.CounterValue, val, labelVals...)
  • GAUGE: 자동차 게이지와 같이, 값이 증가/감소할 수 있는 타입. 일반적으로 프로세스 카운트와 같이 현재 지표 값 그대로 저장할 시 사용
    ch <- prometheus.MustNewConstMetric(metric.metricDesc, prometheus.GaugeValue, val, labelVals...)

    지표로 보여줄 값은 설정에 지정했던 value 항목을 쿼리 결과에서 가져와서 메트릭 value 값으로 저장합니다.

QueryExporter Source

지금까지의 모든 내용들을 취합해보면 아래와 같습니다.

package main

import (
    "database/sql"
    "flag"
    "io/ioutil"
    "net/http"
    "os"
    "strconv"
    "strings"

    "github.com/ghodss/yaml"
    _ "github.com/go-sql-driver/mysql"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "github.com/prometheus/common/version"
    log "github.com/sirupsen/logrus"
)

var config Config

const (
    collector = "query_exporter"
)

func main() {
    var err error
    var configFile, bind string
    // =====================
    // Get OS parameter
    // =====================
    flag.StringVar(&configFile, "config", "config.yml", "configuration file")
    flag.StringVar(&bind, "bind", "0.0.0.0:9104", "bind")
    flag.Parse()

    // =====================
    // Load config & yaml
    // =====================
    var b []byte
    if b, err = ioutil.ReadFile(configFile); err != nil {
        log.Errorf("Failed to read config file: %s", err)
        os.Exit(1)
    }

    // Load yaml
    if err := yaml.Unmarshal(b, &config); err != nil {
        log.Errorf("Failed to load config: %s", err)
        os.Exit(1)
    }

    // ========================
    // Regist handler
    // ========================
    log.Infof("Regist version collector - %s", collector)
    prometheus.Register(version.NewCollector(collector))
    prometheus.Register(&QueryCollector{})

    // Regist http handler
    log.Infof("HTTP handler path - %s", "/metrics")
    http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        h := promhttp.HandlerFor(prometheus.Gatherers{
            prometheus.DefaultGatherer,
        }, promhttp.HandlerOpts{})
        h.ServeHTTP(w, r)
    })

    // start server
    log.Infof("Starting http server - %s", bind)
    if err := http.ListenAndServe(bind, nil); err != nil {
        log.Errorf("Failed to start http server: %s", err)
    }
}

// =============================
// Config config structure
// =============================
type Config struct {
    DSN     string
    Metrics map[string]struct {
        Query       string
        Type        string
        Description string
        Labels      []string
        Value       string
        metricDesc  *prometheus.Desc
    }
}

// =============================
// QueryCollector exporter
// =============================
type QueryCollector struct{}

// Describe prometheus describe
func (e *QueryCollector) Describe(ch chan<- *prometheus.Desc) {
    for metricName, metric := range config.Metrics {
        metric.metricDesc = prometheus.NewDesc(
            prometheus.BuildFQName(collector, "", metricName),
            metric.Description,
            metric.Labels, nil,
        )
        config.Metrics[metricName] = metric
        log.Infof("metric description for \"%s\" registerd", metricName)
    }
}

// Collect prometheus collect
func (e *QueryCollector) Collect(ch chan<- prometheus.Metric) {

    // Connect to database
    db, err := sql.Open("mysql", config.DSN)
    if err != nil {
        log.Errorf("Connect to database failed: %s", err)
        return
    }
    defer db.Close()

    // Execute each queries in metrics
    for name, metric := range config.Metrics {

        // Execute query
        rows, err := db.Query(metric.Query)
        if err != nil {
            log.Errorf("Failed to execute query: %s", err)
            continue
        }

        // Get column info
        cols, err := rows.Columns()
        if err != nil {
            log.Errorf("Failed to get column meta: %s", err)
            continue
        }

        des := make([]interface{}, len(cols))
        res := make([][]byte, len(cols))
        for i := range cols {
            des[i] = &res[i]
        }

        // fetch database
        for rows.Next() {
            rows.Scan(des...)
            data := make(map[string]string)
            for i, bytes := range res {
                data[cols[i]] = string(bytes)
            }

            // Metric labels
            labelVals := []string{}
            for _, label := range metric.Labels {
                labelVals = append(labelVals, data[label])
            }

            // Metric value
            val, _ := strconv.ParseFloat(data[metric.Value], 64)

            // Add metric
            switch strings.ToLower(metric.Type) {
            case "counter":
                ch <- prometheus.MustNewConstMetric(metric.metricDesc, prometheus.CounterValue, val, labelVals...)
            case "gauge":
                ch <- prometheus.MustNewConstMetric(metric.metricDesc, prometheus.GaugeValue, val, labelVals...)
            default:
                log.Errorf("Fail to add metric for %s: %s is not valid type", name, metric.Type)
                continue
            }
        }
    }
}

만약 패키지가 없다면, go mod vendor를 수행해서 필요한 패키지들을 다운로드 받아보도록 합니다.

$ go run .
INFO[0000] Regist version collector - query_exporter
INFO[0000] metric description for "process_count_by_host" registerd
INFO[0000] metric description for "process_count_by_user" registerd
INFO[0000] HTTP handler path - /metrics
INFO[0000] Starting http server - 0.0.0.0:9104

서버를 구동하고, 실제 Exporter에서 수집하는 정보를 확인해봅니다.

$ curl 127.0.0.1:9104/metrics
# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0
go_gc_duration_seconds{quantile="0.25"} 0

.. skip ..

# HELP query_exporter_build_info A metric with a constant '1' value labeled by version, revision, branch, and goversion from which query_exporter was built.
# TYPE query_exporter_build_info gauge
query_exporter_build_info{branch="",goversion="go1.16.5",revision="",version=""} 1
# HELP query_exporter_process_count_by_host process count by host
# TYPE query_exporter_process_count_by_host gauge
query_exporter_process_count_by_host{host="localhost",user="event_scheduler"} 1
query_exporter_process_count_by_host{host="localhost",user="test"} 1
# HELP query_exporter_process_count_by_user process count by user
# TYPE query_exporter_process_count_by_user gauge
query_exporter_process_count_by_user{user="event_scheduler"} 1
query_exporter_process_count_by_user{user="test"} 1

curl로 실행은 해보면, 설정에 정의를 했던 유저별/호스트별 세션 카운트가 정상적으로 보여지는 것을 확인할 수 있습니다.
나만의 Exporter가 만들어진 순간입니다. 🙂

마치며..

포스팅이 굉장히 길었습니다. 소스코드를 몇번이고 본문에 넣었더니.. 내용없이 본문 양만 길어진 느낌적인 느낌;;;
어찌됐건 나만의 고유한 Exporter를 만들어냈습니다! 저는 단순히 쿼리를 등록하여 이 결과를 메트릭 결과로 추출해보는 간단한 기능을 구현했지만, 필요에 따라서 더 많은 재미요소를 각자의 생각에 맞게 가미해볼 수 있을 것 같네요.

참고로, 다음 Git에 위에서 작성한 소스는 정리하였습니다.
https://github.com/go-gywn/query-exporter-simple

때로는 하나의 장비에서 수십~수백대의 장비들을 모니터링을 해야할 때.. 중앙에서 메트릭 수집을 관리하는 것이 유용할 때가 있었습니다. 아직은 MySQL만 기능이 제공되지만, 이런 요구사항을 해결하기 위해, 개인적으로 또다른 Query Exporter 프로젝트를 만들어보았습니다. 위 프로제트 베이스에 병렬처리와 타임아웃 같은 기타 등등을 더 구현해보았습니다.
https://github.com/go-gywn/query-exporter

늘 그래왔지만.. 없으면, 만들면 되고.. 있으면 잘 갖다 쓰면 되겠죠? 모든 것을 만들어볼 수는 없을테니. ㅎㅎ

즐거운 한여름 되세요. 🙂

MySQL에서 리셋되는 시퀀스 만들어보기

Overview

서비스를 준비하다보면, 시퀀스에 대한 요구사항은 언제나 생기기 마련입니다. 물론, MySQL에는 기본적으로 테이블 단위로 auto_increment가 있기는 합니다만, 일반적인 시퀀스가 요구되는 환경을 흡족하게 맞추기는 어려운 실정입니다.
보통은 Peter Zaitsev가 하단에 게시한 블로그 내용처럼, Function 기반으로 채번 함수를 만들고는 하지요. (물론 InnoDB로 지정하는 것이, 복제 상황에서는 아주 안정성을 확보하기는 합니다.)
https://www.percona.com/blog/2008/04/02/stored-function-to-generate-sequences/

이 내용을 기반으로, "재미난 시퀀스를 만들어볼 수 없을까?" 라는 퀘스천에 따라, 이번 블로깅에서는 특정 시점에 리셋이 되는 시퀀스를 한번 만들어보고자 합니다.

Schema

첫번째로는 현재 시퀀스를 담을 테이블 그릇(?)을 아래와 같이 생성을 해보도록 하겠습니다.

CREATE TABLE `t_sequence` (
  `name` varchar(100) NOT NULL,
  `seq_num` bigint(20) NOT NULL DEFAULT '0',
  `mtime` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
  PRIMARY KEY (`name`)
) ENGINE=InnoDB

여기에서 seq_num이 매번 +1되면서 데이터를 전달해주게 되겠죠.
Peter의 블로그와는 다르게, 저는 아래와 같이 insert into .. on duplicate key update .. 구문으로 Upsert 처리하여 시퀀스를 발급하도록 하겠습니다.

insert into t_sequence (name, seq_num, mtime) values ('abc', last_insert_id(1), now(6)) 
    on duplicate key update seq_num = last_insert_id(seq_num+1), mtime = now(6);

그런데, 이 구문은 단순히 시퀀스값을 매번 1씩 증가하는 것으로. 우리에게 필요한 것은 매일 0시 혹은 매시 시퀀스 값이 1부터 다시 초기화되는 로직이 쿼리안에 필요한 것입니다. 그래서, 위 쿼리를 아래와 같이 변경을 해봅니다. (매분 1로 초기화)

insert into t_sequence (name, seq_num, mtime) values ('abc', last_insert_id(1), now(6)) 
    on duplicate key update seq_num = last_insert_id(if(mtime < date_format(now(6), '%Y-%m-%d %H:%i:00'), 1, seq_num+1)), mtime = now(6);

on duplicate key update 안에 시간을 체크하는 로직을 추가하여, 결과적으로 0분때마다 다시 1부터 다시 시작하는 값이 추출되는 것이죠.

mysql> insert into t_sequence (name, seq_num, mtime) values ('abc', last_insert_id(1), now(6))
    ->     on duplicate key update seq_num = last_insert_id(if(mtime < date_format(now(6), '%Y-%m-%d %H:%i:00'), 1, seq_num+1)), mtime = now(6);
Query OK, 2 rows affected (0.00 sec)

mysql> select now(), last_insert_id();
+---------------------+------------------+
| now()               | last_insert_id() |
+---------------------+------------------+
| 2021-06-21 12:31:58 |                6 |
+---------------------+------------------+
1 row in set (0.00 sec)

mysql> insert into t_sequence (name, seq_num, mtime) values ('abc', last_insert_id(1), now(6))
    ->     on duplicate key update seq_num = last_insert_id(if(mtime < date_format(now(6), '%Y-%m-%d %H:%i:00'), 1, seq_num+1)), mtime = now(6);
Query OK, 2 rows affected (0.00 sec)

mysql> select now(), last_insert_id();
+---------------------+------------------+
| now()               | last_insert_id() |
+---------------------+------------------+
| 2021-06-21 12:32:01 |                1 |
+---------------------+------------------+
1 row in set (0.00 sec)

자 이제, 이 쿼리들을 조합해서, 아래와 같은 Function을 만들어보겠습니다.

delimiter //
drop function nextval//
create function nextval(in_name varchar(100), in_type char(1)) returns bigint
begin
  declare date_format varchar(20);
  SET date_format = (
    case 
      when in_type = 'M' then '%Y-%m-01 00:00:00' 
      when in_type = 'D' then '%Y-%m-%d 00:00:00' 
      when in_type = 'H' then '%Y-%m-%d %H:00:00' 
      when in_type = 'I' then '%Y-%m-%d %H:%i:00' 
      when in_type = 'S' then '%Y-%m-%d %H:%i:%S' 
      else '%Y-%m-%d 00:00:00'  
    end
  );
  insert into t_sequence (name, seq_num, mtime) values (in_name, last_insert_id(1), now(6)) 
      on duplicate key update seq_num = last_insert_id(if(mtime < date_format(now(6), date_format), 1, seq_num+1)), mtime = now(6);
  return last_insert_id();
end
//
delimiter ;

Function 함수에 나와있듯이, M인경우는 매월 리셋, D는 매일 리셋, H는 매시 리셋.. 등등 파라메터로 리셋할 시점을 정해서 만들어볼 수 있겠습니다.

mysql> select nextval('abc', 'I') seq, now();
+------+---------------------+
| seq  | now()               |
+------+---------------------+
|    1 | 2021-06-21 12:40:42 |
+------+---------------------+
1 row in set (0.00 sec)

mysql> select nextval('abc', 'I') seq, now();
+------+---------------------+
| seq  | now()               |
+------+---------------------+
|    2 | 2021-06-21 12:40:52 |
+------+---------------------+
1 row in set (0.00 sec)

mysql> select nextval('abc', 'I') seq, now();
+------+---------------------+
| seq  | now()               |
+------+---------------------+
|    3 | 2021-06-21 12:40:56 |
+------+---------------------+
1 row in set (0.00 sec)

mysql> select nextval('abc', 'I') seq, now();
+------+---------------------+
| seq  | now()               |
+------+---------------------+
|    1 | 2021-06-21 12:41:00 |
+------+---------------------+
1 row in set (0.00 sec)

필요하다면, Function의 insert into .. on duplicate update.. 구문 안에 더 다양한 요구 사항을 넣어볼 수 있을 듯 합니다. 🙂

Performance

함수로 만들어지기 때문에.. 느릴 수도 있다고 선입견을 가지신 분들을 위해서.. 간단하게 아래와 같이 테스트를 해보았습니다.

Environments

Intel(R) Core(TM) i3-8100 CPU @ 3.60GHz(4core), 32G Memory

MySQL parameter

mysql> show variables where Variable_name in ('innodb_flush_log_at_trx_commit', 'sync_binlog');
+--------------------------------+-------+
| Variable_name                  | Value |
+--------------------------------+-------+
| innodb_flush_log_at_trx_commit | 0     |
| sync_binlog                    | 0     |
+--------------------------------+-------+
2 rows in set (0.00 sec)

1. Local test

시퀀스 특성 상 특정 row에 대한 Lock이 매번 발생할 수밖에 없습니다. 이 얘기는, 네트워크 레이턴시가 관여할 수록 더욱 낮은 퍼포먼스를 보인다는 이야기인데요. 우선 서버에 접속해서 mysqlslap으로 아래와 같이 시퀀스 발급 트래픽을 무작위로 줘봅니다.

$ time mysqlslap -utest      \
  --password=test123         \
  --create-schema=test       \
  --iterations=1             \
  --number-of-queries=100000 \
  --query="select test.nextval('abc', 'H');"
Benchmark
    Average number of seconds to run all queries: 5.979 seconds
    Minimum number of seconds to run all queries: 5.979 seconds
    Maximum number of seconds to run all queries: 5.979 seconds
    Number of clients running queries: 10
    Average number of queries per client: 10000
real    0m5.996s
user    0m0.915s
sys 0m1.709s
mysqlslap -uroot --concurrency=10 --create-schema=test --iterations=1    0.18s user 0.32s system 15% cpu 3.285 total

5.996초 수행되었고, 초당 16,666 시퀀스 발급이 이루어졌네요!!

2. Remote test

거실에 있는 블로그 서버로의 네트워크 레이턴시는 대략 아래와 같습니다. 04~0.5ms 사이를 왔다갔다 하는듯..

$ ping 10.5.5.11
PING 10.5.5.11 (10.5.5.11): 56 data bytes
64 bytes from 10.5.5.11: icmp_seq=0 ttl=64 time=0.404 ms

이 환경에서 위와 동일한 테스트 트래픽을 발생시켜보았습니다.

$ time mysqlslap -utest      \
  --password=test123         \
  --host=10.5.5.11           \
  --concurrency=10           \
  --create-schema=test       \
  --iterations=1             \
  --number-of-queries=100000 \
  --query="select test.nextval('abc', 'H');"
mysqlslap: [Warning] Using a password on the command line interface can be insecure.
Benchmark
    Average number of seconds to run all queries: 7.191 seconds
    Minimum number of seconds to run all queries: 7.191 seconds
    Maximum number of seconds to run all queries: 7.191 seconds
    Number of clients running queries: 10
    Average number of queries per client: 10000
mysqlslap -utest --password=test123 --host=10.5.5.11 --concurrency=10      0.43s user 0.44s system 11% cpu 7.238 total

7.191초 수행하였고, 초당 13,906건 정도 시퀀스 발급이 이루어졌습니다.

개인적인 생각으로는.. 단일 시퀀스 성능으로는 이정도도 나쁘지 않다고 생각합니다만.. ^^ 만약 시퀀스 자체가 이렇게 공유하는 개념이 아닌 개인별로 할당되는 구조로 관리된다면..? row lock으로 인한 불필요한 대기를 어느정도 줄여줄 수 있을 것으로 생각되네요.

$ time mysqlslap -utest      \
  --password=test123         \
  --host=10.5.5.11           \
  --concurrency=10           \
  --create-schema=test       \
  --iterations=1             \
  --number-of-queries=100000 \
  --query="select test.nextval(concat('ab',floor(rand()*10)), 'H');"
mysqlslap: [Warning] Using a password on the command line interface can be insecure.
Benchmark
    Average number of seconds to run all queries: 5.702 seconds
    Minimum number of seconds to run all queries: 5.702 seconds
    Maximum number of seconds to run all queries: 5.702 seconds
    Number of clients running queries: 10
    Average number of queries per client: 10000
mysqlslap -utest --password=test123 --host=10.5.5.11 --concurrency=10      0.40s user 0.45s system 14% cpu 5.767 total

앞서 7.2초 걸리던 결과를 5.7초 정도로 처리하였는데. 만약 네트워크 레이턴시가 많이 안좋은 환경에서는 Lock으로 인한 대기를 크게 경감시킴으로써 훨씬 더 좋은 효과를 보여줄 것이라 생각합니다.

Concluion

MySQL에 없는 시퀀스를 서비스 요구사항에 맞게 좀더 재미나게 만들어보자라는 생각으로 시작하였습니다.
특정 서비스건, 개인화 서비스건.. 0시 기준으로 새롭게 1부터 시작해야하는 시퀀스 요구사항을 가끔 듣기는 했습니다. 이럴때 기존이라면, 락을 걸고, 현재 시퀀스 값을 가지고 리셋 처리 여부를 결정해야할 것인데.. 여기서는 이것을 간단하게 단건의 INSERT 구문으로 해결을 하였습니다.

필요에 따라.. 특정 이벤트의 개인화 테이블에.. 최근 1시간동안 10회 이상이면 다시 1부터 시작하는 이상스러운 시퀀스도 재미나게 만들어볼 수 있을 듯 하네요.

오랜만의 포스팅을 마칩니다.

MySQL binlog파서와 memcached plugin의 콜라보레이션!

Overview

6개월도 훌쩍 넘은 시간에. 간만에 포스팅합니다. 그동안 OGG javaue든, MySQL Binlog파서든.. 흐르는 데이터를 핸들링하는 고민으로 하루하루를 지내왔던 것 같아요. 그러던 중 이전 포스팅에서 주제로 삼았던, InnoDB memcached plugin을 Binlog parsing을 통해 데이터를 맞추면 좋을 것 같다는 생각이 들었습니다.
오늘 이 자리에서는 이런 답답함을 극복하고자, Binlog 이벤트를 활용하여, 최신 데이터를 유지시키는 방안에 대해서 이야기를 해보도록 하겠습니다.

MySQL Binary log?

MySQL에서 데이터복제를 위해서는 Binnary Log(binlog)를 쓰게 되는데, 이중 ROW 포멧으로 만들어지는 이벤트를 활용하여 다양한 데이터 핸들링이 가능합니다.
file

ROW Event는 특정 테이블에 대한 정보를 알려주는 Table map event가 우선 선행하고, 해당 테이블과 연계된 데이터가 뒤따르게 됩니다. 참고로, (InnoDB에서) 트랜잭션 처리가 이루어지면 아래와 같은 이벤트로 바이너리 로그에 기록이 됩니다.

[SQL Event] begin;
[TABLE MAP Event] tableA
  [WRITE ROW Event] data[]
  [WRITE ROW Event] data[]
  [WRITE ROW Event] data[]
[TABLE MAP Event] tableB
  [UPDATE ROW Event]  <data[], data[]>
  [UPDATE ROW Event] < data[], data[]>
[XID Event]

그리고, MySQL에서는 binlog_row_image 에 따라 전체데이터(FULL)로 기록할 것인지, 최소한의 정보(MINIMAL)만 기록할 것인지 설정할 수 있습니다.

"FULL" binlog_row_image

변경 이전/이후 모든 데이터를 기록하게 되는데, 많은 정보가 포함되어 있는만큼 다양한 핸들링을 해볼수 있습니다만.. 바이너리 로그 사이즈가 지나치게 커질 수 있다는 리스크가 있습니다.
게시판 조회수를 예로 들자면.. 게시물내용과, 게시물조회수가 동일한 테이블에 있는 상황이라면, 단순히 조회수 1이 증가할 뿐인데.. 이에 대한 바이너리 로그에는 게시물내용까지 포함되어 기록되는 결과로 이어집니다. (잘나가는 사이트에서는 환장할지도 몰라용.)

"MINIMAL" binlog_row_image

변경된 데이터와 PK 데이터만 포함하는.. 정말 최소한의 데이터만 바이너리로그에 기록합니다. 이렇게되면, 로그 사이즈 리스크에서는 자유로워졌지만.. 자유로운 데이터 핸들링에서는 아무래도 제약이 있을 수밖에 없습니다.

참고로, FULL이든 MINIMAL이든, 어떤 칼럼들이 변경되었는지에 대한 정보를 BIT SET로 묶어서 TABLE MAP Event에 포함시켜 전달을 하죠. MINIMAL 이미지 에서도 제대로 처리하기 위해서는 이녀석을 잘 보고, 정말로 변경된 칼럼을 잘 선별해야 합니다. 🙂

Binlog parser & Memcached plugin

자! Binlog 포멧에 대한 구구절절한 이야기는 이정도로 하고, MySQL Row format의 이런 특성을 통해서 무엇을 상상해볼 수있을 지 이야기를 해보도록 하겠습니다. (바로 아래와 같은 상상?)
file

이런 구조에서 고민을 해야할 것은 두가지 정도라고 생각하는데요.

1. DDL 작업
기존 서비스DB에, Memcached plugin 설정 시 칼럼 맵핑 작업을 통해, 여러 칼럼들을 묶어서 memcache protocal에 실어나를 수 있습니다만.. InnoDB plugin으로 인한 대상 테이블에 대한 DDL 제약이 생각보다 많이 생기게 됩니다.

2. 최신 캐시 데이터
캐시서비스를 운영한다면, 가장 최신의 데이터를 캐시에 유지하는 것이라고 포인트라고 개인적으로 생각합니다.

그런데, ROW format에는 어떤 경우는 PK에 대한 정보는 반드시 포함이 되어 있다고 했죠! 그렇다면, 위 그림과 같이, Binlog 이벤트를 받으면, 전달받은 PK를 기반으로 소스DB에서 데이터를 SELECT하고, 결과를 JSON으로 변환해서 타겟DB에 덮어쓰는 것은 어떨까요?
스키마로 묶여있던 칼럼들을 JSON으로 변환(스키마리스)해서 넣기 때문에, DDL 작업으로부터 자유롭습니다. 그리고, 데이터 변경 시 소스에서 가장 최신의 데이터로 적용하기 때문에, 데이터 일관성 측면에서도 유리합니다. (Binlog에 저장된 순으로 데이터를 기록하게 되면, 결과적으로 변경되는 과정이 사용자에게 노출될 수도 있겠죠?)

Binlog이벤트를 받아서, 최신 데이터를 JSON형태로 유지를 시키면, 어플리케이션에서는 그냥 memcache protocal을 통해 데이터를 GET만 하면, 굉장히 빠른 속도로 데이터 처리가 가능해지는 것이죠.

참고로 MySQL InnoDB memcached 플러그인에 대한 내용은 예전에 시리즈로 포스팅을 한 적이 있습니다. (그냥 참고삼아.ㅎ)

uldra-binlog-json-transfer

최근 밤에 잠이 잘 안와서, 간단하게 구현해보았습니다. 개인적으로 CDC관련된 자체 아류작이 워낙 많다보니.. 이제는 하루이틀 집중하면, 간단한 처리 구현은 아예 찍어내고 마네요. -_-;;
https://github.com/gywndi/uldra-binlog-json-transfer
file

Conclusion

MySQL Binlog 이벤트 처리를 활용하여 Memcached 데이터를 잘 응용해볼 수 있는 방안에 대해서 정리해봤습니다. 그리고, 고민의 결과는 이미 위에서 깃헙에 공유를 해놓았고요. ^^

uldra-binlog-json-transfer로 만든 내용은 아래와 같습니다.
1. MySQL Binlog parser
2. PK로 소스 테이블에서 데이터 SELECT
3. 결과를 JSON으로 변환하여 타겟DB에 저장

프로토타입 수준으로 만들어놓은 프로젝트이기에.. 지적질 환영이고, 의견 교류 너무 좋아하고 환장합니다. 🙂

다들 코로나 멋지게 극복하세요.