Technical Works

ASC Technical support site

*

Infinispanの応用

      2014/11/25

今回はInfinispanを利用した応用編となります。

今回紹介する機能は以下の2つです。
①Listener機能
②Map/Reduce機能

1.Listener機能

まず、Infinispanのキャッシュにデータ(Key/Value)をプット(PUT)した際、イベントが発生します。そのイベントを契機に処理を行うことが出来ます。たとえば、はじめに加工してない生のデータをそのままPUTして、非同期で、PUTされた後に、ルールエンジン(BRMS等)を利用してデータハンドリングを行うことが出来ます。

Listenerには2種類のレベルがあります。1つ目はキャッシュレベルのもので、2つ目はキャッシュ管理レベルがあります。Listenerは、@Listenerアノテーションを付与したPOJOとして作成します。イベント通知を受けるのもイベントに対応したアノテーションを付与したメソッドになります。

まず、キャッシュレベルのアノテーションと内容を以下に示します。

アノテーション イベント 内容
CacheEntriesEvicted CacheEntriesEvictedEvent Evictを有効にした場合に、メモリ上のCacheエントリ(複数)がCacheStoreに追い出された場合に発生
CacheEntryActivated CacheEntryActivatedEvent EvictとPassivationを有効にした場合に、CacheエントリをCacheStoreからロード(活性化)した時に発生
CacheEntryCreated CacheEntryCreatedEvent Cacheにエントリが追加された時(Cache#put)に発生
CacheEntryInvalidated CacheEntryInvalidatedEvent クラスタリングモードがINVALIDATION_SYNCまたはINVALIDATION_ASYNCの時かつCacheエントリの追加、変更等を行った時に、他のNode上のCacheエントリが無効になった場合に発生。複数のNodeで、同じキーに対して操作を行うことで発生
CacheEntryLoaded CacheEntryLoadedEvent CacheのエントリをCacheStoreからロードした時に発生。参照時や変更時等に必要に応じて発生
CacheEntryModified CacheEntryModifiedEvent Cacheのエントリが変更された時(Cache#replace)に発生。追加時(Cache#put)にも発生する
CacheEntryPassivated CacheEntryPassivatedEvent EvictとPassivationを有効にした場合に、CacheエントリをCacheStoreに保存(非活性化)した時に発生
CacheEntryRemoved CacheEntryRemovedEvent Cacheのエントリを削除した時(Cache#remove)に発生
CacheEntryVisited CacheEntryVisitedEvent Cacheのエントリを参照した時(Cache#get)に発生
DataRehashed DataRehashedEvent クラスタリングモードの時に、クラスタメンバの増減によりデータのリハッシュを行う際の開始、終了時に発生
TopologyChanged TopologyChangedEvent クラスタリングモードの時に、クラスタメンバの増減によりトポロジの構成に変更があった時の開始、終了で発生
TransactionCompleted TransactionCompletedEvent トランザクションが有効な時に、トランザクションが完了した時(コミット、ロールバック)に発生
TransactionRegistered TransactionRegisteredEvent トランザクションが有効な時に、トランザクションが開始された時に発生

 

次に、キャッシュ管理レベルのアノテーションと内容を以下に示します。

アノテーション イベント 内容
CacheStarted CacheStartedEvent Cacheの開始時(Cache#start)に発生。停止後のCacheを再度開始させるとListenerの登録は解除されるため必要であれば再度登録しなければならない
CacheStopped CacheStoppedEvent Cacheの終了時(Cache#stop)に発生
Merged MergedEvent 分断されたクラスタがマージされた場合に発生
ViewChanged ViewChangedEvent クラスタ参加メンバの増減によりクラスタ構成が変化した場合に発生

 

以下にListener用のサンプルソースを示します。

import org.infinispan.Cache;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryActivated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryLoaded;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryPassivated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryVisited;
import org.infinispan.notifications.cachelistener.event.CacheEntryActivatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryLoadedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryPassivatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryVisitedEvent;
import org.infinispan.notifications.cachemanagerlistener.annotation.CacheStarted;
import org.infinispan.notifications.cachemanagerlistener.annotation.CacheStopped;
import org.infinispan.notifications.cachemanagerlistener.event.CacheStartedEvent;
import org.infinispan.notifications.cachemanagerlistener.event.CacheStoppedEvent;

public class TestListener {

	public static void main(String[] args) {

		TestListener test = new TestListener();

		test.masterNode();
	}

	private void masterNode() {

		DefaultCacheManager manager = null;
		try {
			manager = new DefaultCacheManager();
			Cache<String, String> cache = manager.getCache("default");

			manager.addListener(new CacheManagerLevelListener());
			cache.addListener(new CacheLevelListener());

			cache.stop();
			cache.start();

			cache.addListener(new CacheLevelListener());

			System.out.println("get");
			System.out.println("get:" + cache.get("key1"));

			System.out.println("put");
			cache.put("key1", "val1");

			System.out.println("get");
			System.out.println("get:" + cache.get("key1"));

			System.out.println("put");
			cache.put("key1", "val1_1");

			System.out.println("remove");
			cache.remove("key1");

			cache.stop();

		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (manager != null) {
				manager.stop();
			}
		}
	}

	@Listener
	@SuppressWarnings("rawtypes")
	class CacheLevelListener {

		@CacheEntryCreated
		public void cacheEntryCreated(CacheEntryCreatedEvent event) {
			if (!event.isPre()) {
				System.out.println("cacheEntryCreated:" + event.getKey() + "="
						+ event.getValue() + "," + event.isPre() + ","
						+ event.getType());
			}
		}

		@CacheEntryModified
		public void cacheEntryModified(CacheEntryModifiedEvent event) {
			if (!event.isPre()) {
				System.out.println("cacheEntryModified:" + event.getKey() + "="
						+ event.getValue() + "," + event.isPre() + ","
						+ event.getType());
			}
		}

		@CacheEntryRemoved
		public void cacheEntryRemoved(CacheEntryRemovedEvent event) {
			if (!event.isPre()) {
				System.out.println("cacheEntryRemoved:" + event.getKey() + "="
						+ event.getValue() + "," + event.isPre() + ","
						+ event.getType());
			}
		}

		@CacheEntryVisited
		public void cacheEntryVisited(CacheEntryVisitedEvent event) {
			if (!event.isPre()) {
				System.out.println("cacheEntryVisited:" + event.getKey() + "="
						+ event.getValue() + "," + event.isPre() + ","
						+ event.getType());
			}
		}

		@CacheEntryLoaded
		public void cacheEntryLoaded(CacheEntryLoadedEvent event) {
			if (!event.isPre()) {
				System.out.println("cacheEntryLoaded:" + event.getKey() + "="
						+ event.getValue() + "," + event.isPre() + ","
						+ event.getType());
			}
		}

		@CacheEntryActivated
		@CacheEntryPassivated
		public void cacheEntryActivatedOrPassivated(CacheEntryEvent event) {
			if (event instanceof CacheEntryActivatedEvent) {
				System.out.println("cacheEntryActivated:" + event.getKey()
						+ "=" + ((CacheEntryActivatedEvent) event).getValue());
			} else if (event instanceof CacheEntryPassivatedEvent) {
				System.out.println("cacheEntryPassivated:" + event.getKey()
						+ "=" + ((CacheEntryPassivatedEvent) event).getValue());
			} else {
				System.out.println("event:" + event);
			}
		}
	}

	@Listener
	class CacheManagerLevelListener {

		@CacheStarted
		public void cacheStarted(CacheStartedEvent event) {
			System.out.println("cacheStarted:" + event.getCacheName());
		}

		@CacheStopped
		public void cacheStopped(CacheStoppedEvent event) {
			System.out.println("cacheStopped:" + event.getCacheName());
		}
	}

}

 

2.MapReduce機能

Infinispanは単なるキャッシュサーバの機能だけではなく、キャッシュされたデータに対するMapReduceアプリケーションの実行基盤として使用できます。MapReduceは、データギリッド上の巨大なデータを分散されていることを意識せずに、分散処理を可能にします。MapフェーズとReduceフェーズの2つの異なる計算処理の考え方が名前の由来となっております。Mapフェーズでは、Master Nodeが入力を受け取って、グリッド上でMapフェーズを実行するために、分割して送信することでタスクを開始します。各NodeでのMap関数の実行結果が中間結果としてMaster Nodeに戻ってきます。Master NodeのタスクはMapフェーズの実行結果を全て集めて、中間結果のKeyで結合して、KeyとValueをグリッド上でReduceフェーズを行うために送信します。最後に、Master Nodeは全てのReduceフェーズの結果を受信して、MapReduceタスクの呼び出し元に結果を返却します。

infinispan_mapred

 MapReduceは以下の4つの部分で構成されます。

  • Mapper
  • Reducer
  • Collator
  • MapReduceTask

①Mapper

MapperはKey/Valueのペアを受け取り、処理の中間結果を再びKey/Valueの形でCollectorに渡します。以下にMapperのインターフェースを示します。

package org.infinispan.distexec.mapreduce;
import java.io.Serializable;

public interface Mapper<KIn, VIn, KOut, VOut> extends Serializable {
/**
 * Invoked once for each input cache entry KIn,VOut pair.
 */
void map(KIn key, VIn value, Collector<KOut, VOut> collector);
}

 

②Reducer

Reducerは、Mapperが作成した中間結果のKeyに対するCollectionをIteratorという形で受け取ります。処理結果は、単一の戻り値として返却します。以下にReducerのインターフェースを示します。

package org.infinispan.distexec.mapreduce;
import java.io.Serializable;
import java.util.Iterator;

public interface Reducer<KOut, VOut> extends Serializable {
/**
 * Combines/reduces all intermediate values for a particular intermediate key to a single value.
 */
VOut reduce(KOut reducedKey, Iterator iter);
}

 

③Collator

Collatorは、Reducerの実行結果となったMap<KOut, VOut>を呼び出し元に返却する際の集約処理を行います。

package org.infinispan.distexec.mapreduce;
import java.util.Map;

public interface Collator<KOut, VOut, R> {
/**
 * Collates all reduced results and returns R to invoker of distributed task.
 *
 * @return final result of distributed task computation
 */
R collate(Map<KOut, VOut> reducedResults);
}

 

以下のMapReduceサンプルソースの処理を説明します。キャッシュに乗っている特定のキー(keyで始まるもの)を使用して、該当するキーの場合のみ、値を2倍する処理を行います。ここで、「TestMapper」クラスは、該当キーだけを対象に値を2倍した結果を出力します。次に「TestReducer」クラスでは、同じキーの結果を集計(全部足し算する)します。最後に「TestCollator」クラスでは、すべての結果をまとめて、結果を作成します。この一連の流れる管理するのは「run」メソッドの中の「MapReduceTask」クラスで行います。MapReduceTaskは、キャッシュをキーにインスタンスを生成し、Mapper、Reducerクラスを指定し、MapReduceTaskクラスの「execute」メソッドを利用してMapReduceを実行します。その際に、Collatorクラスを指定します。詳細は以下のサンプルソースを参考にして下さい。

import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import org.infinispan.Cache;
import org.infinispan.distexec.mapreduce.Collator;
import org.infinispan.distexec.mapreduce.Collector;
import org.infinispan.distexec.mapreduce.MapReduceTask;
import org.infinispan.distexec.mapreduce.Mapper;
import org.infinispan.distexec.mapreduce.Reducer;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;

public class TestMapReduce {

	public static void main(String[] args) {
		TestMapReduce test = new TestMapReduce();
		System.setProperty("nodeName", "Test");
		test.run("dist");
	}

	public void prepare(Cache<String, Object> cache) {

		for (int i = 0; i < 10; i++) {
			cache.put("key" + (i + 1), i + 1);
		}

		for (int i = 0; i < 100; i++) {
			cache.put("test" + (i + 1), String.valueOf(i + 1));
		}
	}

	public void run(String cacheName) {

		EmbeddedCacheManager cacheManager = null;
		Cache<String, Object> cache = null;
		try {
			cacheManager = new DefaultCacheManager("infinispan.xml");
			cache = cacheManager.getCache(cacheName);

			prepare(cache);

			MapReduceTask<String, Object, String, Integer> task = new MapReduceTask<>(
					cache);
			task.mappedWith(new TestMapper()).reducedWith(new TestReducer());

			Object result = task.execute(new TestCollator());
			System.out.println("result: " + result);

		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			cache.stop();
			cacheManager.stop();
		}
	}

	static class TestMapper implements Mapper<String, Object, String, Integer> {

		@Override
		public void map(String paramKIn, Object paramVIn,
				Collector<String, Integer> paramCollector) {
			try {
				if (paramKIn.startsWith("key")) {
					paramCollector.emit(paramKIn, ((Integer) paramVIn) * 2);
				}
			} catch (NumberFormatException e) {
				e.printStackTrace();
			}
		}
	}

	static class TestReducer implements Reducer<String, Integer> {

		@Override
		public Integer reduce(String paramKOut, Iterator paramIterator) {

			Integer sum = 0;
			while (paramIterator.hasNext()) {
				sum += paramIterator.next();
			}
			return sum;
		}
	}

	static class TestCollator implements Collator<String, Integer, Integer> {

		@Override
		public Integer collate(Map<String, Integer> paramMap) {

			Integer sum = 0;
			for (Entry<String, Integer> entry : paramMap.entrySet()) {
				sum += entry.getValue();
			}
			return sum;
		}
	}

}

 

ソースのコンパイル及び実行に関しては前回の資料を参考にして下さい。

The following two tabs change content below.

最新記事 by 李 (全て見る)

 - JBoss

Loading Facebook Comments ...

Message

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です

CAPTCHA


*

  関連記事

Infinispanの紹介

はじめに Infinispanはオープンソースデータグリッドプラットフォームで、 …

Jbossを利用したWebアプリケーションローカル環境構築手順(1/3)

Webアプリケーションの、ちょっとした動作確認や 自由に利用できる自分専用のデー …

Active MQ紹介

1.Message Queueとは まず、メッセージとはアプリケーションにとって …

Drools(Java Rules Engine)の紹介

ルール・エンジンとは ルール・エンジンとは、ビジネス上のルールといった「分岐処理 …