Go言語でAWSのサービスを使ってみる ~SQS・DynamoDBを試す~

gopher_aws-sqs-dynamodb

この記事はAmazon Web Services Advent Calendar 2016の18日目の記事です。
昨日の記事はryo-yamaokaさんの自作ツールネタ(の予定) でした。
 

この記事では、Go言語を使ってaws-sdk-goの基本的な使い方と、
SQS・DynamoDBの簡単な使い方を解説していきます。
 

はじめに

この記事の対象者は↓のような人々です。
 

  • Go言語の基本的な文法が分かる人
  • AWSの各サービスをawsコマンドやその他の言語で触ったことがある人
  • Go言語でAWSのサービスを使ってみたい人

 

aws-sdk-go

GoでAWSのサービスを使うにはaws/aws-sdk-goという公式のSDKを使います。
 

利用可能な各サービスは以下に列挙されています。

各サービスのディレクトリの中には、 example_test.go という名前のファイルがあります。
これはサービスごとの主要なAPIの使い方が記載されたexampleファイルとなっていて、
この中を見れば基本的な使い方は分かると思います。
 

各サービスの初期化処理は、ほぼ同様の手順で以下のようになります。
 

import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/[サービス名]"
)

func main(){
    // (1) セッションの作成(認証はここで行う)
    sess, err := session.NewSession()
    if err != nil {
        panic(err)
    }

    // サービス用クライアントの初期化(セッションを渡す)
    svc := [サービス名].New(sess)

    // APIのパラメータには API名+Input という構造体を使う
    params := [サービス名].GetHogeInput{
        Param1: aws.Int64(1),       // (2) パラメータをポインタとして渡す
        Param2: aws.String("hoge"), // (2) パラメータをポインタとして渡す
    }

    // APIの結果のレスポンスとエラーを受け取る
    resp, err := svc.GetHoge(params)

}

(※ 簡単な例示のためカジュアルにpanicしてたりエラーハンドリングが適切じゃなかったりします。実コードでは使わないようにしてください。)

 

(1) ではセッションと呼ばれるオブジェクトを作成しています。
次の項目で詳しく説明します。
 

(2) では GetHogeInput{} というパラメータ構造体を作っており、
そのパラメータは全てポインタとして渡しています。
aws.Int64()aws.String() は単に値のポインタを返却するだけの関数なので、
この関数を使わずに &value のような形でポインタを指定しても大丈夫です。
 

session.Sessionの作成

セッションの中には認証設定(クレデンシャル情報)やリクエストハンドラが含まれていて、
こいつを各サービスのコンストラクタに渡し、各サービス用のクライアントを作成します。
上の例では session.NewSession() の引数は空ですが、引数に "github.com/aws/aws-sdk-go/aws"*aws.Config{} を渡すことで、各種設定をすることができます。
(引数は可変で複数の *aws.Config{} を使うことができますが、後側の有効な設定でどんどん上書きされていくようです。

 

空のNewSession

// 空の引数でセッションを作成
sess, err := session.NewSession()

 

この例のように *aws.Config{} を一切使わずにセッションを作成すると、以下のような挙動になります。
 

  • 環境変数からクレデンシャル情報を取得
  • Shared credentialファイルからクレデンシャル情報を取得

他のAWSのSDKと同様、使える環境変数は以下の通りです。
 

環境変数 項目 優先 備考
AWS_ACCESS_KEY_ID アクセスキーID
AWS_ACCESS_KEY アクセスキーID ×
AWS_SECRET_ACCESS_KEY シークレットアクセスキー
AWS_SECRET_KEY シークレットアクセスキー ×
AWS_SESSION_TOKEN セッショントークン
AWS_REGION リージョン
AWS_DEFAULT_REGION リージョン × AWS_SDK_LOAD_CONFIG が有効のときのみ
AWS_PROFILE プロファイル
AWS_DEFAULT_PROFILE プロファイル × AWS_SDK_LOAD_CONFIG が有効のときのみ
AWS_SHARED_CREDENTIALS_FILE Shared credentialファイルのパス デフォルトは $HOME/.aws/credentials
AWS_CONFIG_FILE configファイルのパス AWS_SDK_LOAD_CONFIG が有効のときのみ, デフォルトは $HOME/.aws/config
AWS_SDK_LOAD_CONFIG 有効にするには 1true っぽい文字列を使う

 

異なる「環境変数」にて同一の「項目」が設定されている場合は、
「優先」がのものが優先されます。

 

指定値でのNewSession

アクセスキーIDやシークレットアクセスキーをコード中で直接指定したい場合は、
以下のように credentials.NewStaticCredentials()*aws.Config{} を使います。

 

import (
    "github.com/aws/aws-sdk-go/aws/credentials"
)

func main() {
    // クレデンシャルの作成
    cred := credentials.NewStaticCredentials("[アクセスキーID]", "[シークレットアクセスキー]", "") // 最後の引数は[セッショントークン]

    // クレデンシャルとリージョンをセットしたコンフィグの作成
    region := "ap-northeast-1"
    conf := &aws.Config{
        Credentials: cred,
        Region:      &region,
    }

    // セッションの作成
    sess, err := session.NewSession(conf)
    if err != nil {
        panic(err)
    }
}

 

サービスエンドポイントの指定

通常は指定したサービス名やリージョンによって自動的にエンドポイントURLが決まりますが、
これを任意のURLを使うように指定することができます。

 

import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
)

