ᗡocuments

LevelDBのメモリテーブル

メモリテーブル

LevelDBに対してデータをPUTすると、データはメモリ上でデータを管理するメモリテーブルへ書き込まれます。
その後メモリテーブルが一定の容量に達するとテーブルデータとしてファイルに書き込まれます。
ここではメモリーテーブルの仕組みに関して記述を行っていきます。

メモリアロケータ

メモリテーブルではメモリのアロケーションを効率的に行うために、独自のメモリアロケータを利用しています。メモリアロケータはArenaクラス(/util/arena.*)として実装されており、4KBのメモリ領域を最初に確保しておき、必要に応じてその領域を細切れにして割り当てていくといった一般的な仕組みとなっています。

メモリアロケータ

スキップリスト

メモリテーブルでのデータ構造はスキップリストを利用しています。スキップリストを利用することで、データによるアクセスがあった際の検索の効率化と、メモリテーブルデータをファイルに書き込む際のキーによるソート処理の効率化を担うことが出来ます。

スキップリスト

格納データ

前述のスキップリストの要素に対して、PUTで指定したデータを格納します。
以下に内部的に格納されるデータの構造を示します。

格納データの構造

キーとバリュー値の長さ情報はそれぞれ圧縮された状態で格納されます。またデータのメタ情報としてシーケンス番号とデータタイプの情報が格納されます。
シーケンス番号はデータがLevelDBに格納されるたびにインクリメントされながら割り当てられる番号です。この番号によってトランザクションの管理を行います。
またLevelDBにおいてデータの削除は論理削除によって実装されているため、DELETE操作が行われると対象のキーの削除データが格納されます。そのためPUT操作かDELETE操作でのデータかを判別する為にデータタイプを格納します。

LevelDBでの数値データの永続化

LevelDBではメタデータとなる数値データを永続化する際に、圧縮を行わない場合と、圧縮を行う場合があります。ここでは、それぞれ場合について説明します。
またLevelDBでは32bitと64bit幅の数値データを扱います(ここではunsignedのデータのみを考慮します)。

圧縮なしの場合

圧縮を行わない場合には32bitまたは64bitをそのまま書込みますが、ポータビリティを考慮して、リトルエンディアンデンでの書込みを行います。

圧縮ありの場合

数値データを格納する際に、小さい数値である場合が多いことを考慮して圧縮を行ってデータを格納する場合があります。
以下に32bitの数値データの圧縮方法を示します。

uint32_tデータの圧縮

圧縮は32bitデータを7bitずつに分けて、7bitごとに1bitの制御フラグを設定する事で行います。数値の値によって必要な領域のみを用いることで圧縮を行います。制御フラグが0の場合はデータの終端を、1の場合は後続の8bitもデータ領域であることを示しています。
64bitの場合も同じ要領で圧縮を行うことができます。
この圧縮をキー長などの比較的大きくならないであろうメタデータに対して行うことで、利用領域の縮小を図っています。

LevelDBのキャッシュ実装

LevelDBではキーバリューデータを内部的に効果的に扱うキャッシュ機構が実装されています。
LevelDBのキャッシュは”/include/leveldb/cache.h”でインターフェースが定義されており、実装は”/util/cache.cc”でShardedLRUCacheクラスとして行われています。

キャッシュ処理の分散

ShardedLRUCacheはキャッシュ操作を排他的に行うためのロック処理の効率化を行うために、実際のキャッシュを行うLRUCacheインスタンスを複数管理し、ハッシュ値によって、処理を行うLRUCacheオブジェクトを決定します。

キャッシュ処理の分散

ShardedLRUCacheがデータ操作のためにキー情報を受け取ると、キーからハッシュ値を算出します。
ShardedLRUCacheが管理を行うLRUCacheインスタンスは16要素の配列として管理されているので、ハッシュ値の上位4bitを用いて処理を行うLRUCacheインスタンスを決定します。
このように排他ロックを伴うハッシュ管理を複数のインスタンスへ分散することによって排他制御によるボトルネックを軽減しています。

キャッシュ管理

LevelDBのキャッシュ機構は、一般的なチェイン法によるハッシュテーブルと、LRUによるリソース管理がLRUCacheクラスで実装されています。

