Kafka offset查询

Kafkakairenlo 发表了文章 • 0 个评论 • 33 次浏览 • 2017-04-13 14:18 • 来自相关话题

1)
查询消息总量
/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node17:6667 --topic ssp-info-7 --time -1

1.1)
查询group列表
/usr/hdp/current/kafka-broker/bin/kafka-consumer-groups.sh --list --bootstrap-server node17:6667 --new-consumer
/usr/hdp/current/kafka-broker/bin/kafka-consumer-groups.sh --list --zookeeper node17:2181/kafka

2) 查询所有offset,要设置:consumer.properties : exclude.internal.topics=false
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper node17:2181/kafka --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning --delete-consumer-offsets

3) 查询指定group的消费offset
3.1) 获取group对应的在__consumer_offsets的partition算法,下面3.2和3.3命令里指定这个值去查询:
Math.abs("g0005".hashCode()) % 50 == 0
Math.abs("g003".hashCode()) % 50 == 40

3.2) group.id=g0005
/usr/hdp/current/kafka-broker/bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 0 --broker-list node17:6667 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

3.3) group.id=g003
/usr/hdp/current/kafka-broker/bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 40 --broker-list node17:6667 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" 查看全部
1)
查询消息总量
/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node17:6667 --topic ssp-info-7 --time -1

1.1)
查询group列表
/usr/hdp/current/kafka-broker/bin/kafka-consumer-groups.sh --list --bootstrap-server node17:6667 --new-consumer
/usr/hdp/current/kafka-broker/bin/kafka-consumer-groups.sh --list --zookeeper node17:2181/kafka

2) 查询所有offset,要设置:consumer.properties : exclude.internal.topics=false
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper node17:2181/kafka --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning --delete-consumer-offsets

3) 查询指定group的消费offset
3.1) 获取group对应的在__consumer_offsets的partition算法,下面3.2和3.3命令里指定这个值去查询:
Math.abs("g0005".hashCode()) % 50 == 0
Math.abs("g003".hashCode()) % 50 == 40

3.2) group.id=g0005
/usr/hdp/current/kafka-broker/bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 0 --broker-list node17:6667 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

3.3) group.id=g003
/usr/hdp/current/kafka-broker/bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 40 --broker-list node17:6667 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"






ELK(ElasticSearch, Logstash, Kibana)搭建实时日志分析平台

大数据hadoopkairenlo 发表了文章 • 0 个评论 • 23 次浏览 • 2017-04-11 14:34 • 来自相关话题

ELK下载: https://www.elastic.co/downloads/

yum安装MariaDB的无法启动

回复

Linuxkairenlo 发起了问题 • 1 人关注 • 0 个回复 • 47 次浏览 • 2017-03-28 12:32 • 来自相关话题

Centos7下yum快速安装 Mariadb(MySql)

MySQLkairenlo 发表了文章 • 0 个评论 • 40 次浏览 • 2017-03-28 11:50 • 来自相关话题

从最新版本的centos系统开始,默认的是 Mariadb而不是mysql!

使用系统自带的repos安装很简单:#安装
yum install mariadb mariadb-server
#启动mariadb
systemctl start mariadb
#开机自启动
systemctl enable mariadb
#设置 root密码等相关
mysql_secure_installation
#测试登录!
mysql -uroot -p123456结束! 查看全部
从最新版本的centos系统开始,默认的是 Mariadb而不是mysql!

使用系统自带的repos安装很简单:
#安装
yum install mariadb mariadb-server
#启动mariadb
systemctl start mariadb
#开机自启动
systemctl enable mariadb
#设置 root密码等相关
mysql_secure_installation
#测试登录!
mysql -uroot -p123456
结束!

Linux CentOS 使用命令设置网络代理

Linuxkairenlo 发表了文章 • 0 个评论 • 40 次浏览 • 2017-03-28 01:59 • 来自相关话题