func main() {
    // エンドポイントをセットしたコンフィグの作成
    ep := "http://localhost:8080"
    conf := &aws.Config{
        Endpoint:      &ep,
    }

    // セッションの作成
    sess, err := session.NewSession(conf)
    if err != nil {
        panic(err)
    }
}

 

こうすると、テスト時にDynamoDBLocaliain/fake_sqsといったダミーのサービスを使うことができて便利です。

 

クレデンシャルの優先順位

NewSession を使用した場合の優先順位は

  • 引数のaws.Config{}の値 > 環境変数 > Shared credentialファイル

となっているようです。

 

もし特定の方法を使いたい場合は以下の関数の戻り値を、aws.Config{}.Credentials へセットすることで指定できます。

 

  • 指定値
    • credentials.NewStaticCredentials(“”, “”, “”)
  • 環境変数
    • credentials.NewEnvCredentials()
  • Shared credential
    • credentials.NewSharedCredentials(, )

 

各サービスの使い方

ここからは各AWSサービスの説明をしていきます。
 

SQS

まずはSQSから使ってみます。
 

SQSクライアントは以下のようにして作成します。
 

import (
    "github.com/aws/aws-sdk-go/service/sqs"
)

// (中略)

svc := sqs.New(sess) // `sess`はセッション

このクライアントの各メソッドに対して専用のパラメータ引数を渡すことでSQSの操作を行うことが出来ます。
ここからは、

  • キューの作成(CreateQueue)
  • キューの取得(GetQueueUrl)
  • キューの削除(DeleteQueue)
  • メッセージの送信(SendMessage)
  • メッセージの受信(ReceiveMessage)
  • メッセージの削除(DeleteMessage)

を試していきます。

SQS キューの作成(CreateQueue)

キューの作成は以下のようにして行います。

import (
    "github.com/aws/aws-sdk-go/service/sqs"
)

// (中略)

func createQueue() {
    // SQSクライアントの作成
    svc := sqs.New(sess)

    // パラメータ
    queueName := "キュー名"
    params := &sqs.CreateQueueInput{
        QueueName: &queueName,
    }

    // CreateQueueの実行
    resp, err := svc.CreateQueue(params)
    if err != nil {
        panic(err)
    }

    // リクエスト成功時、*resp.QueueUrl には SQSキューのURLが入ります
}

svc.CreateQueue() のレスポンスのフィールドにはキューのURLが格納されています。
(不要であればレスポンス自体を _ に入れて捨ててしまいましょう。)
 

SQS キューの取得(GetQueueUrl)

キューURLの取得は以下のようにして行います。
 

import (
    "github.com/aws/aws-sdk-go/service/sqs"
)

// (中略)

