0%

mirrormaker是kafka自带的topic复制工具,可以让两个集群之间的数据互相导入同步。 很多时候,我们需要把其中一个集群的某个topic数据,复制到另一个集群,但使用不同名字的topic。
官方好像并没有提供这样功能,但留了一个message.handler的入口,我们可以通过额外的插件来实现这样的功能。

首先放上这个插件地址: https://github.com/opencore/mirrormaker_topic_rename.git

使用方法:

  1. 直接下载编译好的jar文件:

    1
    2
    wget https://github.com/opencore/mirrormaker_topic_rename/files/2024649/mirrormaker_topic_rename.zip
    unzip mirrormaker_topic_rename.zip
  2. 添加环境变量:

    1
    2
    3
    4
    5
    6
    7
    8
    # 由于执行mirrormaker可能需要导出比较大的数据量,所以建议适当加些内存。这个根据需要来调试。
    export KAFKA_HEAP_OPTS="-Xmx3G"
    # 2.这个必写,很关键
    export CLASSPATH=/mirrormaker_topic_rename/target/mmchangetopic-1.0-SNAPSHOT.jar
    # 3. 源kafka集群 后面均用其来代替
    export SOURCE_KAFKAS=xxx:9092
    # 4. 目标kafka集群 后面均用其来代替
    export SOURCE_KAFKAS=xxx:9092
    阅读全文 »

目标

目前已有mysql和kafka,需要利用Debezium将mysql的数据同步到kafka持久化保存,之后再通过jdbc sink导出kafka中的数据至其他mysql

Debezium是一个分布式平台,它可以将你现有的数据库变成事件流,因此应用程序可以看到并立即响应数据库中的每个行级变化。

Debezium建立在Apache Kafka之上,并提供Kafka Connect兼容的连接器,可以监控特定的数据库管理系统。Debezium在Kafka日志中记录数据变化的历史,从你的应用程序消费它们的地方。这使得您的应用程序可以轻松地正确和完整地消费所有的事件。即使你的应用程序意外停止,它也不会错过任何东西:当应用程序重新启动时,它将继续消耗它离开的事件。

准备工作

开始前需要准备好需要的资源

  1. kafka 集群 (这里使用的是aws msk)
  2. MYSQL 集群
  3. kafka以及mysql的客户端命令行工具
  4. kafka connect
阅读全文 »

说明

tengine 配合lua性能没话说,但有时候会遇到很多包和库需要自己编译下载,这里列出常用的以备不时之需。

1、官方源码包
wget http://tengine.taobao.org/download/tengine-2.3.0.tar.gz

2、基础依赖库
http://luajit.org/download/LuaJIT-2.0.5.tar.gz
https://www.openssl.org/source/openssl-1.0.2q.tar.gz
https://github.com/maxmind/geoip-api-c/releases/download/v1.6.12/GeoIP-1.6.12.tar.gz
https://sourceforge.net/projects/pcre/files/pcre/8.42/pcre-8.42.tar.gz
https://github.com/openresty/lua-cjson/archive/2.1.0.6.tar.gz
https://github.com/libgd/libgd/releases/download/gd-2.2.5/libgd-2.2.5.tar.gz
https://github.com/yaoweibin/nginx_tcp_proxy_module/archive/v0.4.5.zip

3、第三方模块
1)需静态编译的C模块
https://github.com/simplresty/ngx_devel_kit/archive/v0.3.0.tar.gz
https://github.com/openresty/array-var-nginx-module/archive/v0.05.tar.gz
https://github.com/calio/form-input-nginx-module/archive/v0.12.tar.gz
https://github.com/openresty/encrypted-session-nginx-module/archive/v0.08.tar.gz
https://github.com/calio/iconv-nginx-module/archive/v0.14.tar.gz
https://github.com/openresty/lua-nginx-module/archive/v0.10.13.tar.gz
https://github.com/openresty/set-misc-nginx-module/archive/v0.32.tar.gz

阅读全文 »