设置HTTP全局代理,方法如下:
修改 /etc/profile 文件,添加下面内容: http_proxy=http://username:password@yourproxy:8080/
ftp_proxy=http://username:password@yourproxy:8080/
export http_proxy
export ftp_proxy
如果没有密码限制,则以上内容可以修改为以下内容: http_proxy=http://yourproxy:8080/
ftp_proxy=http://yourproxy:8080/
export http_proxy
export ftp_proxy若只针对某个用户而言,则修改 ~/.bash_profile 文件,添加相同内容;
修改完成后,注销重新登录即可。
 
设置yum代理:
要另外设置 /etc/yum.conf 文件,添加以下代码: proxy=http://username:password@yourproxy:8080/
#若无密码限制,则为以下方式:
proxy=http://yourproxy:8080/这样yum的操作就通过代理了。 

设置wget代理:
在~/.wgetrc中设置http_proxy = http://yourproxy:8080/
ftp_proxy = http://yourproxy:8080/
use_proxy = on
wait = 15 查看全部
设置HTTP全局代理,方法如下:
修改 /etc/profile 文件,添加下面内容: 
http_proxy=http://username:password@yourproxy:8080/ 
ftp_proxy=http://username:password@yourproxy:8080/
export http_proxy
export ftp_proxy

如果没有密码限制,则以上内容可以修改为以下内容: 
http_proxy=http://yourproxy:8080/ 
ftp_proxy=http://yourproxy:8080/
export http_proxy
export ftp_proxy
若只针对某个用户而言,则修改 ~/.bash_profile 文件,添加相同内容;
修改完成后,注销重新登录即可。
 
设置yum代理:
要另外设置 /etc/yum.conf 文件,添加以下代码: 
proxy=http://username:password@yourproxy:8080/ 
#若无密码限制,则为以下方式:
proxy=http://yourproxy:8080/
这样yum的操作就通过代理了。 

设置wget代理:
在~/.wgetrc中设置
http_proxy = http://yourproxy:8080/
ftp_proxy = http://yourproxy:8080/
use_proxy = on
wait = 15

Hadoop web加安全机制

大数据hadoopkairenlo 发表了文章 • 0 个评论 • 46 次浏览 • 2017-03-26 18:57 • 来自相关话题

在core-site.xml中加入如下属性:
1.修改文件core-site.xml<property>
<name>hadoop.http.filter.initializers</name>
<value>org.apache.hadoop.security.AuthenticationFilterInitializer</value>
</property>
<property>
<name>hadoop.http.authentication.type</name>
<value>simple</value>
</property>
<property>
<name>hadoop.http.authentication.token.validity</name>
<value>3600</value>
</property>
<property>
<name>hadoop.http.authentication.signature.secret.file</name>
<value>/hadoop-http-auth-signature-secret</value>
</property>
<property>
<name>hadoop.http.authentication.cookie.domain</name>
<value></value>
</property>
<property>
<name>hadoop.http.authentication.simple.anonymous.allowed</name>
<value>false</value>
</property>2.生成文件hadoop-http-auth-signature-secret
命令:echo \"xcg\" > /hadoop-http-auth-signature-secret3.将生成的文件复制到其他data节点主机对应目录下,重启集群。

4.在地址栏用http://192.168.1.101:50070(我的namenode节点主机)会401错误,但在地址后加上?user.name=xcg,xcg是刚才我在/hadoop-http-auth-signature-secret文件中输入的用户名,这时可以正常访问了。 查看全部
在core-site.xml中加入如下属性:
1.修改文件core-site.xml
<property>
<name>hadoop.http.filter.initializers</name>
<value>org.apache.hadoop.security.AuthenticationFilterInitializer</value>
</property>
<property>
<name>hadoop.http.authentication.type</name>
<value>simple</value>
</property>
<property>
<name>hadoop.http.authentication.token.validity</name>
<value>3600</value>
</property>
<property>
<name>hadoop.http.authentication.signature.secret.file</name>
<value>/hadoop-http-auth-signature-secret</value>
</property>
<property>
<name>hadoop.http.authentication.cookie.domain</name>
<value></value>
</property>
<property>
<name>hadoop.http.authentication.simple.anonymous.allowed</name>
<value>false</value>
</property>
2.生成文件hadoop-http-auth-signature-secret
命令:
echo \"xcg\" > /hadoop-http-auth-signature-secret
3.将生成的文件复制到其他data节点主机对应目录下,重启集群。