func getQueueUrl() {
    // SQSクライアントの作成
    svc := sqs.New(sess)

    // パラメータ
    queueName := "キュー名"
    params := &sqs.GetQueueUrlInput{
        QueueName: &queueName,
    }

    // GetQueueUrlの実行
    resp, err := svc.GetQueueUrl(params)
    if err != nil {
        panic(err)
    }

    // リクエスト成功時、*resp.QueueUrl には SQSキューのURLが入ります
}

取得した QueueUrl はキューの削除やメッセージ操作で使用します。
 

SQS キューの削除(DeleteQueue)

キューの削除は以下のようにして行います。
 

import (
    "github.com/aws/aws-sdk-go/service/sqs"
)

// (中略)

func deleteQueue() {
    // SQSクライアントの作成
    svc := sqs.New(sess)

    // パラメータ
    queueURL := "[キューURL]" // GetQueueUrl等で取得したキューのURLを入れる
    params := &sqs.DeleteQueueInput{
        QueueUrl: &queueURL,
    }

    // DeleteQueueの実行
    _, err := svc.DeleteQueue(params)
    if err != nil {
        panic(err)
    }

    // レスポンスには重要な内容はないため捨てています
}

 

SQS メッセージの送信(SendMessage)

メッセージの送信は以下のようにして行います。

import (
    "github.com/aws/aws-sdk-go/service/sqs"
)

// (中略)

func sendMessage() {
    // SQSクライアントの作成
    svc := sqs.New(sess)

    // パラメータ
    queueURL := "" // GetQueueUrl等で取得したキューのURLを入れる
    body := `メッセージ内容`

    params := &sqs.SendMessageInput{
        QueueUrl:    &queueURL,
        MessageBody: &body,
        // MessageAttributes: map[string]*sqs.MessageAttributeValue{} // メタ情報用のパラメータ
    }

    // SendMessageの実行
    resp, err := svc.SendMessage(params)
    if err != nil {
        panic(err)
    }
}

レスポンスにはMessageIdに加え、MessageBodyやMessageAttributesの内容をMD5化した文字列が含まれており、
正常に送信されたかどうか検証ができるようです。

SQS メッセージの受信(ReceiveMessage)

メッセージの受信は以下のようにして行います。

import (
    "github.com/aws/aws-sdk-go/service/sqs"
)

// (中略)

func receiveMessage() {
    // SQSクライアントの作成
    svc := sqs.New(sess)

    // パラメータ
    queueURL := "" // GetQueueUrl等で取得したキューのURLを入れる
    max := 1
    wait := 10

    params := &sqs.ReceiveMessageInput{
        QueueUrl:    &queueURL,
        MaxNumberOfMessages: &max, // メッセージ最大受信数
        WaitTimeSeconds: &wait,    // ポーリング時間(秒)
    }

    // ReceiveMessageの実行
    resp, err := svc.ReceiveMessage(params)
    if err != nil {
        panic(err)
    }

    // resp.Messages に受信したメッセージリストが格納されます
    // *resp.Messages[i].Body に受信したメッセージ内容が格納されています
}

 

送信に比べて、指定できるパラメータが増えていますね。
 

MaxNumberOfMessages に指定した数値は、resp.Messages の中に入るメッセージ数の最大値になります。
例えば 10 を指定した場合は 0〜10件のメッセージが入る可能性があります。
 

WaitTimeSeconds を設定するとロングポーリングを行うことができます。
指定した秒数だけポーリングし、メッセージの受信を待機することができます。
 

なお、ここで受信したメッセージは削除時にも必要となります。
 

SQS メッセージの削除(DeleteMessage)

メッセージの削除は以下のようにして行います。
 

(そろそろコピペして打つのも疲れてきました…)
 

func deleteMessage() {
    // SQSクライアントの作成
    svc := sqs.New(sess)

    // パラメータ
    queueURL := "[キューURL]" // GetQueueUrl等で取得したキューのURLを入れる
    receiptHandle := "[ReceiptHandle]" // GetMessageで受信したメッセージ内の *resp.Messages[i].ReceiptHandle を入れる

    params := &sqs.DeleteMessageInput{
        QueueUrl:    &queueURL,
        ReceiptHandle: &receiptHandle,
    }

    // DeleteMessageの実行
    _, err := svc.DeleteMessage(params)
    if err != nil {
        panic(err)
    }

    // レスポンスには重要な内容はないため捨てています
}

 