キャッシュ要素

まずキャッシュを行うデータ要素はLRUHandle構造体によって行われます。以下にLRUHandle構造体の定義を記述します。

struct LRUHandle {
void* value;
void (*deleter)(const Slice&, void* value);
LRUHandle* next_hash;
LRUHandle* next;
LRUHandle* prev;
size_t charge; // TODO(opt): Only allow uint32_t?
size_t key_length;
bool in_cache; // Whether entry is in the cache.
uint32_t refs; // References, including cache reference, if present.
uint32_t hash; // Hash of key(); used for fast sharding and comparisons
char key_data[1]; // Beginning of key
};

LRUHandle構造体のkey_dataでキャッシュデータのキー、valueでバリューへの参照をそれぞれ管理します。
その他の要素はハッシュテーブルやLRUの管理のために用いられます。

ハッシュテーブル

LevelDBではキャッシュされたデータを高速に検索するために、キャッシュデータをチェイン法によるキャッシュテーブルで管理しています。キャッシュテーブルの実装はHandleTableクラスで行われています。
またキャッシュテーブルのサイズは固定ではなく、格納要素がキャッシュテーブルのサイズより大きくなる度に、キャッシュテーブルのサイズを2倍に拡張するようになっています(キャッシュテーブル位置をビット演算で検出するためにサイズは2の乗数とします)。

ハッシュテーブルでのキャッシュ要素の管理

LRUHandle構造体のnext_hashメンバを用いて、ハッシュリストの管理しています。

LRUによるリソース管理

キャッシュ機構は、特定のメモリ領域内でデータを保持するようになっています(始めに利用するメモリ領域を指定します)。そのためメモリ領域以上のデータをキャッシュを行おうとすると、他のデータをキャッシュから離脱させます。この時にLRUによって離脱するデータを管理するようになっています。
LevelDBでは以下の図に示す2つの双方向リンクリストによってリソースを管理します。

LRUによるリソース管理

まず1つ目はlru_メンバによって管理されるキャッシュにあって参照がされていない要素のリストで、2つ目はキャッシュにありユーザに参照されている要素のin_user_リストです。
任意の要素はユーザによる参照が行われるとlru_リストから、in_user_リストの末尾へと移動します(LRUHandle構造体のrefsで参照すうを管理)。その後、ユーザによる参照が無くなるとin_user_リストからlru_リストの末尾へ移動されます。

以上のことからlru_リストの先頭要素ほど、参照され無くなってから時間が経過している要素となるため、キャッシュのメモリ容量以上のデータの挿入が発生すると、キャッシュから離脱させる要素の対象となります。

ちなみに要素の挿入時と要素に対する検索が行われたときに、参照がされたと判断されます。

このようにLevelDBではキャッシュテーブルとLRUアルゴリズムによってキャッシュ機構を実現しています。

Kafkaログ機構(1)

ここではBrokerが受けったメッセージをディスクへ書き込んで管理するためのログ機構について説明を行っていきます。
ソースコードは”src/main/scala/kafka/log/”ディレクトリ内のファイルに対応します。

ログの格納

メッセージをまとめてログファイルとしてディスクに書き出すディレクトリは、プロパティ”log.dirs”(もしくは”log.dir”)で指定したディレクトリ(ログディレクトリ)以下の”<トピック名>-<パーティション>“ディレクトリ内に格納されます。

新規でトピックを作成した場合などにログファイルを新規で作成しますが、ログディレクトリを複数指定していた場合には、最も作成されている数(ログディレクトリ内のディレクトリ数)が少ないログディレクトリを作成対象として選択します。

ログセグメント

ログは1つのファイルに逐次書き込まれていくのではなく、複数のファイルに分割されながら書き込まれていきます。この分割の単位をログセグメントと呼びます。
以下の図のように、受け取ったメッセージをログとして書き込んでいきます。

ログセグメント

ログセグメントの構成

さてこのログセグメントは3つのファイルから構成されます。
1つはメッセージを格納していくログファイル、さらにメッセージの読み込み時に参照するためのオフセットインデックス(OffsetIndex)とタイムインデックス(TimeIndex)です。

ログセグメントの構成

