本文概览:介绍了在本地调试Kafka Eagle和进行二次开发。
1 源码本地部署
1.1 部署步骤
STEP1 修改配置文件
在kafka-eagle-common模块下面的resources/systme-config.properties文件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
###################################### # multi zookeeper&kafka cluster list ###################################### #kafka.eagle.zk.cluster.alias=cluster1,cluster2 #cluster1.zk.list=tdn1:2181,tdn2:2181,tdn3:2181 #cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181 kafka.eagle.zk.cluster.alias=cluster1 cluster1.zk.list=localhost:2181 ###################################### # zk client thread limit ###################################### kafka.zk.limit.size=25 ###################################### # kafka eagle webui port ###################################### kafka.eagle.webui.port=8048 ###################################### # kafka offset storage ###################################### kafka.eagle.offset.storage=kafka ###################################### # alarm email configure ###################################### kafka.eagle.mail.enable=true kafka.eagle.mail.sa=alert_sa kafka.eagle.mail.username=alert_sa@163.com kafka.eagle.mail.password=mqslimczkdqabbbg kafka.eagle.mail.server.host=smtp.163.com kafka.eagle.mail.server.port=25 ###################################### # delete kafka topic token ###################################### kafka.eagle.topic.token=keadmin ###################################### # kafka sasl authenticate ###################################### kafka.eagle.sasl.enable=false kafka.eagle.sasl.protocol=SASL_PLAINTEXT kafka.eagle.sasl.mechanism=PLAIN kafka.eagle.sasl.client=/hadoop/kafka-eagle/conf/kafka_client_jaas.conf ###################################### # kafka jdbc driver address ###################################### kafka.eagle.driver=com.mysql.jdbc.Driver kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull kafka.eagle.username=root kafka.eagle.password=53061208 |
该配置文件信息可以参考:
STEP2 tomcat配置
(1)Add new Configuration
选择war包
(2)设置Application context,设置为“/ke”
STEP3 启动本地zookeeper
1 |
sh bin/zkServer.sh start |
STEP4 启动本地kafka.
1 |
./bin/kafka-server-start.sh config/server.properties |
STEP5 在IDEA中启动kafka-eagle
1.2 部署遇到问题
1、在我们项目中在kafka-eagle源码中包了一层(左边是项目,右边是eagle源码),如下关系。
(1)在我们调试源码或者修改源码时,需要我们只打开源码模块,如下图中右边工程。
(2)此时可能遇到无法正常使用git,需要将项目艮目录的.idea/vcs.xml拷贝到源码的.idea目录下面。
2、Intellij中出现:
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project kafka-eagle-common: Compilation failure: Compilation failure:
[ERROR] /Users/wuzhonghu/Project/baidu/baidu/financial-microservice/kafka-eagle/kafka-eagle/kafka-eagle-common/src/main/java/org/smartloli/kafka/eagle/common/protocol/KafkaSqlInfo.java:[34,57] -source 1.5 中不支持 diamond 运算符
[ERROR] (请使用 -source 7 或更高版本以启用 diamond 运算符)
(1)进入:File->ProductStructure,修改如下内容:
- Modules设置。每一个Module块Source部分的Language Level,修改为8-Lambda,type annotation etc.
- Project设置。修改Lanage Level
- SDKs设置
(2)选择IntllijIdea->Preferences
(3) 在kafka-eagle-core、kafka-eagle-web等每一个模块的xml中修改xml的文件
1 2 3 4 5 6 7 8 9 10 11 12 13 |
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> |
2 tar和源码的目录结构
kafka-eagle-bin-1.2.1.tar.gz的目录结构如下
(1)关于kafka-eagle-bin-1.2.1的目录
这个目录是和代码中如下目录对应。
在的源码执行“mvn clean package” 会在kafak-eagle-web模块的target目录下生产两个文件,其中就包含一个.tar.gz文件
(2)关于上面的kms目录。这个目录和tomcat目录保持一致
(3)关于上面webapps/ke目录
它是解压webapps/ke.war文件得到的(ke.war是源码的war包,不包含上面的tomcat的文件信息),参考ke.sh代码如下。
1 2 |
cd $KE_HOME/kms/webapps/ke ${JAVA_HOME}/bin/jar -xvf $KE_HOME/kms/webapps/ke.war |
3 二次开发
3.1 二次开发的步骤
STEP1 修改源码
STEP2 执行mvn clean package,结果如下
STEP3 替换服务器的key.war
使用上面重新编码的key.war,替换服务器key.war,服务器key.war的目录在:xxxx/kafka-eagle/kms/webapps。
如果是修改了配置文件,或者修改了tomcat的一些配置,那么此时可以替换kafka-eagle-bin-1.1.9.tar.gz,然后重新解压缩。
STEP4 关闭服务
1 |
sh ke.sh stop |
STEP5 重新启动ke.sh
1 |
sh ke.sh start |
3.2 增加一个无权限控制的添加报警接口
只需在权限验证时,把相应url去掉。修改SSOFilter中dofilter逻辑,实现对于包含“/services”的 url都不进行权限验证
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { HttpServletRequest req = (HttpServletRequest) request; HttpServletResponse resp = (HttpServletResponse) response; String requestUri = req.getRequestURI(); if (requestUri.contains("/account/signin/action")) { String username = request.getParameter("username"); String password = request.getParameter("password"); Signiner signinerChk = accountService.login(username, password); if (!signinerChk.getUsername().equals(KConstants.Login.UNKNOW_USER)) { SSOAuthenticationToken token = new SSOAuthenticationToken(signinerChk.getRtxno(), signinerChk.getRealname(), signinerChk.getUsername(), signinerChk.getEmail()); SecurityUtils.getSubject().login(token); } } else if (requestUri.contains("/services")) { // 对于包含“/services接口”的url都不进行权限验证 // do nothing } else { Signiner signiner = (Signiner) getSession().getAttribute(KConstants.Login.SESSION_USER); if (signiner == null) { if (req.getHeader("x-requested-with") != null && req.getHeader("x-requested-with").equalsIgnoreCase("XMLHttpRequest")) { resp.setHeader("sessionstatus", "timeout"); return; } resp.sendRedirect("/ke/account/signin?" + requestUri); return; } } chain.doFilter(request, response); } |
3.3 相关问题
1、为什么请求URL中带有ke,而contorller不带ke?
1 2 |
# 一个请求URL http://localhost:8080/ke/alarm/topic/ajax |
请求中ke是在js或者前端页面中加上的,ke表示app的名字,ke后面的”/alarm/topic/ajax”才是真正的controller的@RequestMapping定义的请求的名字。