4.在地址栏用http://192.168.1.101:50070(我的namenode节点主机)会401错误,但在地址后加上?user.name=xcg,xcg是刚才我在/hadoop-http-auth-signature-secret文件中输入的用户名,这时可以正常访问了。

DBeaver的Phoenix包

大数据hadoopkairenlo 发表了文章 • 0 个评论 • 52 次浏览 • 2017-03-25 02:48 • 来自相关话题

org.apache.phoenix\phoenix-core-4.9.0-HBase-1.1.jar
org.apache.phoenix\phoenix-server-4.7.0-HBase-1.1.jar
org.apache.phoenix\phoenix-core-4.9.0-HBase-1.1.jar
org.apache.phoenix\phoenix-server-4.7.0-HBase-1.1.jar

Phoenix4.8.1安装

大数据hadoopkairenlo 发表了文章 • 0 个评论 • 43 次浏览 • 2017-03-25 02:10 • 来自相关话题

Phoenix是一个提供hbase的sql操作的框架,Phoenix是构建在HBase之上的一个SQL中间层。Phoenix完全使用Java编写,代码位于GitHub上,并且提供了一个客户端可嵌入的JDBC驱动。对于简单的低延迟查询,其性能量级为毫秒;对于百万级别的行数来说,其性能量级为秒。Phoenix并不是像HBase那样
用于map-reduce job的,而是通过标准化的语言来访问HBase数据的。
 
1、phoenix安装
上传apache-phoenix-4.8.1-HBase-1.2-bin.tar.gz文件到linux的/usr/hadoop目录下面
解压:tar -zxvf apache-phoenix-4.8.1-Hbase-1.2-bin.tar.gz生成新的目录apache-phoenix-4.8.1-HBase-1.2-bin
将其配置到环境变量中去(可选操作)#apache-phoenix
export PHOENIX_HOME=/usr/Hadoop/apache-phoenix-4.8.1-HBase-1.2-bin
export PHOENIX_CLASSPATH=$PHOENIX_HOME/lib
export PATH=$PATH:$PHOENIX_HOME/bin
配置完以后是环境变量生效source /etc/profile2、hbase设置

将/usr/hadoop/ apache-phoenix-4.8.1-HBase-1.2-bin目录下面的phoenix-4.8.1-HBase-1.2-server.jar文件拷贝到每一台HRegionServer的hbase安装目录的lib目录下面去cp phoenix-4.8.1-HBase-1.2-server.jar /usr/hadoop/hbase-1.2.3/lib/
scp phoenix-4.8.1-HBase-1.2-server.jarroot@node1:/usr/hadoop/hbase-1.2.3/lib/
scp phoenix-4.8.1-HBase-1.2-server.jar root@node2:/usr/hadoop/hbase-1.2.3/lib/
重新启动hbasestop-hbase.sh
start-hbase.sh3、启动phoenix

进入phoenix目录的bin目录下面,进入sqlline命令行#格式即:sqlline.py + 主机名 + zookeeper端口号 2181
./sqlline.py master,node1,node2:2181
然后进行测试#列出表名
!tables;  查看全部
Phoenix是一个提供hbase的sql操作的框架,Phoenix是构建在HBase之上的一个SQL中间层。Phoenix完全使用Java编写,代码位于GitHub上,并且提供了一个客户端可嵌入的JDBC驱动。对于简单的低延迟查询,其性能量级为毫秒;对于百万级别的行数来说,其性能量级为秒。Phoenix并不是像HBase那样
用于map-reduce job的,而是通过标准化的语言来访问HBase数据的。
 
1、phoenix安装
上传apache-phoenix-4.8.1-HBase-1.2-bin.tar.gz文件到linux的/usr/hadoop目录下面
解压:
tar -zxvf apache-phoenix-4.8.1-Hbase-1.2-bin.tar.gz
生成新的目录apache-phoenix-4.8.1-HBase-1.2-bin
将其配置到环境变量中去(可选操作)
#apache-phoenix
export PHOENIX_HOME=/usr/Hadoop/apache-phoenix-4.8.1-HBase-1.2-bin
export PHOENIX_CLASSPATH=$PHOENIX_HOME/lib
export PATH=$PATH:$PHOENIX_HOME/bin

