SSH, MySQL, Zerolog, そして Kra

SSH, MySQL, Zerolog

VPS 上に構築された MySQL サービスに大量のデータを送り込む必要がありまして。 Go でバッチ処理を組もうと考えたわけだ。 当然 MySQL サービスは VPS の外から直接アクセスできないので SSH トンネルをくぐる必要がある。

というわけで最初に作ったのが github.com/goark/sshql だった。

再掲載すると,こんな感じに書ける。

package main

import (
    "database/sql"
    "fmt"
    "os"

    "github.com/goark/sshql"
    "github.com/goark/sshql/mysqldrv"
)

func main() {
    dialer := &sshql.Dialer{
        Hostname:   "sshserver",
        Port:       22,
        Username:   "remoteuser",
        Password:   "passphraseforauthkey",
        PrivateKey: "/home/username/.ssh/id_eddsa",
    }
    mysqldrv.New(dialer).RegisterDial("ssh+tcp")

    db, err := sql.Open("mysql", "dbuser:dbpassword@ssh+tcp(localhost:3306)/dbname")
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        return
    }
    defer dialer.Close()
    defer db.Close()

    rows, err := db.Query("SELECT id, name FROM example ORDER BY id")
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        return
    }
    for rows.Next() {
        var id int64
        var name string
        if err := rows.Scan(&id, &name); err != nil {
            fmt.Fprintln(os.Stderr, err)
            break
        }
        fmt.Printf("ID: %d  Name: %s\n", id, name)
    }
    rows.Close()
}

ssh+tcp という名前で dialer を登録し,この名前を使って DSN (Data Source Name) を構成するというのがポイント。 Dialer のクローズを忘れずに(笑)

ただ,これだとログが取れない。 んで,どうせログを取るなら github.com/rs/zerolog パッケージを使いたいわけですよ。

そこで登場するのが github.com/simukti/sqldb-logger パッケージ。 これを使えば標準の sql.DBzerolog などのサードパーティ製 logger を仕込むことができる。 こんな感じ。

package main

import (
    "database/sql"
    "fmt"
    "os"

    "github.com/goark/sshql"
    "github.com/goark/sshql/mysqldrv"
    "github.com/rs/zerolog"
    sqldblogger "github.com/simukti/sqldb-logger"
    "github.com/simukti/sqldb-logger/logadapter/zerologadapter"
)

func main() {
    dialer := &sshql.Dialer{
        Hostname:   "sshserver",
        Port:       22,
        Username:   "remoteuser",
        Password:   "passphraseforauthkey",
        PrivateKey: "/home/username/.ssh/id_eddsa",
    }
    mysqldrv.New(dialer).RegisterDial("ssh+tcp")

    dsn := "dbuser:dbpassword@ssh+tcp(localhost:3306)/dbname"
    mysqlDb, err := sql.Open("mysql", dsn)
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        return
    }
    db := sqldblogger.OpenDriver(
        dsn,
        mysqlDb.Driver(),
        zerologadapter.New(zerolog.New(os.Stderr)),
        sqldblogger.WithMinimumLevel(sqldblogger.LevelDebug),
    )
    defer dialer.Close()
    defer db.Close()

    rows, err := db.Query("SELECT id, name FROM example ORDER BY id")
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        return
    }
    for rows.Next() {
        var id int64
        var name string
        if err := rows.Scan(&id, &name); err != nil {
            fmt.Fprintln(os.Stderr, err)
            break
        }
        fmt.Printf("ID: %d  Name: %s\n", id, name)
    }
    rows.Close()
}

DSN を2回指定しないといけないのが若干鬱陶しいが,ともかくこれで zerolog による構造化ログが出るようになった。

さらに Kra を仕込む

2022年春の Go Conference でとても感銘を受けたのが github.com/taichi/kra パッケージ。

いや ent とかってデータ構造とその関係を最初から構築するならいい道具だと思うけど(コードで設計できるのは素晴らしい!),既にある RDBMS 環境を使う場合には必ずしもマッチしないのよね。 更に言うと,既存 ORM やクエリビルダとかの中途半端な SQL 抽象化・隠蔽にはウンザリしてるのよ(この辺は Go に限らないけど)。 それなら最初からガチで SQL 文を書いて,クエリプランをチェックしつつ評価・最適化して,それから実装を進めるべきだと常々思っていた1

というわけで github.com/taichi/kra を使うチャンスを伺っていたのだが,今回はお試しにはちょうどいいサイズだったので採用した。 とはいえ,今回のような構成ではどうすればいいのか分からなくて kra パッケージのソースコードやサンプルコードを眺めながら,まずは以下のように書いてみる。