ログ機構では書き込みの単位をメッセージ処理で説明したMessageSetというメッセージの集合として受け取ります。このMessageSetを書込みデータとして受け取るとログファイルの末尾への書込みが行われます。
一定量(プロパティ”log.index.interval.bytes”で指定したバイト数)ごとの書込みが行われると現在のオフセットと、そのオフセットに対応するメッセージがログファイルのどこに記述されているかの書込み位置をオフセットインデックスへ格納します。また現在のタイムスタンプとオフセットの情報をタイムインデックスへ格納します。
ただしインデックスファイルへ書き込まれるオフセットの値はオフセットのそのものの値ではなく、オフセットと基準オフセットと呼ばれるログファイルに最初に書かれたオフセットとの差分の値が書き込まれることになります。

インデックスファイルを作成することによって、任意のオフセットやタイムスタンプのメッセージを取得する処理が高速に行えるようになります。
インデックスファイルが存在しない場合には、ログファイルの先頭からシーケンシャルスキャンによって対応するオフセット(もしくはタイムスタンプ)のメッセージを捜さなければなりません。
しかしインデックスファイルがあると、まずインデックスファイルの内容から2分探索(オフセットは短調増加していくので2分探索が可能になります)によって、オフセットに対応するメッセージがメッセージファイル内のどこに位置するかがおおよそでわかるため、シーケンシャルスキャンを行わなけばならない範囲を限定することが出来ます。

このようにインデックスファイルを作成することによって読み込み処理が効率的に行われるようになります。インデックスファイルへの書き込みの閾値であるプロパティ”log.index.interval.bytes”を小さくすればより効率的に読み込みが行えますが、インデックスファイルが大きくなったり、書込み処理が重くなったりとトレードオフがあることに注意する必要があります。

Kafkaメッセージ処理(2)

Kafkaメッセージ処理(1)では、メッセージ構造から非圧縮のメッセージセットまでを説明しました。ここでは主に圧縮を行った場合のメッセージの扱いについて記述を行います。

メッセージセット(圧縮)

まずKafkaメッセージ処理(1)で記述した、非圧縮の場合のメッセージセット構造を改めて以下に示します。

MessageSet構造(非圧縮)

このように圧縮を行わなかった場合にはメッセージにオフセットとメッセージ長を付加して並べているだけでした。
メッセージを圧縮した場合には上で示したメッセージ集合を圧縮したものを、ペイロードとして持つメッセージを作成します。以下に圧縮した場合のメッセージについての図を示します。

MessageSet構造(圧縮)

図のように、圧縮を行った場合にはメッセージ集合を圧縮したものをペイロードとして格納した1つのメッセージとなります。ここで圧縮メッセージをペイロードとしてもつメッセージに割り当てるオフセットの値は、圧縮されたメッセージ集合の最後のメッセージのオフセットと同じ値を格納します。また圧縮を施すメッセージ集合のmagic値はすべて同じ値となり、magic値が1の場合にはメッセージ集合内のオフセットの値が、先頭のメッセージからの差分の値が格納されることになります。
このようにしてメッセージを圧縮することによって、圧縮されていないメッセージと構造が同じになり内部的な処理が行いやすくなります。

メッセージのイテレート処理

圧縮をする場合も、しない場合もメッセージ集合は同じような構造で格納することになるます。そのため1つ1つのメッセージを読み込む際の処理は、まずメッセージのメタ情報として格納されている圧縮コードを見て、そのメッセージが圧縮されおらずペイロードにデータが格納されているのか、圧縮されたメッセージ集合がペイロードに格納されているのかを判断します。圧縮されたメッセージ集合を持つ場合にはペーロードに対してデコードを施し、格納されていたメッセージ集合をさらにイテレートしていきます。

shallowイテレートとdeepイテレート

メッセージ集合をイテレートしていく処理ですが、格納されているメッセージを1つ1つ取り出していく場合には圧縮されたメッセージをデコードしながらイテレートしていく必要がありますが、一方でメッセージ集合のサイズが全体でどれくらいになるのかを知りたい場合は圧縮されたメッセージをデコードせずに圧縮メッセージ集合をペイロードとしてもつメッセージを1つのメッセージをそのままイテレートしていけば良いことになります(メッセージをディスクに書き込む場合などはメッセージを圧縮したまま書き込みを行うことになるので、この場合は圧縮された場合のメッセージ集合のサイズを調べます)。