配置完以后是环境变量生效
source /etc/profile
2、hbase设置

将/usr/hadoop/ apache-phoenix-4.8.1-HBase-1.2-bin目录下面的phoenix-4.8.1-HBase-1.2-server.jar文件拷贝到每一台HRegionServer的hbase安装目录的lib目录下面去
cp phoenix-4.8.1-HBase-1.2-server.jar /usr/hadoop/hbase-1.2.3/lib/
scp phoenix-4.8.1-HBase-1.2-server.jarroot@node1:/usr/hadoop/hbase-1.2.3/lib/
scp phoenix-4.8.1-HBase-1.2-server.jar root@node2:/usr/hadoop/hbase-1.2.3/lib/

重新启动hbase
stop-hbase.sh
start-hbase.sh
3、启动phoenix

进入phoenix目录的bin目录下面,进入sqlline命令行
#格式即:sqlline.py + 主机名 + zookeeper端口号 2181
./sqlline.py master,node1,node2:2181

然后进行测试
#列出表名
!tables;
 

Linux搭建代理服务器

Linuxkairenlo 发表了文章 • 0 个评论 • 53 次浏览 • 2017-03-25 01:18 • 来自相关话题

 首先有一台能上外网的服务器,20.1.200.106linux系统下,其他工作机用不了外网。用此机器搭建代理服务器。
 
1.如果系统中还没有装squid,按以下顺序输入命令后即可完成安装#下载Squid代理安装包
wget http://www.squid-cache.org/Ver ... ar.gz
#解压Squid安装包
tar -zxvf squid-3.0.STABLE18.tar.gz
cd squid-3.0.STABLE18
#配置Squid代理安装路径之类的
./configure --prefix=/usr/local/squid --sysconfdir=/usr/local/squid/etc --bindir=/usr/local/squid/bin --sbindir=/usr/local/squid/sbin --mandir=/usr/local/squid/share/man --enable-gnuregex --enable-carp --enable-async-io=80 --enable-removal-policies=heap,lru --enable-icmp --enable-delay-pools --enable-useragent-log --enable-referer-log --enable-kill-parent-hack --enable-snmp --enable-arp-acl --enable-htcp --enable-cache-digests --enable-default-err-language=Simplify_Chinese --enable-err-languages="Simplify_Chinese" --enable-poll --enable-linux-netfilter --disable-ident-lookups --enable-underscores --enable-auth="basic" --enable-basic-auth-helpers="NCSA" --enable-external-acl-helpers="ip_user" --enable-x-accelerator-vary
#编译
make
#安装Squid代理软件
make install
 2.安装好后配置文件在/usr/local/squid/etc目录中,删除原有的squid.conf配置文件,新建squid.conf文件,只输入以下内容:http_port 1080 transparent
http_access allow all
visible_hostname webfree注意,以上是squid2.6及以后版本的配置,如果是2.6以前版本的squid,配置如下:http_port 1080
httpd_accel_host virtual
httpd_accel_port 1080
httpd_accel_with_proxy on
httpd_accel_uses_host_header on3./usr/local/squid/sbin目录下的squid为运行文件。
第一次运行时,先运行:/usr/local/squid/sbin/squid -z
即创建缓存文件夹。缓存文件夹在/usr/local/squid/var下创建,因此创建缓存前还需运行:chmod 777 /usr/local/squid/var -R
即给该文件夹权限。
再运行:/usr/local/squid/sbin/squid -d 1即开启squid。
如果要关闭squid,运行:/usr/local/squid/sbin/squid -k shutdown
则是安全关闭。
注意,如果没有将新装的squid配置成环境变量的话,尽量在sbin目录下运行./squid,以免运行了服务器上已有的旧squid。
 