package main

import (
    "database/sql"
    "fmt"
    "os"

    "github.com/goark/sshql"
    "github.com/goark/sshql/mysqldrv"
    "github.com/rs/zerolog"
    sqldblogger "github.com/simukti/sqldb-logger"
    "github.com/simukti/sqldb-logger/logadapter/zerologadapter"
    "github.com/taichi/kra"
    kraSql "github.com/taichi/kra/sql"
)

func main() {
    dialer := &sshql.Dialer{
        Hostname:   "sshserver",
        Port:       22,
        Username:   "remoteuser",
        Password:   "passphraseforauthkey",
        PrivateKey: "/home/username/.ssh/id_eddsa",
    }
    mysqldrv.New(dialer).RegisterDial("ssh+tcp")

    dsn := "dbuser:dbpassword@ssh+tcp(localhost:3306)/dbname"
    mysqlDb, err := sql.Open("mysql", dsn)
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        return
    }
    db := kraSql.NewDB(
        sqldblogger.OpenDriver(
            dsn,
            mysqlDb.Driver(),
            zerologadapter.New(zerolog.New(os.Stderr)),
            sqldblogger.WithMinimumLevel(sqldblogger.LevelDebug),
        ),
        kraSql.NewCore(kra.NewCore(kra.MySQL)),
    )
    defer dialer.Close()
    defer db.Close()

    rs, err := db.Query(context.TODO(), "SELECT id, name FROM example ORDER BY id")
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        return
    }
    rows := rs.Rows()
    for rows.Next() {
        rec := struct {
            Id   int64  `db:"id"`
            Name string `db:"name"`
        }{}
        if err := kraSql.NewRows(kraSql.NewCore(kra.NewCore(kra.MySQL)), rows).Scan(&rec); err != nil {
            fmt.Fprintln(os.Stderr, err)
            break
        }
        fmt.Printf("%#v\n", rec)
    }
    rows.Close()
}

クエリ結果の取り出しが若干まどろこしいが,これは kra/sql.Rows には何故か Next() メソッドがないため2。 しょうがないので 標準の sql.Rows を取り出して,そちらの Next() メソッドで回している。

これでちゃんと動いてログも取れているのを確認できた。

トランザクション制御

トランザクション制御用に以下のような関数を用意する。

func Transaction(ctx context.Context, db *kraSql.DB, opts *sql.TxOptions, fn func(tx *kraSql.Tx) error) error {
    tx, err := db.BeginTx(ctx, opts)
    if err != nil {
        return errs.Wrap(err)
    }
    defer func() {
        if v := recover(); v != nil {
            _ = tx.Rollback()
            panic(v)
        }
    }()

    if err := fn(tx); err != nil {
        if rErr := tx.Rollback(); rErr != nil {
            return errs.Wrap(rErr, errs.WithCause(err))
        }
        return errs.Wrap(err)
    }

    if err := tx.Commit(); err != nil {
        return errs.Wrap(err)
    }
    return nil
}

なお,エラーハンドリングには自作の github.com/goark/errs パッケージを使っている。 zerolog と組み合わせてエラーを構造化してログに吐けるのが利点。

実際にトランザクション処理を行う場合は,たとえば

// logger := zerolog.New(os.Stderr)
// ctx := context.TODO()
values := struct {
    Id   int64  `db:"id"`
    Name string `db:"name"`
}{
    Id:   100,
    Name: "Alice",
}
if err := Transaction(ctx, db, &sql.TxOptions{}, func(tx *kraSql.Tx) error {
    stmt, err := tx.Prepare(ctx, "INSERT INTO example(id,name) VALUES (:id,:name)")
    if err != nil {
        return errs.Wrap(err)
    }
    defer stmt.Close()

    res, err := stmt.Exec(ctx, &values)
    if err != nil {
        return errs.Wrap(err)
    }
    count, err := res.RowsAffected()
    if err != nil {
        return errs.Wrap(err)
    }
    logger.Info().Int64("affected", count).Send()
    return nil
}); err != nil {
    logger.Error().Interface("error", err).Send()
    ...
}

てな感じに書ける。 こういうのが kra にあるとめがっさ便利なんだけどねぇ(それを言い出すとパッケージがどんどん膨れてしまうのだがw)。

LOAD DATA INFILE 文で大量のデータを突っ込む

さて,いよいよ MySQL の テーブルに大量のデータを突っ込むのだが, INSERT 文でちまちまやってたら日が暮れてしまうので(実際に試して日が暮れた) LOAD DATA INFILE 文を使うことにする。

こんな感じの SQL 文。