メッセージの削除には、削除したいメッセージのReceiptHandleを含める必要があります。
GetMessageで受信したメッセージのReceiptHandleを使いましょう。
 

SQSのその他のアクション

この他にもキュー一覧のListQueuesや溜まったキューのパージを行うPurgeQueue
複数のメッセージの送信や削除を行う SendMessageBatchDeleteMessageBatch といったAPIがあります。
 

詳しくは公式のドキュメントを参照してください。
 

DynamoDB

DynamoDBクライアントは以下のようにして作成します。
 

import (
    "github.com/aws/aws-sdk-go/aws/service/dynamodb"
)

// (中略)

svc := dynamodb.New(sess) // `sess`はセッション

SQSの場合とほぼ同じですね。
 

このクライアントの各メソッドに対して専用のパラメータ引数を渡すことでDynamoDBの操作を行うことが出来ます。
 

ここからは、

  • テーブルの作成(CreateTable)
  • テーブルの取得(DescribeTable)
  • 項目のPUT(PutItem)
  • 項目の取得(GetItem)

を試していきます。

 

DynamoDB テーブルの作成(CreateTable)

テーブルの作成は以下のようにして行います。(!!)
一時変数が多くなりそうだったので、ポインタ変換関数 aws.String()aws.Int64() を使ってます。

 

import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/dynamodb"
)

func main() {
    // DynamoDBクライアントの作成
    svc := dynamodb.New(sess)

    // パラメータ
    params := &dynamodb.CreateTableInput{
        AttributeDefinitions: []*dynamodb.AttributeDefinition{
            {
                AttributeName: aws.String("user_id"), // フィールド名
                AttributeType: aws.String("N"),       // フィールド型 N=number, S=string, B=bool で型を指定
            },
        },
        KeySchema: []*dynamodb.KeySchemaElement{
            {
                AttributeName: aws.String("user_id"), // フィールド名
                KeyType:       aws.String("HASH"),    // HASH=ハッシュキー, RANGE=レンジキー
            },
        },
        ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
            ReadCapacityUnits:  aws.Int64(1), // 読み込みスループット
            WriteCapacityUnits: aws.Int64(1), // 書き込みスループット
        },
        TableName:              aws.String("user_table"), // テーブル名
        // GlobalSecondaryIndexes: []*dynamodb.GlobalSecondaryIndex{}, // GSIの設定
        // LocalSecondaryIndexes:  []*dynamodb.LocalSecondaryIndex{}, // LSIの設定
    }

    // CreateTableの実行
    resp, err := svc.CreateTable(params)
    if err != nil {
        panic(err)
    }

    // resp.TableDescription の中にテーブル定義情報が入っています
}

なんだかいきなり複雑になりましたね。
 

DynamoDBを使ったことがある人にはお馴染みのパラメータかと思いますが、
ここにLocalSecondaryIndexGlobalSecondaryIndexが加わってくると、かなりラビリンスな状態になります。
 

DynamoDB テーブルの情報取得(DescribeTable)

テーブルの情報取得は以下のようにして行います。
 

import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/dynamodb"
)

func main() {
    // DynamoDBクライアントの作成
    svc := dynamodb.New(sess)

    // パラメータ
    params := &dynamodb.DescribeTableInput{
        TableName: aws.String("user_table"), // テーブル名
    }

    // DescribeTableの実行
    resp, err := svc.DescribeTable(params)
    if err != nil {
        panic(err)
    }

    // resp.Table の中にテーブル定義情報が入っています
}

 

テーブル作成に比べると断然、簡単ですね。
 

DynamoDB 項目のPUT(PutItem)

項目ってなんやねんかと思うかもしれませんが、
RDBでいうところの行、ドキュメント指向KVSでいうところのドキュメントのことです。

 

import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/dynamodb"
)