可能的问题:
Squid有时候运行时会有报错:
clientNatLookup: NF getsockopt(SO_ORIGINAL_DST) failed: (92) Protocol not available
不过似乎不影响访问
 
4.如果需要设置代理服务器密码,那么squid.conf配置如下:auth_param basic program /usr/local/squid/libexec/ncsa_auth /usr/local/squid/etc/passwd
auth_param basic children 5
auth_param basic credentialsttl 2 hours
auth_param basic realm Example.com's Squid proxy-caching
http_port 1080
acl auth_user proxy_auth REQUIRED
http_access allow auth_user
visible_hostname webfree
上面第一行是配置用户和密码文件,生成该文件命令如下:htpasswd -c /usr/local/squid/etc/passwd kairenlo
kairenlo 是用户名,密码会在该命令过程中输入。 查看全部
 首先有一台能上外网的服务器,20.1.200.106linux系统下,其他工作机用不了外网。用此机器搭建代理服务器。
 
1.如果系统中还没有装squid,按以下顺序输入命令后即可完成安装
#下载Squid代理安装包
wget http://www.squid-cache.org/Ver ... ar.gz
#解压Squid安装包
tar -zxvf squid-3.0.STABLE18.tar.gz
cd squid-3.0.STABLE18
#配置Squid代理安装路径之类的
./configure --prefix=/usr/local/squid --sysconfdir=/usr/local/squid/etc --bindir=/usr/local/squid/bin --sbindir=/usr/local/squid/sbin --mandir=/usr/local/squid/share/man --enable-gnuregex --enable-carp --enable-async-io=80 --enable-removal-policies=heap,lru --enable-icmp --enable-delay-pools --enable-useragent-log --enable-referer-log --enable-kill-parent-hack --enable-snmp --enable-arp-acl --enable-htcp --enable-cache-digests --enable-default-err-language=Simplify_Chinese --enable-err-languages="Simplify_Chinese" --enable-poll --enable-linux-netfilter --disable-ident-lookups --enable-underscores --enable-auth="basic" --enable-basic-auth-helpers="NCSA" --enable-external-acl-helpers="ip_user" --enable-x-accelerator-vary
#编译
make
#安装Squid代理软件
make install

 2.安装好后配置文件在/usr/local/squid/etc目录中,删除原有的squid.conf配置文件,新建squid.conf文件,只输入以下内容:
http_port 1080 transparent
http_access allow all
visible_hostname webfree
注意,以上是squid2.6及以后版本的配置,如果是2.6以前版本的squid,配置如下:
http_port 1080
httpd_accel_host virtual
httpd_accel_port 1080
httpd_accel_with_proxy on
httpd_accel_uses_host_header on
3./usr/local/squid/sbin目录下的squid为运行文件。
第一次运行时,先运行:
/usr/local/squid/sbin/squid -z

即创建缓存文件夹。缓存文件夹在/usr/local/squid/var下创建,因此创建缓存前还需运行:
chmod 777 /usr/local/squid/var -R

即给该文件夹权限。
再运行:
/usr/local/squid/sbin/squid -d 1
即开启squid。
如果要关闭squid,运行:
/usr/local/squid/sbin/squid -k shutdown

则是安全关闭。
注意,如果没有将新装的squid配置成环境变量的话,尽量在sbin目录下运行./squid,以免运行了服务器上已有的旧squid。
 
可能的问题:
Squid有时候运行时会有报错:
clientNatLookup: NF getsockopt(SO_ORIGINAL_DST) failed: (92) Protocol not available
不过似乎不影响访问
 
4.如果需要设置代理服务器密码,那么squid.conf配置如下:
auth_param basic program /usr/local/squid/libexec/ncsa_auth /usr/local/squid/etc/passwd
auth_param basic children 5
auth_param basic credentialsttl 2 hours
auth_param basic realm Example.com's Squid proxy-caching
http_port 1080
acl auth_user proxy_auth REQUIRED
http_access allow auth_user
visible_hostname webfree

上面第一行是配置用户和密码文件,生成该文件命令如下:
htpasswd -c /usr/local/squid/etc/passwd kairenlo 