LOAD DATA LOCAL
  INFILE 'input.file'
  INTO TABLE exsample_table
  CHARACTER SET utf8mb4
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
  (
    field1,
    field2,
    ...
  )

これでローカルにある input.file ファイルの内容をリモートの MySQL の exsample_table テーブルに送り込める。 (field1, field2, ...) の並びと入力ファイルの要素の並びが同じであることが前提。 またサーバ側の MySQL サービスが --local_infile オプション付きで起動されていること。

CHARACTER SET 句はファイルの文字エンコードディングがDBサービスのデフォルトと異なる場合に設定する。 FIELDS 句および LINES を省略した場合のデフォルト値はこうなっているそうな。

FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES TERMINATED BY '\n' STARTING BY ''

いわゆる TSV (Tab Separated Value) 形式のレコードだね。 これ以外の形式なら明示的に設定する必要がある。

で,これを github.com/go-sql-driver/mysql パッケージで実装するには,3つのステップが必要。

ひとつ目はデータ読み込みハンドラを登録する。 こんな感じ3

file, err := os.Open("input.file")
if err != nil {
    return err
}
defer file.Close()

mysql.RegisterReaderHandler("data", func() io.Reader {
    return file
})

次に実際の SQL 文を発行する。

// logger := zerolog.New(os.Stderr)
// ctx := context.TODO()
if err := Transaction(ctx, db, &sql.TxOptions{}, func(tx *kraSql.Tx) error {
    res, err := tx.Exec(ctx, `LOAD DATA LOCAL INFILE 'Reader::data' INTO TABLE exsample_table (field1, field2, ...)`)
    if err != nil {
        return errs.Wrap(err)
    }
    count, err := res.RowsAffected()
    if err != nil {
        return errs.Wrap(err)
    }
    logger.Info().Int64("affected", count).Send()
    return nil
}); err != nil {
    logger.Error().Interface("error", err).Send()
    ...
}

ファイルを指定する部分に先ほど登録したハンドラの名前を使って 'Reader::data' と指定する。 爆速でした。

ちなみに,うっかり tx.Prepare() で前準備しようとすると「そんな構文はサポートしてない」(←超意訳)と怒られる。 PREPARE で対応している構文は以下のページが参考になる。

最後に mysql.DeregisterReaderHandler() 関数で登録を解除する。 後始末はきちんとね。

ブックマーク

参考図書

photo
プログラミング言語Go
アラン・ドノバン (著), ブライアン・カーニハン (著), 柴田芳樹 (著)
丸善出版 2016-06-20 (Release 2021-07-13)
Kindle版
B099928SJD (ASIN)
評価     

Kindle 版出た! 一部内容が古びてしまったが,この本は Go 言語の教科書と言ってもいいだろう。感想はこちら

reviewed by Spiegel on 2021-05-22 (powered by PA-APIv5)

photo
デベロッパーゴースーパーゴラン Tシャツ
Geek Go Super Golang Tees
ウェア&シューズ
B09C2XBC2F (ASIN)
評価     

ついカッとなってポチった。反省はしない

reviewed by Spiegel on 2022-04-10 (powered by PA-APIv5)


  1. SQL はひとつの独立した言語で(チューリング完全),宣言型プログラミング言語と考えるのが分かりやすい(異論は認めるw)。宣言型で分かりやすいのは正規表現だろう(関数型言語には宣言型が多い。 Lisp とか Haskell とか)。特に Go は宣言型の言語とはあまり相性がよくない。たとえば,正規表現の(構文解析やコンパイラではなく)ビルダを作ろうと考える人は少ないだろう(労力に見合わない)。どの言語・フレームワークでも同じことだが ORM やクエリビルダを使って頑張って抽象化や隠蔽をしても上手くマッチしない局面が多く,結局は「ガチの SQL でいいぢゃん。 PREPARE 構文で事前準備して変数部分はプレースホルダ経由で渡せば安全は確保される」となる。そういう意味じゃ Go 標準の database/sql パッケージは,かなり妥当な割り切りをしてると思う。まぁ,後方互換性を保つためにちょっとアレな感じになっているのは否めないけど(笑) ↩︎

  2. github.com/jackc/pgx 専用の kra/pgx.Rows にはちゃんと Next() メソッドが付いている。提供されているメソッドが微妙に違う理由はよく分からないが,ソースコードを眺めるに,何となく意図的にそうなってる気がする。 ↩︎

  3. 実は mysql.RegisterLocalFile() 関数を使えば直接ファイルパスを登録することができる。ハンドラ登録で io.Reader interface 型で渡すほうが応用が効きやすいので,今回はこちら。 ↩︎