当kafka添加broker时,我们需要重新分配partition,使其流量平衡,这里整理出相关的命令以供日常使用。
说明: 这里以topic test和test1为例。

  1. 前提变量

    1
    2
    export ZOOKEEPERS=zookeeper:2181 #自己根据需要改,新版本基本上用不到这个
    export KAFKAS=kafka:9092 #自己根据需要改
  2. 修改topic的partitions

    1
    2
    ./kafka-topics.sh --zookeeper $ZOOKEEPERS --alter --topic test --partitions 6
    ./kafka-topics.sh --zookeeper $ZOOKEEPERS --alter --topic test1 --partitions 6
  3. 数据迁移–生成迁移计划json文件

    手动生成一个json文件topic.json

    1
    2
    3
    4
    5
    6
    7
     {
    "topics": [
    {"topic": "test"},
    {"topic": "test1"}
    ],
    "version": 1
    }

    使用 –generate生成迁移计划,将test和test1的partition均匀扩充到所有机器上

    1
    ./kafka-reassign-partitions.sh  --zookeeper $ZOOKEEPERS --topics-to-move-json-file topic.json  --broker-list  "1,2,3,4,5,6" --generate

    生成的结果类似于这样:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    Current partition replica assignment

    {
    "version":1,
    "partitions":[.....]
    }
    Proposed partition reassignment configuration

    {
    "version":1,
    "partitions":[.....]
    }

    我们将 “Proposed partition reassignment configuration” 后面的json复制下来生成reassignment.json 这个文件

    1
    2
    3
    4
    5
    6
    #reassignment.json

    {
    "version":1,
    "partitions":[.....]
    }
  4. 数据迁移–执行扩容

    利用上一步生成的reassignment.json执行扩容命令:

    1
    ./kafka-reassign-partitions.sh --zookeeper $ZOOKEEPERS --reassignment-json-file reassignment.json --execute
  5. 验证结果

    1
    ./kafka-reassign-partitions.sh --zookeeper $ZOOKEEPERS --reassignment-json-file reassignment.json --verify

    大概长这个样子:

    1
    2
    3
    4
    5
    ...

    Reassignment of partition test-x completed successfully
    Reassignment of partition test1-x completed successfully
    ...

WordPress 官方默认 Docker 是基于 Apache 来做的,但为了自动加上 SSL,我用了一个 Nginx 容器来做反向代理。于是问题出现了:用 HTTPS 访问 Nginx,生成出来的网页里面所有生成的 URL 都是 HTTP,而不是 HTTPS。

1.修改数据库地址http改为https:

1
UPDATE wp_options SET option_value = replace(option_value, 'http://xxx.com, 'https://xxx.com') WHERE option_name = 'home' OR option_name = 'siteurl';

2.在 wp-config.php 里面加上:

1
2
3
4
if((!empty( $_SERVER['HTTP_X_FORWARDED_HOST'])) || (!empty( $_SERVER['HTTP_X_FORWARDED_FOR'])) ) {
$_SERVER['HTTP_HOST'] = $_SERVER['HTTP_X_FORWARDED_HOST'];
$_SERVER['HTTPS'] = 'on';
}

需求和实现方式

使用vpn相比代理方式的确会稳定,但是常常会导致客户端网络改变而造成一些问题,所以我们更希望同时具有vpn的稳定以及基于不同网络访问请求而走不同流量的策略。我们可以通
过 dnsmasq+ipset区分出不同域名的不同ip,并且对ip进行分组,然后使用iptable 对不同组的ip打上不同的标签,最后再走不同的路由表实现策略路由。 基本的流程图如下:

graph LR
    正常流量 --> dnsmasq --> ipset --> iptables --> iproute --> 流量转发

配置流程

  1. 创建 ipset

    这里创建两个:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    #安装
    apt-get update && apt-get install -y ipset

    #配置(使用这个即可 下面的都是一些常用命令)
    ipset create gfwlist2 hash:ip
    ipset create gfwlist3 hash:ip

    #可以通过以下命令查看
    ipset list

    # 保存
    ipset save | tee /etc/ipset.conf
    阅读全文 »

前言

为什么会有这个需求?

kafka consumer 消费会存在延迟情况,我们需要查看消息堆积情况,就是所谓的消息Lag。目前是市面上也有相应的监控工具KafkaOffsetMonitor,我们自己也写了一套监控kmanager。但是随着kafka版本的升级,消费方式也发生了很大的变化,因此,我们需要重构一下kafka offset监控。

如何计算Lag?
在计算Lag之前先普及几个基本常识

LEO(LogEndOffset): 这里说的和官网说的LEO有点区别,主要是指堆consumer可见的offset.即HW(High Watermark)

CURRENT-OFFSET: consumer消费到的具体位移

知道以上信息后,可知Lag=LEO-CURRENT-OFFSET。计算出来的值即为消费延迟情况。

官方查看方式

这里说的官方查看方式是在官网文档中提到的,使用官方包里提供的bin/kafka-consumer-groups.sh

最新版的工具只能获取到通过broker消费的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ bin/kafka-consumer-groups.sh --describe --bootstrap-server 192.168.0.101:8092 --group test
Consumer group 'test' has no active members.

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
truman_test_offset 2 1325 2361 1036 - - -
truman_test_offset 6 1265 2289 1024 - - -
truman_test_offset 4 1245 2243 998 - - -
truman_test_offset 9 1310 2307 997 - - -
truman_test_offset 1 1259 2257 998 - - -
truman_test_offset 8 1410 2438 1028 - - -
truman_test_offset 3 1225 2167 942 - - -
truman_test_offset 0 1218 2192 974 - - -
truman_test_offset 5 1262 2252 990 - - -
truman_test_offset 7 1265 2277 1012 - - -

真正开始

前面都是万能网友提供的一些基础知识和介绍,现在正式开始我们的解决方案。 我们采用 kafka-lag-exporter + prometheus + grafana 的方式, 原因很简单,我们的大部分业务都在k8s上,所有主流的监控都在上面。 kafka-lag-exporter 也不需要安装额外的插件,只需要能连接到kafka即可。 流程也很简单。

graph LR
    kafka --> kafka-lag-exporter --> prometheus --> grafana

这个项目介绍的极其详细,支持k8s helm 部署。 由于我这里暂时还不需要部署集群直接使用examples/standalone 提供的脚本部署。

修改application.conf的相关信息 例如:bootstrap-brokers。 然后直接执行 run-docker.sh 即可。默认暴露端口为8000。

之后修改 Prometheus 的配置文件: 添加job

1
2
3
4
5
6
7
8
9
...
scrape_configs:
- job_name: 'kafka-lag-exporter'
scrape_interval: 10s
static_configs:
- targets: ['xxx:8000']
labels:
job: 'kafka-lag-exporter'
...

最后使用该项目提供的 dashboard json 文件上传到grafana 就可以了。

简单介绍

ocserv 是目前比较常用的vpn软件。这里完整记录下从零开始搭建过程。

名称 说明
操作系统 ubuntu 16.04 所用到的基础image

其实没啥需求,直接装就行。这里不废话直接给出命令,搞成脚本一件执行即可。

安装ocsserv

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#依赖的安装
apt-get update
apt-get -y install wget git dh-autoreconf libgnutls28-dev libev-dev libwrap0-dev libpam0g-dev liblz4-dev libseccomp-dev libreadline-dev libnl-route-3-dev libnl-route-3-dev libkrb5-dev libradcli-dev libprotobuf-c-dev libtalloc-dev libhttp-parser-dev libpcl1-dev protobuf-c-compiler gperf liblockfile-bin nuttcp lcov libuid-wrapper libpam-wrapper libnss-wrapper libsocket-wrapper gss-ntlmssp haproxy iputils-ping freeradius gawk yajl-tools

#下载源代码
git clone https://gitlab.com/openconnect/ocserv.git

#编译安装
cd ocserv && autoreconf -fvi && ./configure && make
cp src/ocserv /usr/sbin/ocserv
cp src/ocserv-worker /usr/sbin/ocserv-worker
cp src/ocserv-fw /usr/bin/ocserv-fw
cp src/occtl/occtl /usr/bin/
cp src/ocpasswd/ocpasswd /usr/bin/
cp doc/systemd/standalone/ocserv.service /etc/systemd/system/ocserv.service

#内核参数调优
echo "net.ipv4.ip_forward = 1" >> /etc/sysctl.conf
echo "net.core.default_qdisc = fq" >> /etc/sysctl.conf
echo "net.ipv4.tcp_congestion_control = bbr" >> /etc/sysctl.conf
sysctl -p
阅读全文 »

kafka速度快的原因

我们都知道Kafka非常快,比绝大多数的市场上其他消息中间件都要快。这里来研究下那么为什么Kafka那么快(当然不会是因为它用了Scala)。
Kafka的消息是保存或缓存在磁盘上的,一般认为在磁盘上读写数据是会降低性能的,因为寻址会比较消耗时间。
但是实际上,Kafka其中一个特性却是高吞吐率,即使是普通的服务器,Kafka也能轻松支持每秒百万级的写入请求,超过了大部分的消息中间件。这种特性使得Kafka在日志处理等海量数据场景中应用广泛。那么为什么Kafka速度那么快,可以从数据写入和数据读取两方面来分析。

Kafka的数据写入(生产者)

生产者(Producer)是负责向Kafka提交数据的,Kafka会把收到的消息都写入到磁盘中,因此可以认为它绝对不会丢失数据。

而为了优化写入速度,Kafka采用了两种技术,一种是顺序写入,一种是MMFile。

顺序写入

磁盘读写的快慢取决于你怎么使用它,一般可以分为顺序读写或者随机读写。

因为硬盘是机械结构,每次读写都会经过一个【寻址->写入】的过程,其中的寻址是一个十分耗时的机械动作,所以硬盘最讨厌随机I/O,最喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用的顺序I/O。而且Linux对于磁盘的读写优化也比较多,包括read-ahead、write-behind和磁盘缓存等。更多的,对Java的内存管理和垃圾回收会有优化,因为如果在内存做这些操作的时候,一个会导致Java对象的内存开销很大,另一个是随着堆内存数据的增多,Java的GC时间会变得很长。

因此可以总结出使用磁盘操作有以下几个好处:

1.磁盘顺序读写速度超过内存随机读写。

2.JVM的GC效率低,内存占用大,使用磁盘可以避免这一问题。

3.系统冷启动后,磁盘上的缓存依然可用(内存一旦关机数据就会清空,持久化到磁盘上则不会)。

阅读全文 »

整理了一些,以供自己日常使用:

  1. 前提变量

    1
    2
    export ZOOKEEPERS=zookeeper:2181 #自己根据需要改,新版本基本上用不到这个
    export KAFKAS=kafka:9092 #自己根据需要改
  2. 列出所有Topic:

    1
    kafka-topics.sh  --describe --bootstrap-server $KAFKAS
  3. 创建一个topic

    1
    kafka-topics.sh  --create --topic test --bootstrap-server $KAFKAS --partitions 3 --replication-factor 2
  4. 消费者订阅主题

    1
    kafka-console-consumer.sh --bootstrap-server $KAFKAS --topic test
  5. 生产者向topic发送消息

    1
    kafka-console-producer.sh --broker-list $KAFKAS  --topic test
  6. 修改topic配置

    1
    kafka-configs.sh --zookeeper $ZOOKEEPERS --entity-type topics --entity-name test   --alter --add-config compression.type=producer retention.ms=86400000
  7. 压力测试

    1
    2
    3
    4
    5
    #生产者
    kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance --print-metrics --topic test --num-records 6000000 --throughput 100000 --record-size 100 --producer-props bootstrap.servers=$KAFKAS buffer.memory=67108864 batch.size=32768 linger.ms=20 acks=1

    #消费者
    kafka-consumer-perf-test.sh --broker-list $KAFKAS --messages 1000000 --threads 1 --topic test --print-metrics
  1. 下载kafka安装包
    1
    2
    3
    #image: openjdk:8-slim-buster
    apt-get update && apt-get install -y wget curl net-tools vim procps
    wget https://archive.apache.org/dist/kafka/2.4.1/kafka_2.12-2.4.1.tgz && tar -xzf kafka_2.12-2.4.1.tgz