func main() {
    // DynamoDBクライアントの作成
    svc := dynamodb.New(sess)

    // パラメータ
    params := &dynamodb.PutItemInput{
        TableName: aws.String("user_table"),       // テーブル名
        Item: map[string]*dynamodb.AttributeValue{ // 項目のデータ
            "user_id": {
                N: aws.String("100"),
            },
            "name": {
                S: aws.String("hoge"),
            },
        },
    }

    // PutItemの実行
    resp, err := svc.PutItem(params)
    if err != nil {
        panic(err)
    }
}

 

作成に比べれば良心的ですね。
 

Item にはキー名と値を入れます。値は dynamodb.AttributeValue{} を使います。
型に合わせて値を入れるフィールドが変わります。
例えば上の例のように、NumberならN、StringならSとなります。
 

DynamoDB 項目の取得(GetItem)

項目の取得には以下のようになります。
 

import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/dynamodb"
)

func main() {
    // DynamoDBクライアントの作成
    svc := dynamodb.New(sess)

    // パラメータ
    params := &dynamodb.GetItemInput{
        TableName: aws.String("user_table"),       // テーブル名
        Key: map[string]*dynamodb.AttributeValue{ // 項目のデータ
            "user_id": {
                N: aws.String("100"),
            },
        },
    }

    // GetItemの実行
    resp, err := svc.GetItem(params)
    if err != nil {
        panic(err)
    }

    // resp.Item の中に項目データが入っています
}

 

使い方はPutとほぼ一緒ですね。Key にはインデックスのキーを指定します。
レンジキーが存在する場合はKeyのmap内に2つのデータが入ります。
 

resp.Itemmap[string]*dynamodb.AttributeValue となっていて、
resp.Item["user_id"].N, resp.Item["name"].S のような形で値を取り出します。
 

DynamoDBのその他のアクション

ここまででDynamoDBの実力(と辛さ)の10%も引き出せてないわけですが、
項目の一括取得を行うScanQuery、項目の削除を行うDeleteItemInputといったAPIがあります。
 

詳しくは公式のドキュメントを参照してください。
 

ラッパーライブラリ aws-sdk-go-wrapper

DynamoDBを使っていると、N やら S やら B、さらには M といったフィールドに格納されたポインタをいじることになりますが、
我々人類にはまだ早いんじゃないかと思います。
 

そんな時代の混乱を解決するためにラッパーライブラリを作っているので、ついでにここで宣伝させていただきます。(本題)
 

evalphobia/aws-sdk-go-wrapper
 

今のところは、S3やSQS、SNSにDynamoDBといったサービスに対応しています。
 

コンフィグ

公式のSDKの場合はセッションを作成し、サービス用クライアントに渡して作成しますが、
ラッパーでも同様にまずはコンフィグを作成しクライアントを作成します。
 

簡単な例は以下のようになります。
 

import (
    "github.com/evalphobia/aws-sdk-go-wrapper/config"
    "github.com/evalphobia/aws-sdk-go-wrapper/[サービス名]"
)

func main() {
    // コンフィグの作成
    conf := config.Config{
        /* クレデンシャル情報を直接指定する場合は以下を指定 */
        AccessKey: "[アクセスキーID]",
        SecretKey: "[シークレットアクセスキー]",
        Region: "ap-north-east1",

        /* Shared credentialファイルを使う場合は以下を指定 */
        // Filename: "[ファイル名]",
        // Profile:  "[プロファイル名]",

        /* その他のオプション */
        // Endpoint:  "http://localhost:8000", // エンドポイントを指定する場合に使用
        // DefaultPrefix: "dev",   // SQSキューやDynamoDBテーブルに付けるプリフィクス
        // S3ForcePathStyle: true, // S3でバケット指定にパススタイルを使う
    }

    cli, err := [サービス名].New(conf)
    if err != nil {
        panic(err)
    }
    // 各サービスの処理が続く...
}

ラッパーと公式ではクレデンシャルの優先順位が異なっています。
 

  • 公式: 直接指定 > 環境変数 > Shared credentialファイル
  • ラッパー: 環境変数 > 直接指定 > Shared credentialファイル

コンパイル済みのバイナリでも容易に動作を変えられるように、環境変数が一番優先されるようにしています。
 

SQS

公式の例と同様に、まずはSQSを使ってみます。
 

SQS キューの作成(CreateQueue)

キューの作成は以下のようにして行います。
 