このように圧縮されたメッセージを展開しないでイテレートしていく機構がshallowIterator、圧縮されたメッセージを展開しながらイテレートする機構がdeepIteratorとして、それぞれ実装されています。以下はshallowイテレートとdeepイテレートの違いを示した図です。

shallowイテレートとdeepイテレート

図のように”メッセージ1″から”メッセージ6″までがあるとします(簡易化のためオフセットなどは省略しています)。deepイテレートではメッセージの1,3,4,5,6と圧縮されたメッセージを展開しながらメッセージを返していきます。一方でshallowイテレートの場合には圧縮を展開しないため1,2,3という順番でメッセージを返します。

Kafkaメッセージ処理(1)

ここではKafkaのメッセージについて記述していきます。
Kafkaではproducerから受け取ったメッセージをlogとしてディスクに書き込みながら管理を行います。

ログディレクトリ

そのデータはプロパティファイルの”log.dirs”プロパティで指定したディレクトリ(以下、ログディレクトリ)で管理されることになります。プロパティで指定するログディレクトリは1つでなくて複数でも可能で、CSVと同様にカンマ「,」区切りで記述します(ログディレクトリを1つしか指定しない場合には”log.dir”プロパティでも指定することが出来る)。

メッセージ構造

producerから受け取ったメッセージはTopicとPartitionごとに管理されます。
Topicはproducerから明示的に指定しますがPartitionは内部で自動的に割り当てられます。メッセージのTopicとPartitionが決定すると、そのメッセージはログディレクトリの「<Topic>-<Partirtoion>」に対応するディレクトリ内に格納されることになります。

1つのメッセージはMessageクラストして管理されます。以下にMessage構造についての図を示します。ソースコードでは”src/main/scala/kafka/message/message.scala”に対応します。

Message構造

Kafkaではバージョン0.10から、メッセージに対してタイムスタンプを付与出来るようになりました。そのためMessage構造はメッセージにタイムスタンプを付与するかどうかで、少し異なります。メッセージにタイムスタンプを付与しない場合にはmagicの値が0、付与する場合には1を格納します。
Message構造は一般的なキーバリューストアと同様にキーとペイロード(後述しますがバリュー値以外も圧縮されたメッセージも格納されます)とそのほかの情報から成り立ちます。

細かく見ていくと図中のCRC32は、メッセージ誤り検出のためのチェックサム値、magicにはマジックナンバー(メッセージにタイムスタンプを付与するかどうかで0か1が格納)、属性にはペイロードの圧縮を示すための値とタイムスタンプに関する情報がそれぞれ格納されます。またタイムスタンプを付与する場合には属性値の後に8byteのタイムスタンプ情報が加わります。
Messageペイロードの圧縮方法について見ていくと、属性値の下位3bitで圧縮コードが表され、0が未圧縮であることを示し、1がgzip,、2がsnappy、3がlz4の圧縮方法でそれぞれ圧縮されていることを表しています。

メッセージセット(非圧縮)

複数のメッセージをログディレクトリへと格納する際には、先程のMessages構造とは別にメッセージごとにユニークな値(ここではTopic-partitionごとにユニークな値)であるOffsetが割り当てられます。Offsetはメッセージに対してシーケンシャルに割り当てられる値と解釈出来る。
以上のことから、複数のメッセージをlogとしてログディレクトリ格納すると以下の図のようになります。ここでは圧縮をしない場合についてのパターンを示しています、

Kafkaでは1つずつのメッセージをその都度、ログディレクトリへ書き込むのではなく、いくつかのメッセージをまとめて(メッセージセットごとに)書き込むようになっていて、複数のメッセージをMessageSetオブジェクトとして扱います。ソースでは”src/main/scala/kafka/message/MessageSet.scala”や”src/main/scala/kafka/message/ByteBufferMessageSet.scala”に対応します。

MessageSet構造(非圧縮)

このようにKafkaではproducerから受け取ったメッセージをまとめてログディレクトリへ書き込んでいきます。