kairenlo 是用户名,密码会在该命令过程中输入。

Phoenix与Spark整合

Sparkkairenlo 发表了文章 • 0 个评论 • 43 次浏览 • 2017-03-24 20:13 • 来自相关话题

目的是将phoenix做存储,spark做计算层。这样就结合了phoenix查询速度快和spark计算速度快的优点。
在这里将Phoenix的表作为spark的RDD或者DataFrames来操作,并且将操作的结果写回phoenix中。
这样做也扩大了两者的使用场景。

Phoenix 版本 4.4.0
Hbase版本 0.98
spark版本 spark-1.5.2-bin-hadoop2.6
 
首先配置 SPARK_CLASSPATH
要想在spark中操作phoenix,就必须让spark可以找到phoenix的相关类,所以我们把client放到spark_classpath中:
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/phoenix/phoenix-spark-4.4.0-HBase-0.98-tests.jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/phoenix/phoenix-4.4.0-HBase-0.98-client.jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/phoenix/phoenix-server-client-4.4.0-HBase-0.98.jar
这样就可以在spark-shell中操作phoenix了
下来结合两者做下实验:
在phoenix中创建几张表
[hadoop@10.10.113.45 ~/phoenix/bin]$>./sqlline.py 10.10.113.45:2181
0: jdbc:phoenix:10.10.113.45:2181> CREATE TABLE EMAIL_ENRON(
. . . . . . . . . . . . . . . . .> MAIL_FROM BIGINT NOT NULL,
. . . . . . . . . . . . . . . . .> MAIL_TO BIGINT NOT NULL
. . . . . . . . . . . . . . . . .> CONSTRAINT pk PRIMARY KEY(MAIL_FROM, MAIL_TO));
0: jdbc:phoenix:10.10.113.45:2181> CREATE TABLE EMAIL_ENRON_PAGERANK(
. . . . . . . . . . . . . . . . .> ID BIGINT NOT NULL,
. . . . . . . . . . . . . . . . .> RANK DOUBLE
. . . . . . . . . . . . . . . . .> CONSTRAINT pk PRIMARY KEY(ID));
No rows affected (0.52 seconds查看下是否创建成功:0: jdbc:phoenix:10.10.113.45:2181> !tables
+------------------------------------------+------------------------------------------+------------------------------------------+--------------+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | |
+------------------------------------------+------------------------------------------+------------------------------------------+--------------+
| | SYSTEM | CATALOG | SYSTEM TABLE |
| | SYSTEM | FUNCTION | SYSTEM TABLE |
| | SYSTEM | SEQUENCE | SYSTEM TABLE |
| | SYSTEM | STATS | SYSTEM TABLE |
| | | EMAIL_ENRON | TABLE |
| | | EMAIL_ENRON_PAGERANK | TABLE |
+------------------------------------------+------------------------------------------+------------------------------------------+--------------+
0: jdbc:phoenix:10.10.113.45:2181>
在将数据load到phoenix中,数据有40万行
[hadoop@10.10.113.45 ~/phoenix/bin]$>./psql.py -t EMAIL_ENRON 10.10.113.45:2181 /home/hadoop/sfs/enron.csv
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
15/12/03 10:06:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
csv columns from database.
CSV Upsert complete. 367662 rows upserted
Time: 21.783 sec(s)数据来源:https://snap.stanford.edu/data/email-Enron.html
然后在查询下:0: jdbc:phoenix:10.10.113.45:2181> select count(*) from EMAIL_ENRON;
+------------------------------------------+
| COUNT(1) |
+------------------------------------------+
| 367662 |
+------------------------------------------+
1 row selected (0.289 seconds)看37万数据,查询不到一秒!!!
下面进入到spark-shell 的交互模式,我们做一个PageRank 算法的例子:
[hadoop@10.10.113.45 ~/spark/bin]$>./spark-shell
scala> import org.apache.spark.graphx._
import org.apache.spark.graphx._
scala> import org.apache.phoenix.spark._
import org.apache.phoenix.spark._
scala> val rdd = sc.phoenixTableAsRDD("EMAIL_ENRON", Seq("MAIL_FROM", "MAIL_TO"), zkUrl=Some("10.10.113.45"))
rdd: org.apache.spark.rdd.RDD[Map[String,AnyRef]] = MapPartitionsRDD[2] at map at SparkContextFunctions.scala:39
scala> val rawEdges = rdd.map{ e => (e("MAIL_FROM").asInstanceOf[VertexId], e("MAIL_TO").asInstanceOf[VertexId]) }
rawEdges: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId)] = MapPartitionsRDD[3] at map at <console>:29
scala> val graph = Graph.fromEdgeTuples(rawEdges, 1.0)
graph: org.apache.spark.graphx.Graph[Double,Int] = org.apache.spark.graphx.impl.GraphImpl@621bb3c3
scala> val pr = graph.pageRank(0.001)
pr: org.apache.spark.graphx.Graph[Double,Double] = org.apache.spark.graphx.impl.GraphImpl@55e444b1
scala> pr.vertices.saveToPhoenix("EMAIL_ENRON_PAGERANK", Seq("ID", "RANK"), zkUrl = Some("10.10.113.45"))(这一步会很耗内存,可能有的同学在测试的时候会报OOM,建议增大spark中executor memory,driver memory的大小)我们在去phoenix中查看一下结果:0: jdbc:phoenix:10.10.113.45:2181> select count(*) from EMAIL_ENRON_PAGERANK;
+------------------------------------------+
| COUNT(1) |
+------------------------------------------+
| 29000 |
+------------------------------------------+
1 row selected (0.113 seconds)
0: jdbc:phoenix:10.10.113.45:2181> SELECT * FROM EMAIL_ENRON_PAGERANK ORDER BY RANK DESC LIMIT 5;
+------------------------------------------+------------------------------------------+
| ID | RANK |
+------------------------------------------+------------------------------------------+
| 273 | 117.18141799210386 |
| 140 | 108.63091596789913 |
| 458 | 107.2728800448782 |
| 588 | 106.11840798585399 |
| 566 | 105.13932886531066 |
+------------------------------------------+------------------------------------------+
5 rows selected (0.568 seconds) 查看全部
目的是将phoenix做存储,spark做计算层。这样就结合了phoenix查询速度快和spark计算速度快的优点。
在这里将Phoenix的表作为spark的RDD或者DataFrames来操作,并且将操作的结果写回phoenix中。
这样做也扩大了两者的使用场景。