import (
    "github.com/evalphobia/aws-sdk-go-wrapper/sqs"
)

// (中略)

func createQueue() {
    // SQSクライアントの作成
    svc, err := sqs.New(conf)
    if err != nil {
        panic(err)
    }

    // 既に作成済みなら何もせずに終了
    ok, err := svc.IsExistQueue("[キュー名]")
    if ok {
        return
    }

    // CreateQueueの実行
    err = svc.CreateQueueWithName("[キュー名]")
    if err != nil {
        panic(err)
    }
}

 

SQS キューの取得(GetQueue)

キューURLの取得は以下のようにして行います。
 

import (
    "github.com/evalphobia/aws-sdk-go-wrapper/sqs"
)

func getQueue() {
    // SQSクライアントの作成
    svc, err := sqs.New(conf)
    if err != nil {
        panic(err)
    }


    // キューの取得
    queue, err := svc.GetQueue("[キュー名]")
    if err != nil {
        panic(err)
    }
}

 

取得した queue はSQSキューを表すオブジェクトになります。
メッセージの操作で使います。
 

SQS キューの削除(DeleteQueue)

キューの削除は以下のようにして行います。
 

import (
    "github.com/evalphobia/aws-sdk-go-wrapper/sqs"
)

func deleteQueue() {
    // SQSクライアントの作成
    svc, err := sqs.New(conf)
    if err != nil {
        panic(err)
    }


    // キューの削除
    err = svc.DeleteQueue("[キュー名]")
    if err != nil {
        panic(err)
    }
}

 

SQS メッセージの送信(SendMessage)

メッセージの送信は以下のようにして行います。
 

import (
    "github.com/evalphobia/aws-sdk-go-wrapper/sqs"
)

func sendMessage() {
    // SQSクライアントの作成
    svc, err := sqs.New(conf)
    if err != nil {
        panic(err)
    }

    // キューの取得
    queue, err = svc.GetQueue("")
    if err != nil {
        panic(err)
    }

    // スプールへメッセージを追加 (1)
    queue.AddMessage("メッセージその1")

    // スプールへメッセージを追加 (2)
    mapData := map[string]interface{}{
        "key": "メッセージその2",
    }
    queue.AddMessageMap(mapData)

    // スプールへメッセージを追加 (3)
    structData := struct{
        Key string `json:"key"`
    }{
        Key: "メッセージその3",
    }
    queue.AddMessageJSONMarshal(structData)

    // スプール内のメッセージを全て送信する
    err = queue.Send()
    if err != nil {
        panic(err)
    }
}

 

メッセージの送信には、queue.AddMessage()queue.Send() を使用します。
json.Marshal() でJSON文字列にしたい場合は、queue.AddMessageMap()queue.AddMessageJSONMarshal() を使ってください。
 

SQS メッセージの受信(ReceiveMessage)

メッセージの受信は以下のようにして行います。
 

import (
    "fmt"
    "github.com/evalphobia/aws-sdk-go-wrapper/sqs"
)

func sendMessage() {
    // SQSクライアントの作成
    svc, err := sqs.New(conf)
    if err != nil {
        panic(err)
    }

    // キューの取得
    queue, err = svc.GetQueue("[キュー名]")
    if err != nil {
        panic(err)
    }

    // メッセージを1件だけ取得
    msg, err := queue.FetchOne()
    if err != nil {
        panic(err)
    }
    fmt.Println(msg.Body()) // メッセージ内容を表示

    // メッセージを最大10件取得
    msgList, err := queue.Fetch(10)
    if err != nil {
        panic(err)
    }
    for _, m := range msgList {
        fmt.Println(m.Body()) // メッセージ内容を表示
    }

    // Fetch実行時に自動削除をするオプションを設定
    queue.AutoDel(true) 

    // メッセージを最大10件取得(レスポンスはメッセージ内容のスライス)
    bodyList := queue.FetchBody(10)
    for _, body := range bodyList {
        fmt.Println(bodyList) // メッセージ内容を表示
    }
}

 

基本的な使い方は queue.Fetch(num)numに件数を入れて取得します。
 

FetchBody(num)FetchBodyOne()を使うと、メッセージ内容のみ取得できますが、
このままだと取り出したメッセージの削除ができないため、事前に queue.AutoDel(true) を指定し、
取得した際に自動削除されるようにしておきます。
 

SQS メッセージの削除(DeleteMessage)

メッセージの削除は以下のようにして行います。
 

import (
    "github.com/evalphobia/aws-sdk-go-wrapper/sqs"
)

func sendMessage() {
    // SQSクライアントの作成
    svc, err := sqs.New(conf)
    if err != nil {
        panic(err)
    }

    // キューの取得
    queue, err = svc.GetQueue("[キュー名]")
    if err != nil {
        panic(err)
    }

    // メッセージを最大10件取得
    msgList, err := queue.Fetch(10)
    if err != nil {
        panic(err)
    }

    // 取得したメッセージの削除
    for _, m := range msgList {
        queue.DeleteMessage(m)
    }
}

 

queue.DeleteMessage() にメッセージを渡すと削除できます。
queue.AutoDel(true) を指定しておくと、queue.Fetch() 実行時に自動で削除されます)
 

DynamoDB

公式の例と同様、DynamoDBも使ってみましょう。
 

DynamoDB テーブルの作成

テーブルの作成は以下のようにして行います。
 

import (
    "github.com/evalphobia/aws-sdk-go-wrapper/dynamodb"
)

func createTable() {
    // DynamoDBクライアントの作成
    svc, err := dynamodb.New(conf)
    if err != nil {
        panic(err)
    }

    // String型のidというハッシュキーを持つテーブル定義オブジェクトを作成
    design := dynamodb.NewTableDesignWithHashKeyS("table_name", "id")

    //テーブル定義を追加
    design.AddRangeKeyN("time")                        // Number型のtimeというレンジキーを追加
    design.AddLSIN("age-index", "age")                 // Number型のageというレンジキーを持つ、age-indexというローカルセカンダリインデックスを追加
    design.AddGSIN("update-index", "updated_at")       // Number型のupdated_atというハッシュキーを持つ、update-indexというグローバルセカンダリインデックスを追加
    design.AddGSISN("country-index", "country", "age") // String型のcountryというハッシュキーとNumber型のageというレンジキーを持つ、country-indexというグローバルセカンダリインデックスを追加

    err := svc.CreateTable(design)
    if err != nil {
        panic(err)
    }
}

 

この例では、

  • テーブル名: table_name
  • インデックス:
    • ハッシュキー: String型 id
    • レンジキー: Number型 time
  • ローカルセカンダリインデックス: age-index
    • レンジキー: Number型 time
  • グローバルセカンダリインデックス: update-index
    • ハッシュキー: Number型 updated_at
  • グローバルセカンダリインデックス: country-index
    • ハッシュキー: String型 country
    • レンジキー: Number型 age

というテーブルを作成しています。
 

DynamoDB テーブルの情報取得(DescribeTable)

テーブルの情報取得は以下のようにして行います。
 

import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/dynamodb"
)

func main() {
    // DynamoDBクライアントの作成
    svc := dynamodb.New(sess)

    // テーブルの取得
    table, err := svc.GetTable("table_name")
    if err != nil {
        panic(err)
    }
}

 

取得したテーブルのオブジェクトは項目の追加や取得に使用します。
 

DynamoDB 項目のPUT(PutItem)

項目のPUTは以下のようにして行います。
 

    // DynamoDBクライアントの作成
    svc := dynamodb.New(sess)

    // テーブルの取得
    table, err := svc.GetTable("table_name")
    if err != nil {
        panic(err)
    }

    // 項目オブジェクトの作成
    item := dynamodb.NewPutItem()
    item.AddAttribute("id", "A001")
    item.AddAttribute("time", time.Now().Unix())
    item.AddAttribute("age", 18)
    item.AddAttribute("country", "jp")
    item.AddAttribute("updated_at", time.Now().Unix())
    item.AddAttribute("status", 1)

    // スプールに項目を追加
    table.AddItem(item)

    // スプール内の項目を全てPUTする
    err = table.Put()
    if err != nil {
        panic(err)
    }
}

 