Phoenix 版本 4.4.0
Hbase版本 0.98
spark版本 spark-1.5.2-bin-hadoop2.6
 
首先配置 SPARK_CLASSPATH
要想在spark中操作phoenix,就必须让spark可以找到phoenix的相关类,所以我们把client放到spark_classpath中:
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/phoenix/phoenix-spark-4.4.0-HBase-0.98-tests.jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/phoenix/phoenix-4.4.0-HBase-0.98-client.jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/phoenix/phoenix-server-client-4.4.0-HBase-0.98.jar

这样就可以在spark-shell中操作phoenix了
下来结合两者做下实验:
  • 在phoenix中创建几张表

[hadoop@10.10.113.45 ~/phoenix/bin]$>./sqlline.py 10.10.113.45:2181
0: jdbc:phoenix:10.10.113.45:2181> CREATE TABLE EMAIL_ENRON(
. . . . . . . . . . . . . . . . .> MAIL_FROM BIGINT NOT NULL,
. . . . . . . . . . . . . . . . .> MAIL_TO BIGINT NOT NULL
. . . . . . . . . . . . . . . . .> CONSTRAINT pk PRIMARY KEY(MAIL_FROM, MAIL_TO));
0: jdbc:phoenix:10.10.113.45:2181> CREATE TABLE EMAIL_ENRON_PAGERANK(
. . . . . . . . . . . . . . . . .> ID BIGINT NOT NULL,
. . . . . . . . . . . . . . . . .> RANK DOUBLE
. . . . . . . . . . . . . . . . .> CONSTRAINT pk PRIMARY KEY(ID));
No rows affected (0.52 seconds
查看下是否创建成功:
0: jdbc:phoenix:10.10.113.45:2181> !tables
+------------------------------------------+------------------------------------------+------------------------------------------+--------------+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | |
+------------------------------------------+------------------------------------------+------------------------------------------+--------------+
| | SYSTEM | CATALOG | SYSTEM TABLE |
| | SYSTEM | FUNCTION | SYSTEM TABLE |
| | SYSTEM | SEQUENCE | SYSTEM TABLE |
| | SYSTEM | STATS | SYSTEM TABLE |
| | | EMAIL_ENRON | TABLE |
| | | EMAIL_ENRON_PAGERANK | TABLE |
+------------------------------------------+------------------------------------------+------------------------------------------+--------------+
0: jdbc:phoenix:10.10.113.45:2181>

  • 在将数据load到phoenix中,数据有40万行

[hadoop@10.10.113.45 ~/phoenix/bin]$>./psql.py -t EMAIL_ENRON 10.10.113.45:2181 /home/hadoop/sfs/enron.csv
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
15/12/03 10:06:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
csv columns from database.
CSV Upsert complete. 367662 rows upserted
Time: 21.783 sec(s)
数据来源:https://snap.stanford.edu/data/email-Enron.html
然后在查询下:
0: jdbc:phoenix:10.10.113.45:2181> select count(*) from EMAIL_ENRON;
+------------------------------------------+
| COUNT(1) |
+------------------------------------------+
| 367662 |
+------------------------------------------+
1 row selected (0.289 seconds)
看37万数据,查询不到一秒!!!
下面进入到spark-shell 的交互模式,我们做一个PageRank 算法的例子:
[hadoop@10.10.113.45 ~/spark/bin]$>./spark-shell
scala> import org.apache.spark.graphx._
import org.apache.spark.graphx._
scala> import org.apache.phoenix.spark._
import org.apache.phoenix.spark._
scala> val rdd = sc.phoenixTableAsRDD("EMAIL_ENRON", Seq("MAIL_FROM", "MAIL_TO"), zkUrl=Some("10.10.113.45"))
rdd: org.apache.spark.rdd.RDD[Map[String,AnyRef]] = MapPartitionsRDD[2] at map at SparkContextFunctions.scala:39
scala> val rawEdges = rdd.map{ e => (e("MAIL_FROM").asInstanceOf[VertexId], e("MAIL_TO").asInstanceOf[VertexId]) }
rawEdges: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId)] = MapPartitionsRDD[3] at map at <console>:29
scala> val graph = Graph.fromEdgeTuples(rawEdges, 1.0)
graph: org.apache.spark.graphx.Graph[Double,Int] = org.apache.spark.graphx.impl.GraphImpl@621bb3c3
scala> val pr = graph.pageRank(0.001)
pr: org.apache.spark.graphx.Graph[Double,Double] = org.apache.spark.graphx.impl.GraphImpl@55e444b1
scala> pr.vertices.saveToPhoenix("EMAIL_ENRON_PAGERANK", Seq("ID", "RANK"), zkUrl = Some("10.10.113.45"))(这一步会很耗内存,可能有的同学在测试的时候会报OOM,建议增大spark中executor memory,driver memory的大小)
我们在去phoenix中查看一下结果:
0: jdbc:phoenix:10.10.113.45:2181> select count(*) from EMAIL_ENRON_PAGERANK;
+------------------------------------------+
| COUNT(1) |
+------------------------------------------+
| 29000 |
+------------------------------------------+
1 row selected (0.113 seconds)
0: jdbc:phoenix:10.10.113.45:2181> SELECT * FROM EMAIL_ENRON_PAGERANK ORDER BY RANK DESC LIMIT 5;
+------------------------------------------+------------------------------------------+
| ID | RANK |
+------------------------------------------+------------------------------------------+
| 273 | 117.18141799210386 |
| 140 | 108.63091596789913 |
| 458 | 107.2728800448782 |
| 588 | 106.11840798585399 |
| 566 | 105.13932886531066 |
+------------------------------------------+------------------------------------------+
5 rows selected (0.568 seconds)
友链:逍遥乐IT博客