dynamodb.NewPutItem() で項目オブジェクトを作成し、table.AddItem() で項目を書き込みスプールへ追加します。
そしてtable.Put()で書き込みスプール内の項目をPutします。
 

なお、svc.PutAll() を使うと、全てのテーブルの書き込みスプール内の項目をPutします。
 

DynamoDB 項目の取得(GetItem)

項目の取得には以下のようになります。
 

import (
    "github.com/aws/aws-sdk-go/service/dynamodb"
)

func main() {
    // DynamoDBクライアントの作成
    svc := dynamodb.New(sess)

    // テーブルの取得
    table, err := svc.GetTable("table_name")
    if err != nil {
        panic(err)
    }

    // 項目の取得
    id := "A001"
    time := 1
    item, err := table.GetOne(id, time)
    if err != nil {
        panic(err)
    }
}

 

table.GetOne(hashKey, rangeKey) として、項目を取得します。
ハッシュキーしかないテーブルの場合、rangeKeyは省略できます。
 

取得した項目はmap[string]interface{}となっています。
 

DynamoDB 項目の取得(QueryとScan)

ここからようやく、DynamoDBの便利APIのQueryとScanを使っていきます。
 

import (
    "github.com/evalphobia/aws-sdk-go-wrapper/dynamodb"
)

func main() {
    // DynamoDBクライアントの作成
    svc, err := dynamodb.New(conf)
    if err != nil {
        panic(err)
    }

    // テーブルの取得
    table, err := svc.GetTable("MyDynamoTable")
    if err != nil {
        panic(err)
    }

    // 項目をQueryで取得
    cond := table.NewConditionList()
    cond.AndEQ("country", "jp")
    cond.FilterLT("age", 20)
    cond.SetLimit(100)
    result, err := table.Query(cond) // Queryの実行
    if err != nil {
        panic(err)
    }

    // 結果を格納する構造体
    type User struct {
        ID     int64 `dynamodb:"id"`
        Age    int   `dynamodb:"age"`
        Status int   `dynamodb:"status"`
    }

    var list []*User
    err = result.Unmarshal(&list) // 取得結果をlistへマッピング
    if err != nil {
        panic(err)
    }


    // 項目をScanで取得
    cond = table.NewConditionList()
    cond.SetLimit(1000)
    cond.FilterEQ("status", 2)
    result, err = table.ScanWithCondition(cond) //Scanの実行
    if err != nil {
        panic(err)
    }

    data := result.ToSliceMap() // 取得結果を[]map[string]interface{}へ変換

    // 項目をScanで取得(前回取得した箇所の次から取得)
    cond.SetStartKey(result.LastEvaluatedKey) // 開始位置を指定
    result, err = table.ScanWithCondition(cond) //Scanの実行
    if err != nil {
        panic(err)
    }

    data = append(data, result.ToSliceMap()...) // 2回目の取得結果を1回目の結果へ追加
}

 

Queryはインデックスを使い複数の項目を一気に取得できるので、よく使われると思います。
Scanはスループットを消費するため使う機会は限られてくるかもしれませんが、集計・バッチ用途等で使うことがあるかもしれません。
 

まとめ

いかがでしょうか。
使い方は何となくおわかりいただけたかと思います。
 

公式の場合は、セッションを作成し、各サービスのクライアントを作成します。
そしてクライアントの使用したいAPIに対してAPI名+Inputという構造体のパラメータを渡します。
 

ラッパーの場合はコンフィグを作成し、各サービスのクライアントを作成します。
 

ラッパーは弊社の用途に合わせて作ってあるため、いくつか足りないAPIがありますが、
組み込み型のポインタや煩雑なパラメータを直接扱わなくて済むため、実装が楽になると思います。
もし用途に合うようであれば、使ってみてください。
 

(そして足りない機能・サービスのプルリクエストをもらえると助かります!)
 

  • このエントリーをはてなブックマークに追加

エウレカでは、一緒に働いていただける方を絶賛募集中です。募集中の職種はこちらからご確認ください!皆様のエントリーをお待ちしております!

Recommend

pairsの検索にElasticsearchを使ってみた。

pairsでKotlinを採用した5つの理由