
News

Posted over 4 years ago by Art van Scheppingen
TL;DR: SELECT … FOR UPDATE has a (not so) surprising side effect on non-existent rows: it could cause a (serious) performance penalty and even prevent you from inserting new rows at all.

Locking rows for update

A development team of ours was working on an application that needed to ensure an update on a single row item isn't modified by another transaction. Naturally they started making use of SELECT … FOR UPDATE to lock the row before updating it. This worked excellently to keep anyone else from updating this row. However, they started to get some lock wait timeouts on new inserts of totally unrelated items during a load test, and they asked me to look into this. SELECT … FOR UPDATE is described as follows in the MySQL documentation: "A SELECT ... FOR UPDATE reads the latest available data, setting exclusive locks on each row it reads. Thus, it sets the same locks a searched SQL UPDATE would set on the rows." So far so good: this behavior is expected to happen. It also doesn't mention anything about locking anything but the rows it reads. I asked the team whether they were attempting to insert the same data as they were locking in the other transaction and they said they were not. In pseudo code they were doing this:

SELECT ... WHERE uuid='some-uuid' FOR UPDATE;
if row {
  UPDATE row
} else {
  INSERT row
}

The uuid column here is the primary key of the table. This code executed fine and had no issues by itself as a single transaction. You may wonder why not use INSERT … ON DUPLICATE KEY UPDATE or REPLACE INTO? First of all, we are inserting only occasionally, so that would fail the insert command 99% of the time. Second of all, we may only be updating a single column within a single row, so that implies we would need to know the entire row up front when we have to insert or replace the row.

No row to update

Now what would happen if there is no row to update? According to the description in the MySQL documentation it sets an exclusive lock on each row it reads, but what about when there is no row to read? This other excerpt on gap locks might hint at what it actually does: "For example, SELECT c1 FROM t WHERE c1 BETWEEN 10 and 20 FOR UPDATE; prevents other transactions from inserting a value of 15 into column t.c1, whether or not there was already any such value in the column, because the gaps between all existing values in the range are locked." If there is no row, a lock will still be set on the associated index entries. This shouldn't be bad, right? We can still insert other rows, right? Wrong: it isn't a gap lock alone, but a next-key lock! Since the lock is set on a non-existent index entry, a next-key lock is created. Depending on where you would insert in the index, you may find a whole range being locked, as it needs to insert within this range. In our version of UUID this shouldn't happen very often as there is a random factor, but it still can happen often if you only have a few rows in your table. In that case the gaps between UUIDs are large and, due to the random factor, you may end up locking large gaps within the index. As cross-region latency is present on this system, this keeps the next-key locks open longer and the chance of a collision within a gap also increases a little. So that explains the behavior during the load test. So all's well that ends well?

Nasty side effect of next-key locks

There is one nasty side effect with the next-key lock: if the index value would be greater than the largest value in the table, it locks everything above the largest value until infinity.
So what would happen to a table where the primary key is sequential, like an integer? For example this table with the following rows:

CREATE TABLE sometable (
  id int(11) NOT NULL,
  some_data varchar(255) NOT NULL default '',
  PRIMARY KEY (id)
);
INSERT INTO sometable VALUES (1, 'test'), (2, 'test'), (3, 'test'), (4, 'test'), (5, 'test'), (10, 'test'), (11, 'test'), (12, 'test'), (13, 'test'), (14, 'test'), (15, 'test');

This would create a gap between 5 and 10 and a gap from 15 till infinity. When we are selecting within the gap between 5 and 10, we create a next-key lock between 5 and 10 and we can't insert new rows inside this gap. We can still insert new rows at the end of the table, though. However, if we would select a row on an id greater than 15, we would put a next-key lock on 15 till infinity. This means nobody can append anything to this table anymore until we have committed our transaction! This could become a serious bottleneck if you insert more rows than you update.

Conclusion

I wasn't aware of the fact that SELECT … FOR UPDATE actually locks the index for rows that don't exist yet. I would have reasoned that if there were no rows to be selected, there wouldn't be anything to lock. And if there wouldn't be anything to lock, the whole SELECT … FOR UPDATE would simply fizzle. Even though SELECT … FOR UPDATE sounds like a great way to ensure your transaction is the only one that modifies a specific row, it's quite dangerous as it could lock out or delay other transactions. If you would take our example above, the safe way to do it (in pseudo code) is:

SELECT ... WHERE uuid='some-uuid';
if row {
  SELECT ... WHERE uuid='some-uuid' FOR UPDATE;
  UPDATE row
} else {
  INSERT row
}

This would ensure the lock would only be set when there actually is a row present; however, this comes at the expense of an additional query.

Edit: added that UUID gaps can be significant
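To make the "lock until infinity" case concrete, here is a minimal two-session sketch against the sometable example above; it assumes InnoDB with the default REPEATABLE READ isolation level, and the session labels are only illustrative:

-- Session 1: lock a non-existent id beyond the current maximum value (15)
START TRANSACTION;
SELECT * FROM sometable WHERE id = 20 FOR UPDATE;
-- returns no row, but takes a next-key lock from 15 up to the supremum (infinity)

-- Session 2: an insert of a completely unrelated row now blocks
INSERT INTO sometable VALUES (16, 'test');
-- waits, and may fail with "Lock wait timeout exceeded"

-- Session 1: releasing the lock lets session 2 proceed
COMMIT;

The same experiment inside the 5-10 gap (for example WHERE id = 7 FOR UPDATE) blocks inserts of 6 through 9, but leaves appends at the end of the table unaffected.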
Posted over 4 years ago by MySQL Performance Blog
In a high concurrency world, where more and more users->connections->threads are used, contention is a given. But how do we identify the contention point easily? Different approaches have been discussed previously, like the one using Perf and Flame graphs to track down the function taking way more time than expected. That method is great, but how can we do it with what one normally has, like the MySQL client? Enter: the SEMAPHORES section from the SHOW ENGINE INNODB STATUS command output.

SEMAPHORES

The SEMAPHORES section displays all the metrics related to InnoDB mechanics on waits. This section is your best friend if you have a high concurrency workload. In short, it contains 2 kinds of data: event counters and a list of current waits.

Current Waits

That is a section that should be empty unless your MySQL has a high concurrency that causes InnoDB to start using the waiting mechanism. If you don't see lines of the form "--Thread ... has waited at ..." then you are good. No contention. Now, what does it look like? It could be like:

---------- SEMAPHORES ----------
OS WAIT ARRAY INFO: reservation count 1744351
--Thread 139964395677440 has waited at btr0cur.cc line 5889 for 0 seconds the semaphore:
S-lock on RW-latch at 0x7f4c3d73c150 created in file buf0buf.cc line 1433
a writer (thread id 139964175062784) has reserved it in mode exclusive
number of readers 0, waiters flag 1, lock_word: 0
Last time read locked in file btr0sea.cc line 1121
Last time write locked in file /mnt/workspace/percona-server-5.7-redhat-binary-rocks-new/label_exp/min-centos-7-x64/test/rpmbuild/BUILD/percona-server-5.7.28-31/percona-server-5.7.28-31/storage/innobase/btr/btr0sea.cc line 1121
OS WAIT ARRAY INFO: signal count 1483499
RW-shared spins 0, rounds 314940, OS waits 77827
RW-excl spins 0, rounds 205078, OS waits 7540
RW-sx spins 4357, rounds 47820, OS waits 949
Spin rounds per wait: 314940.00 RW-shared, 205078.00 RW-excl, 10.98 RW-sx

or like

---------- SEMAPHORES ----------
OS WAIT ARRAY INFO: reservation count 1744302
--Thread 139964401002240 has waited at row0ins.cc line 2520 for 0 seconds the semaphore:
X-lock (wait_ex) on RW-latch at 0x7f4c3d956890 created in file buf0buf.cc line 1433
a writer (thread id 139964401002240) has reserved it in mode wait exclusive
number of readers 1, waiters flag 0, lock_word: ffffffffffffffff
Last time read locked in file row0sel.cc line 3869
Last time write locked in file /mnt/workspace/percona-server-5.7-redhat-binary-rocks-new/label_exp/min-centos-7-x64/test/rpmbuild/BUILD/percona-server-5.7.28-31/percona-server-5.7.28-31/storage/innobase/row/row0upd.cc line 2881
OS WAIT ARRAY INFO: signal count 1483459
RW-shared spins 0, rounds 314905, OS waits 77813
RW-excl spins 0, rounds 204982, OS waits 7532
RW-sx spins 4357, rounds 47820, OS waits 949
Spin rounds per wait: 314905.00 RW-shared, 204982.00 RW-excl, 10.98 RW-sx

Or incredibly long (not shown here). The way that particular section was monitored is through the execution of an infinite loop:

while true; do mysql -N -e"show engine innodb status\G" | sed -n '/SEMAPHORES/,/TRANSACTIONS/p'; sleep 1; done

But what should you do with that info?
Looking for Info From the current waits, what we need is the following: The exact version of MySQL (and flavor: Percona Server, Oracle’s MySQL, MariaDB) Filename File line Let’s use the first example, which is a server that experienced high concurrency for a while as seen in Percona Monitoring and Management: One can say: “but there’s only a peak of 42 threads and the majority of the throughput distribution is on the low concurrency side!” This is a 2 core VM with small physical memory and thus a pretty small buffer pool. An average of 22 for threads running is high concurrency. Now, from the SEMAPHORES output of the first example we have: --Thread 139964395677440 has waited at btr0cur.cc line 5889 for 0 seconds the semaphore: S-lock on RW-latch at 0x7f4c3d73c150 created in file buf0buf.cc line 1433 a writer (thread id 139964175062784) has reserved it in mode exclusive number of readers 0, waiters flag 1, lock_word: 0 Last time read locked in file btr0sea.cc line 1121 Last time write locked in file /mnt/workspace/percona-server-5.7-redhat-binary-rocks-new/label_exp/min-centos-7-x64/test/rpmbuild/BUILD/percona-server-5.7.28-31/percona-server-5.7.28-31/storage/innobase/btr/btr0sea.cc line 1121 What’s the MySQL version? The last line tells us: Percona Server 5.7.28-31 What’s the file and line? btr0cur.cc line 5889 has waited for an S-Lock on an RW-latch created in buf0buf.cc line 1433 but another thread has reserver in mode exclusive in file btr0sea.cc line 1121. Ok, we have directions. Now let’s see what’s inside. Looking Inside the Code Do I need to download the source code for inspection? No, you don’t! What you need is to navigate to the code repository, which in this case is a GitHub repo. And here is where the exact version comes handy. In order to guarantee that we are reading the exact line code, we better make sure we are reading the exact version. Finding the Repository GitHub is pretty easy to navigate and in this case, what we are looking for is Release. 5.7.28-31 to be precise. The Percona Server repo URL is: https://github.com/percona/percona-server/. Once you are there, we need to find the release. Finding the Release The release can be found in the link showed in the below graph: Inside one can see the releases: Click the link and it will take you to the tag page: And finally, click the link shown above and you will be at the repo of the release needed: https://github.com/percona/percona-server/tree/Percona-Server-5.7.28-31 What’s next? Reading the actual content of the files. Navigating the Code Tree The relevant part of the code tree is: -root ---storage ------innobase The InnoDB storage engine code is inside the “innobase” directory. Inside that directory, there’s a bunch of other directories. But how do you choose the correct one? Well, the filename has the answer. The files we need to look are: btr0cur.cc btr0sea.cc buf0buf.cc All files have the same syntax: xxx0xxx.xx and the directory name is the part before the zero. In our case, we need to look inside two directories: btr and buf. Once inside the directories, finding the files is an easy task. 
These are our files at the mentioned line numbers:
https://github.com/percona/percona-server/blob/Percona-Server-5.7.28-31/storage/innobase/btr/btr0sea.cc#L1121
https://github.com/percona/percona-server/blob/Percona-Server-5.7.28-31/storage/innobase/buf/buf0buf.cc#L1433
https://github.com/percona/percona-server/blob/Percona-Server-5.7.28-31/storage/innobase/btr/btr0cur.cc#L5889

The btr0sea.cc file, as described in the head of the file, is "The index tree adaptive search", a.k.a. the Adaptive Hash Index (AHI).
The buf0buf.cc file, as described in the file, is "The database buffer buf_pool", a.k.a. the InnoDB Buffer Pool.
The btr0cur.cc file, as described in the file, is "The index tree cursor", a.k.a. the actual B-Tree, or where the data exists in InnoDB.

What was Going on Then?

At btr0cur.cc line 5889 InnoDB is inside a function called btr_estimate_n_rows_in_range_low, whose description is documented as "Estimates the number of rows in a given index range", and the actual line is:

btr_cur_search_to_nth_level(index, 0, tuple1, mode1, BTR_SEARCH_LEAF | BTR_ESTIMATE, &cursor, 0, __FILE__, __LINE__, &mtr);

But what is that btr_cur_search_to_nth_level? In the same file, we can find the definition and it is described as "Searches an index tree and positions a tree cursor on a given level". So basically, it is looking for a row value. But that operation is stalled because it needs to acquire a shared resource, which in this case is the buffer pool: at buf0buf.cc line 1433, the buffer pool is trying to create a lock over a block

rw_lock_create(PFS_NOT_INSTRUMENTED, &block->lock, SYNC_LEVEL_VARYING);

That operation happens inside a function called "buf_block_init" (find it by scrolling up in the code) and is described as "Initializes a buffer control block when the buf_pool is created". This is actually creating the buffer pool space after an innodb_buffer_pool_size modification and is delayed. At btr0sea.cc line 1121 the operation used was:

if (!buf_page_get_known_nowait( latch_mode, block, BUF_MAKE_YOUNG, __FILE__, __LINE__, mtr)) {

That line is inside the function btr_search_guess_on_hash, which is described as "Tries to guess the right search position based on the hash search info of the index." So, it is using info from the AHI. And the buf_page_get_known_nowait definition is "This is used to get access to a known database page, when no waiting can be done", which is pretty much self-explanatory. So what do we have here? Contention on the AHI! Is this a problem? Let's go back to the original line:

--Thread 139964395677440 has waited at btr0cur.cc line 5889 for 0 seconds the semaphore:

It says that it has waited for 0 seconds. Contention disappeared pretty fast in this case. Also, something very important to notice: this contention appeared only ONCE during the time range monitored. That means that it is not a constant issue, falling into the category of not a problem. It happened once and happened fast. But what if it was happening often? Then you should take action by increasing the buffer pool size, increasing the number of AHI partitions, or even disabling the AHI entirely (see the sketch below). All of those three options require testing, of course 🙂

In Conclusion

The InnoDB code is pretty well documented, to the extent that it helps in finding hot contention spots. Navigating the code is not an impossible task and the GitHub repositories make it pretty fast. It is worth mentioning that the real problem is not evident in every situation, and sometimes a deep understanding of the code comes in handy.
For those cases, Percona is here to help you identify the issue and provide a solution.
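If the contention had been persistent, the three remediation options mentioned above can be inspected and applied from the MySQL client. A minimal sketch, assuming MySQL/Percona Server 5.7 and that any change is tested before being applied in production:

-- Check the current Adaptive Hash Index settings
SHOW GLOBAL VARIABLES LIKE 'innodb_adaptive_hash_index%';

-- The AHI can be disabled at runtime (it is a dynamic variable)
SET GLOBAL innodb_adaptive_hash_index = OFF;

-- The buffer pool can be resized online in 5.7+ (value in bytes; 4G here is just an example)
SET GLOBAL innodb_buffer_pool_size = 4 * 1024 * 1024 * 1024;

-- innodb_adaptive_hash_index_parts is read-only, so raising the partition count
-- means setting it in my.cnf and restarting the server.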
Posted over 4 years ago by Mark Callaghan
Q: What is the best readahead size?
A: O_DIRECT

Perhaps I agree with Dr. Stonebraker. This is my answer which might not be the correct answer. My reasons for O_DIRECT are performance, quality of service (QoS) and manageability, and performance might get too much attention. I don't dislike Linux but the VM, buffered IO, readahead and page cache are there for all Linux use cases. They must be general purpose. Complex system software like a DBMS isn't general purpose and can do its own thing when needed. Also, I appreciate that kernel developers have done a lot to make Linux better for a DBMS. One of the perks at FB was easy access to many kernel developers.

Most of my web-scale MySQL/InnoDB experience is with O_DIRECT. While InnoDB can use buffered IO we always chose O_DIRECT. Eventually, RocksDB arrived and it only did buffered IO for a few years. Then O_DIRECT support was added and perhaps one day the web-scale MyRocks team will explain what they use.

I deal with readahead when running benchmarks and a common experience is using the wrong (too large) value and then repeating tests, which means I spend more time and more SSD endurance thanks to buffered IO. I have many blog posts with performance results for readahead including at least one for MongoDB. Usually my goal was to find which small value is good enough. I learned that 0 is too small. Readahead can help scan-heavy workloads, but my focus is on OLTP where we avoided most scans except for logical backup.

I understand why buffered IO is used by some DBMS. Early in the product lifecycle it can be a placeholder until more features are added to make O_DIRECT performant. The benefits of the OS page cache include:
- Filesystem readahead can be used before the DBMS adds support for prefetching when doing scans. But filesystem readahead is a black box, might differ between filesystems, provides no metrics and will do the wrong thing for some workloads. InnoDB provides a prefetch feature which can help when O_DIRECT is used. I disabled it because OLTP. The Facebook MySQL team (thanks Nizam) added logical readahead to make logical backup faster and more efficient. Filesystem readahead is likely to struggle with index structure fragmentation, so it is best suited for heap-organized tables and will suffer with index scans.
- Doing writes to the OS page cache followed by fsync can be used before the DBMS adds support for async IO or background write threads. But Postgres suffered for so long from this approach because calling fsync with an unknown amount of dirty pages in the OS page cache can starve all other pending IO requests for many seconds. The situation is less dire today thanks to work by Jens Axboe to make writeback less annoying. There was much discussion in 2014 at a summit that included Postgres and Linux kernel developers. In addition to Linux improvements, features have been added to Postgres to reduce the impact from writeback storms -- read this to learn about spread checkpoints.
- For a DBMS that does compression it is easier to use the DBMS cache for uncompressed pages and the OS page cache for compressed pages. I am familiar with amazing work in InnoDB to manage both in the DBMS cache. We all agree the code is very complex. RocksDB also has an option to cache both in its block cache but I have little experience with the feature. It is hard to figure out the best way to divide the DBMS cache between compressed and uncompressed pages.
Performance advantages for O_DIRECT include:
- Does one memory copy to move data from storage to the DBMS while buffered needs two
- Avoids CPU and mutex contention overhead in the OS page cache
- Avoids wasting memory from double buffering between the DBMS cache and OS page cache

QoS advantages for O_DIRECT include:
- Filesystem readahead is frequently wrong and either wastes IO or favors the wrong user, leading to worse IO response times for other users
- OS page cache will get trashed by other services sharing the host
- Writeback storms starve other IO requests. Writeback is usually a background task and can tolerate response time variance. Too much writeback makes user reads and log fsync slower and those operations don't want response time variance.
- Reduces stalls - this is a placeholder because my local expert has yet to explain this in public. But you will have a better time with Linux when moving less data through the OS page cache, especially with modern storage devices that can sustain many GB/sec of throughput. And when you aren't having a good time then you can fix the DBMS. The DBMS is involved whether or not it relies on the OS page cache so you always have to make it work.

Manageability advantages for O_DIRECT include:
- DBMS prefetch and writeback are documented, tunable and provide metrics. Such claims are less true for filesystem readahead and VM writeback. There is a lot of advice on the web and much disagreement, especially on the topic of min_free_kbytes. Domas used to be my source on this but he doesn't blog enough about the Linux VM.
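For readers who want to check what their own MySQL/InnoDB instance does, here is a quick sketch; innodb_flush_method is the standard server variable for this and it is not dynamic:

-- See how InnoDB opens its data files; O_DIRECT bypasses the OS page cache for data files
SHOW GLOBAL VARIABLES LIKE 'innodb_flush_method';

-- The variable cannot be changed at runtime; set it in my.cnf and restart:
-- [mysqld]
-- innodb_flush_method = O_DIRECT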
Posted over 4 years ago by Severalnines
This is the second part of a two-part blog series on Maximizing Database Query Efficiency In MySQL. You can read part one here.

Using Single-Column, Composite, Prefix, and Covering Indexes

Tables that frequently receive high traffic must be properly indexed. It's not only important to index your table, but you also need to determine and analyze what types of queries or types of retrieval you need for the specific table. It is strongly recommended that you analyze what type of queries or retrieval of data you need on a specific table before you decide what indexes are required for the table. Let's go over these types of indexes and how you can use them to maximize your query performance.

Single-Column Index

An InnoDB table can contain a maximum of 64 secondary indexes. A single-column index (or full-column index) is an index assigned only to a particular column. A particular column that contains distinct values is a good candidate for an index. A good index must have a high cardinality and statistics so the optimizer can choose the right query plan. To view the distribution of indexes, you can check with the SHOW INDEXES syntax just like below:

root[test]#> SHOW INDEXES FROM users_account\G *************************** 1. row *************************** Table: users_account Non_unique: 0 Key_name: PRIMARY Seq_in_index: 1 Column_name: id Collation: A Cardinality: 131232 Sub_part: NULL Packed: NULL Null: Index_type: BTREE Comment: Index_comment: *************************** 2. row *************************** Table: users_account Non_unique: 1 Key_name: name Seq_in_index: 1 Column_name: last_name Collation: A Cardinality: 8995 Sub_part: NULL Packed: NULL Null: Index_type: BTREE Comment: Index_comment: *************************** 3. row *************************** Table: users_account Non_unique: 1 Key_name: name Seq_in_index: 2 Column_name: first_name Collation: A Cardinality: 131232 Sub_part: NULL Packed: NULL Null: Index_type: BTREE Comment: Index_comment: 3 rows in set (0.00 sec)

You can also inspect the tables information_schema.index_statistics or mysql.innodb_index_stats.

Compound (Composite) or Multi-Part Indexes

A compound index (commonly called a composite index) is a multi-part index composed of multiple columns. MySQL allows up to 16 columns for a specific composite index. Exceeding the limit returns an error like below:

ERROR 1070 (42000): Too many key parts specified; max 16 parts allowed

A composite index provides a boost to your queries, but it requires that you have a clear understanding of how you are retrieving the data. For example, a table with a DDL of...

CREATE TABLE `user_account` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `last_name` char(30) NOT NULL,
  `first_name` char(30) NOT NULL,
  `dob` date DEFAULT NULL,
  `zip` varchar(10) DEFAULT NULL,
  `city` varchar(100) DEFAULT NULL,
  `state` varchar(100) DEFAULT NULL,
  `country` varchar(50) NOT NULL,
  `tel` varchar(16) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `name` (`last_name`,`first_name`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1

...which contains the composite index `name`. The composite index improves query performance once these keys are referenced as used key parts. For example, see the following: root[test]#> explain format=json select * from users_account where last_name='Namuag' and first_name='Maximus'\G *************************** 1.
row *************************** EXPLAIN: { "query_block": { "select_id": 1, "cost_info": { "query_cost": "1.20" }, "table": { "table_name": "users_account", "access_type": "ref", "possible_keys": [ "name" ], "key": "name", "used_key_parts": [ "last_name", "first_name" ], "key_length": "60", "ref": [ "const", "const" ], "rows_examined_per_scan": 1, "rows_produced_per_join": 1, "filtered": "100.00", "cost_info": { "read_cost": "1.00", "eval_cost": "0.20", "prefix_cost": "1.20", "data_read_per_join": "352" }, "used_columns": [ "id", "last_name", "first_name", "dob", "zip", "city", "state", "country", "tel" ] } } } 1 row in set, 1 warning (0.00 sec The used_key_parts show that the query plan has perfectly selected our desired columns covered in our composite index. Composite indexing has its limitations as well. Certain conditions in the query cannot take all columns part of the key. The documentation says, "The optimizer attempts to use additional key parts to determine the interval as long as the comparison operator is =, <=>, or IS NULL. If the operator is >, <, >=, <=, !=, <>, BETWEEN, or LIKE, the optimizer uses it but considers no more key parts. For the following expression, the optimizer uses = from the first comparison. It also uses >= from the second comparison but considers no further key parts and does not use the third comparison for interval construction…". Basically, this means that regardless you have composite index for two columns, a sample query below does not cover both fields: root[test]#> explain format=json select * from users_account where last_name>='Zu' and first_name='Maximus'\G *************************** 1. row *************************** EXPLAIN: { "query_block": { "select_id": 1, "cost_info": { "query_cost": "34.61" }, "table": { "table_name": "users_account", "access_type": "range", "possible_keys": [ "name" ], "key": "name", "used_key_parts": [ "last_name" ], "key_length": "60", "rows_examined_per_scan": 24, "rows_produced_per_join": 2, "filtered": "10.00", "index_condition": "((`test`.`users_account`.`first_name` = 'Maximus') and (`test`.`users_account`.`last_name` >= 'Zu'))", "cost_info": { "read_cost": "34.13", "eval_cost": "0.48", "prefix_cost": "34.61", "data_read_per_join": "844" }, "used_columns": [ "id", "last_name", "first_name", "dob", "zip", "city", "state", "country", "tel" ] } } } 1 row in set, 1 warning (0.00 sec) In this case (and if your query is more of ranges instead of constant or reference types) then avoid using composite indexes. It just wastes your memory and buffer and it increases the performance degradation of your queries. Prefix Indexes Prefix indexes are indexes which contain columns referenced as an index, but only takes the starting length defined to that column, and that portion (or prefix data) are the only part stored in the buffer. Prefix indexes can help lessen your buffer pool resources and also your disk space as it does not need to take the full-length of the column.What does this mean? Let's take an example. Let's compare the impact between full-length index versus the prefix index. root[test]#> create index name on users_account(last_name, first_name); Query OK, 0 rows affected (0.42 sec) Records: 0 Duplicates: 0 Warnings: 0 root[test]#> \! du -hs /var/lib/mysql/test/users_account.* 12K /var/lib/mysql/test/users_account.frm 36M /var/lib/mysql/test/users_account.ibd We created a full-length composite index which consumes a total of 36MiB tablespace for users_account table. Let's drop it and then add a prefix index. 
root[test]#> drop index name on users_account; Query OK, 0 rows affected (0.01 sec) Records: 0 Duplicates: 0 Warnings: 0 root[test]#> alter table users_account engine=innodb; Query OK, 0 rows affected (0.63 sec) Records: 0 Duplicates: 0 Warnings: 0 root[test]#> \! du -hs /var/lib/mysql/test/users_account.* 12K /var/lib/mysql/test/users_account.frm 24M /var/lib/mysql/test/users_account.ibd root[test]#> create index name on users_account(last_name(5), first_name(5)); Query OK, 0 rows affected (0.42 sec) Records: 0 Duplicates: 0 Warnings: 0 root[test]#> \! du -hs /var/lib/mysql/test/users_account.* 12K /var/lib/mysql/test/users_account.frm 28M /var/lib/mysql/test/users_account.ibd Using the prefix index, it holds up only to 28MiB and that's less than 8MiB than using full-length index. That's great to hear, but it doesn't mean that is performant and serves what you need.  If you decide to add a prefix index, you must identify first what type of query for data retrieval you need. Creating a prefix index helps you utilize more efficiency with the buffer pool and so it does help with your query performance but you also need to know its limitation. For example, let's compare the performance when using a full-length index and a prefix index. Let's create a full-length index using a composite index, root[test]#> create index name on users_account(last_name, first_name); Query OK, 0 rows affected (0.45 sec) Records: 0 Duplicates: 0 Warnings: 0 root[test]#> EXPLAIN format=json select last_name from users_account where last_name='Namuag' and first_name='Maximus Aleksandre' \G *************************** 1. row *************************** EXPLAIN: { "query_block": { "select_id": 1, "cost_info": { "query_cost": "1.61" }, "table": { "table_name": "users_account", "access_type": "ref", "possible_keys": [ "name" ], "key": "name", "used_key_parts": [ "last_name", "first_name" ], "key_length": "60", "ref": [ "const", "const" ], "rows_examined_per_scan": 3, "rows_produced_per_join": 3, "filtered": "100.00", "using_index": true, "cost_info": { "read_cost": "1.02", "eval_cost": "0.60", "prefix_cost": "1.62", "data_read_per_join": "1K" }, "used_columns": [ "last_name", "first_name" ] } } } 1 row in set, 1 warning (0.00 sec) root[test]#> flush status; Query OK, 0 rows affected (0.02 sec) root[test]#> pager cat -> /dev/null; select last_name from users_account where last_name='Namuag' and first_name='Maximus Aleksandre' \G PAGER set to 'cat -> /dev/null' 3 rows in set (0.00 sec) root[test]#> nopager; show status like 'Handler_read%'; PAGER set to stdout +-----------------------+-------+ | Variable_name | Value | +-----------------------+-------+ | Handler_read_first | 0 | | Handler_read_key | 1 | | Handler_read_last | 0 | | Handler_read_next | 3 | | Handler_read_prev | 0 | | Handler_read_rnd | 0 | | Handler_read_rnd_next | 0 | +-----------------------+-------+ 7 rows in set (0.00 sec) The result reveals that it's, in fact, using a covering index i.e "using_index": true and uses indexes properly, i.e. Handler_read_key is incremented and does an index scan as Handler_read_next is incremented. Now, let's try using prefix index of the same approach, root[test]#> create index name on users_account(last_name(5), first_name(5)); Query OK, 0 rows affected (0.22 sec) Records: 0 Duplicates: 0 Warnings: 0 root[test]#> EXPLAIN format=json select last_name from users_account where last_name='Namuag' and first_name='Maximus Aleksandre' \G *************************** 1. 
row *************************** EXPLAIN: { "query_block": { "select_id": 1, "cost_info": { "query_cost": "3.60" }, "table": { "table_name": "users_account", "access_type": "ref", "possible_keys": [ "name" ], "key": "name", "used_key_parts": [ "last_name", "first_name" ], "key_length": "10", "ref": [ "const", "const" ], "rows_examined_per_scan": 3, "rows_produced_per_join": 3, "filtered": "100.00", "cost_info": { "read_cost": "3.00", "eval_cost": "0.60", "prefix_cost": "3.60", "data_read_per_join": "1K" }, "used_columns": [ "last_name", "first_name" ], "attached_condition": "((`test`.`users_account`.`first_name` = 'Maximus Aleksandre') and (`test`.`users_account`.`last_name` = 'Namuag'))" } } } 1 row in set, 1 warning (0.00 sec) root[test]#> flush status; Query OK, 0 rows affected (0.01 sec) root[test]#> pager cat -> /dev/null; select last_name from users_account where last_name='Namuag' and first_name='Maximus Aleksandre' \G PAGER set to 'cat -> /dev/null' 3 rows in set (0.00 sec) root[test]#> nopager; show status like 'Handler_read%'; PAGER set to stdout +-----------------------+-------+ | Variable_name | Value | +-----------------------+-------+ | Handler_read_first | 0 | | Handler_read_key | 1 | | Handler_read_last | 0 | | Handler_read_next | 3 | | Handler_read_prev | 0 | | Handler_read_rnd | 0 | | Handler_read_rnd_next | 0 | +-----------------------+-------+ 7 rows in set (0.00 sec) MySQL reveals that it does use index properly but noticeably, there's a cost overhead compared to a full-length index. That's obvious and explainable, since the prefix index does not cover the whole length of the field values. Using a prefix index is not a replacement, nor an alternative, of full-length indexing. It can also create poor results when using the prefix index inappropriately. So you need to determine what type of query and data you need to retrieve. Covering Indexes Covering Indexes doesn't require any special syntax in MySQL. A covering index in InnoDB refers to the case when all fields selected in a query are covered by an index. It does not need to do a sequential read over the disk to read the data in the table but only use the data in the index, significantly speeding up the query. For example, our query earlier i.e.  select last_name from users_account where last_name='Namuag' and first_name='Maximus Aleksandre' \G As mentioned earlier, is a covering index. When you have a very well-planned tables upon storing your data and created index properly, try to make as possible that your queries are designed to leverage covering index so that you'll benefit the result. This can help you maximize the efficiency of your queries and result to a great performance. Leverage Tools That Offer Advisors or Query Performance Monitoring Organizations often initially tend to go first on github and find open-source software that can offer great benefits. For simple advisories that helps you optimize your queries, you can leverage the Percona Toolkit. For a MySQL DBA, the Percona Toolkit is like a swiss army knife.  For operations, you need to analyze how you are using your indexes, you can use pt-index-usage.  Pt-query-digest is also available and it can analyze MySQL queries from logs, processlist, and tcpdump. In fact, the most important tool that you have to use for analyzing and inspecting bad queries is pt-query-digest. Use this tool to aggregate similar queries together and report on those that consume the most execution time. For archiving old records, you can use pt-archiver. 
To inspect your database for duplicate indexes, leverage pt-duplicate-key-checker. You might also take advantage of pt-deadlock-logger. Although deadlocks are not a cause of an underperforming and inefficient query but rather of a poor implementation, they still impact query efficiency. If you need table maintenance that requires you to add indexes online without affecting the database traffic going to a particular table, then you can use pt-online-schema-change. Alternatively, you can use gh-ost, which is also very useful for schema migrations. If you are looking for enterprise features, bundled with lots of features for query performance and monitoring, alarms and alerts, dashboards or metrics that help you optimize your queries, and advisors, ClusterControl may be the tool for you. ClusterControl offers many features that show you Top Queries, Running Queries, and Query Outliers. Check out the blog MySQL Query Performance Tuning, which guides you through monitoring your queries with ClusterControl.

Conclusion

You've arrived at the end of our two-part blog series. We covered the factors that cause query degradation and how to resolve them in order to maximize your database queries. We also shared some tools that can benefit you and help solve your problems.

Tags: query monitoring, query tuning, MySQL, database performance, performance monitoring
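As a quick follow-up to the covering index discussion above, here is a minimal sketch (the name_dob index is a hypothetical extension of the users_account example) of how to make an index cover every column a query selects:

-- Extend the composite index so it contains every column the query needs
ALTER TABLE users_account ADD INDEX name_dob (last_name, first_name, dob);

-- EXPLAIN should now report "Using index" in the Extra column, meaning the result
-- is served from the index alone, without reading the clustered index rows
EXPLAIN SELECT last_name, first_name, dob
FROM users_account
WHERE last_name = 'Namuag' AND first_name = 'Maximus Aleksandre';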
Posted over 4 years ago by MySQL Performance Blog
In the previous blog post of this series, MySQL Encryption: Talking About Keyrings, I described how keyrings work. In this post, I will talk about how master key encryption works and what the pros and cons are of using envelope encryption such as a master key.

The idea behind envelope encryption is that you use one key to encrypt multiple other keys. In InnoDB, this "one key" is the master encryption key and the "multiple other keys" are the tablespace keys. Those tablespace keys are the ones that are actually used to encrypt tablespaces. Graphically it can be presented like this: the master key resides in the keyring, while encrypted tablespace keys reside in tablespace headers (written on page 0 of a tablespace). In the picture above: Table A is encrypted with key 1. Key 1 is encrypted with the master key and stored (encrypted) in Table A's header. Table B is encrypted with key 2. Key 2 is encrypted with the master key and stored (encrypted) in Table B's header. And so on. When a server wants to decrypt Table A, it fetches the master key from the keyring, reads the encrypted key 1 from Table A's header, and decrypts key 1. The decrypted key 1 is cached in server memory and used to decrypt Table A.

InnoDB

In InnoDB, the actual encryption/decryption is done in the I/O layer of the server. So just before a page is flushed to disk it gets encrypted, and also, just after an encrypted page is read from disk, it gets decrypted. Encryption in InnoDB works only on the tablespace level. Normally when you create a standalone table you create a file-per-table tablespace (ON by default). So what you actually are creating is a tablespace that can contain only one table. You can also create a table that belongs to a general tablespace. Either way, your table always resides inside some tablespace. Since encryption works on the tablespace level, a tablespace can be either fully encrypted or fully un-encrypted. So you cannot have some tables inside a general tablespace encrypted, and some not. If for some reason you have file-per-table disabled, then all the standalone tables are actually created inside the system tablespace. In Percona Server for MySQL, you can encrypt the system tablespace by specifying the variable innodb_sys_tablespace_encrypt during bootstrap or by using encryption threads (still an experimental feature). In MySQL, you cannot ;).

Before we go any further we need to understand how the master key ID is built. It consists of UUID, KEY_ID, and the prefix INNODBKey. It looks like this: INNODBKey-UUID-KEY_ID. UUID is the uuid of the server on which the tablespace was encrypted. KEY_ID is just an ever-increasing value. When the first master key is created this KEY_ID is equal to 1. Then when you rotate the master key, a new master key is created with KEY_ID = 2 and so on. We will discuss master key rotation in depth later in the next blog posts of this series. Now that we know what the master key ID looks like, we can have a look at an encryption header. When a tablespace is first encrypted, the encryption header is added to the tablespace header. It looks like this: KEY ID is the KEY_ID from the master key ID that we have already discussed. UUID is the server uuid, later used in the master key ID. The tablespace key consists of 256 randomly generated (by the server) bits. Tablespace IV also consists of 256 randomly generated bits (although it should be 128 bits). IV is used to initialize AES encryption and decryption (only 128 bits of those 256 bits are used).
Last we have CRC32 checksum of tablespace key and IV. All this time I was saying that we have an encrypted tablespace key in the header. I was oversimplifying this a bit. Actually, we store tablespace key and IV bundled together, and we encrypt them both using the master key. Remember, before we encrypt tablespace key and IV we first calculate CRC32 of both. Why Do We Need CRC32? Well, to make sure that we are using the correct master key to decrypt the tablespace key and IV. After we decrypt the tablespace key and IV, we calculate checksum and we compare it with the CRC32 stored in the header. In case they match, we know we have used the correct master key and we have a valid tablespace key and tablespace iv to decrypt the table. In case they do not match, we mark the tablespace as missing (we would not be able to decrypt it anyways). You may ask – when do we check if we have correct master keys to decrypt all the tables? The answer is: at the server startup. At the startup, each encrypted table/tablespace server reads the UUID, KEY_ID from the header to construct the master key ID. Next, it fetches the needed master key from Keyring, decrypts tablespace key and iv, and checks the checksum. Again, if checksum matches we are all good, if not, the tablespace is marked as missing. If you are following this encryption series of blog posts, you might remember that in MySQL Encryption: Talking About Keyrings I said that server-based keyrings only fetch a list of key ids (to be precise, key id and user id, as this pair uniquely identifies the key in the keyring) on server startup. Now I am saying that the server fetches all the keys it needs on the startup to validate that it can decrypt tablespace keys. So why does a server-based keyring fetch only key_id and user_id when initialized instead of fetching all the keys? Because not all the keys might be needed; this is mostly due to master key rotation. Master key rotation will generate a new master key in keyring but it will never delete old versions of the master key. So you might end up with many keys in Key Server (Vault server – if you are using keyring_vault) that are not needed by the server and thus not fetched on server startup. It is time we talk a bit about the pros and cons of master key encryption. The biggest advantage is that you only need one encryption key (master key) to be stored separately from your encrypted data. This makes the server startup fast and keyring small,  thus easier to manage. Also, the master key can be rotated with one command. However, master key encryption has one big disadvantage; once a tablespace is encrypted with tablespace_key it will always stay encrypted with the same key. Master key rotation will not help here. Why is this a disadvantage? We know that MySQL has bugs that can make it suddenly crash, producing a core file. Since the core file contains a memory dump of our server, it can so happen that this core dump will contain a decrypted tablespace key. What is worse, the decrypted tablespace keys are stored in memory that can be swapped to disk. You can say that it is not a disadvantage because you need to have root access to access those core files/swap partition. This is true – but you only need root access for some time. Once someone gets access to decrypted tablespace key, s/he can keep using it to decrypt the data, even if the root access is no longer possible for that person. 
Also, the disk can be stolen and the swap partition/core files can be read by different means, and the purpose of TDE is to make the data unreadable even if the disk gets stolen. Percona Server for MySQL offers a feature that makes actual re-encryption of a tablespace – with newly generated keys – possible. This feature is called encryption threads and is still experimental at the time I am writing this blog post. Stay tuned for more information on transparent data encryption in the next blog posts of this series.
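For reference, this is roughly what the master key workflow looks like from the SQL side. A minimal sketch, assuming MySQL or Percona Server 5.7+ with a keyring plugin already loaded (table names are made up for illustration):

-- Create an encrypted file-per-table tablespace; the tablespace key is generated by the
-- server and stored, encrypted with the master key, in the tablespace header
CREATE TABLE customer_secrets (id INT PRIMARY KEY, payload VARBINARY(255)) ENCRYPTION='Y';

-- Encrypt an existing table
ALTER TABLE audit_log ENCRYPTION='Y';

-- Rotate the master key: every tablespace key is re-encrypted with the new master key,
-- but the tablespace keys themselves (and the data) stay the same
ALTER INSTANCE ROTATE INNODB MASTER KEY;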
Posted over 4 years ago by MySQL Performance Blog
Percona Monitoring and Management (PMM) is built upon the shoulders of giants like Prometheus and Grafana. And speaking of Grafana, one of the coolest features that comes with it is the ability to customize the experience through 3rd party plugins. Plugins are an easy way to enhance the ability to have specialized graphs. One case that we saw in Managed Services is the need for a throughput graph that shows QPS vs Threads running. This is different in essence from the "default" graphs that show a metric against time (time being the X-axis), since what we wanted is to show queries per second not during a time range (that graph already exists) but for specific values of threads. One way to achieve that is by using a plugin called Plotly, which renders metrics using the plot.ly JavaScript framework and, specifically, will let us make the graph we need. Let's get hands-on!

Installing Grafana plugins

Performing this task is pretty straightforward thanks to the grafana-cli command, which comes available inside any PMM installation. The only thing needed is the name of the desired plugin and knowing the command syntax, which is: grafana-cli plugins install [plugin name]

My case is a PMM through docker, so the full command and response are as follows:

[root@app ~]# docker exec -t pmm-server bash -c "/usr/bin/grafana-cli plugins install natel-plotly-panel"
installing natel-plotly-panel @ 0.0.6
from url: https://grafana.com/api/plugins/natel-plotly-panel/versions/0.0.6/download
into: /var/lib/grafana/plugins
✔ Installed natel-plotly-panel successfully
Restart grafana after installing plugins .
[root@app ~]#

All right, the final step before the plugin is fully ready to be used is to restart the "grafana-server" service. In our case, restarting the docker container will do the trick; the command is:

docker restart pmm-server

And after the container is back, the plugin is available. Now, the next step was to actually create the graph. Like I mentioned above, the need for the plugin was to have a "live" graph of the database throughput, measured as questions per second, per each thread, just like the graphs that one can find in any benchmark article out there, where the idea is to show the units of work in different concurrency scenarios. The metrics that we need are already being collected by PMM: mysql_global_status_threads_running (the "Threads running") and mysql_global_status_questions (the "QPS"). Those are added on the regular "metrics" tab. Then, we need the actual "Traces" for Plotly. Those are based on the metrics; we just need to assign the axes. And then we have the desired graph. What it shows is the QPS per threads during the selected time range. That's the reason why there are several points instead of one, like on regular benchmark graphs: because it is not an accumulated value. In this case, the graph shows that MySQL sees mostly low concurrency traffic, with the majority of the threads around the value of 4 and some peaks at 6 threads, and QPS also low, with the peak at around 300 QPS.

Does it Work with PMM2? You bet it does!
Following the already described steps:

[root@app ~]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
22bb13b5d6f9 percona/pmm-server:2 "/opt/entrypoint.sh" 3 hours ago Up About an hour 0.0.0.0:80->80/tcp, 0.0.0.0:443->443/tcp pmm-server

[root@app ~]# docker exec -t pmm-server bash -c "/usr/bin/grafana-cli plugins install natel-plotly-panel"
installing natel-plotly-panel @ 0.0.6
from: https://grafana.com/api/plugins/natel-plotly-panel/versions/0.0.6/download
into: /var/lib/grafana/plugins
✔ Installed natel-plotly-panel successfully
Restart grafana after installing plugins .
[root@app ~]# docker restart pmm-server
pmm-server

The plugin is available and we can make the same graph.

In Conclusion

Enhancing the PMM experience through Grafana plugins is a relatively easy task, in operational terms. It is worth mentioning that the plugin and the dashboard will survive in-place upgrades of PMM. In-place means using the default PMM upgrade option available in the home dashboard. This is key: compatibility between plugins and Grafana versions is on you, Mr. User, and you only. If the PMM team updates Grafana to a version that is incompatible with any plugin you are using, it is on you only. Full list of Grafana plugins.
Posted over 4 years ago by MySQL Performance Blog
As the world’s most popular open-source database, MySQL has been around the block more than a few times. Traditionally installed in on-premise data centers, recent years have shown a major trend for MySQL in the cloud, and near the top of this list ... [More] is Amazon RDS. Amazon RDS allows you to deploy scalable MySQL servers within minutes in a cost-efficient manner with easily resizable hardware capacity. This frees you up to focus on application development and leaves many of the traditional database administration tasks such as backups, patching, and monitoring in the hands of AWS. In this post I’d like to go over six important benefits of Amazon RDS, and why a move into RDS may be the right move for you. Easy Deployment Amazon RDS allows you to use either the AWS Management Console or a set of APIs to create, delete, and modify your database instances. You have full control of access and security for your instances, as well as an easy process to manage your database backups and snapshots. Amazon RDS for MySQL instances are pre-configured with parameters and settings tuned for the instance type you have chosen. Fear not, however, as you have a massive amount of control over these parameters with easy to manage database parameter groups that provide granular control and tuning options for your database instances. Fast Storage Options Amazon RDS provides two SSD-backed storage options for your database instances. The General Purpose storage option provides cost-effective storage for smaller or medium-sized workloads. For those applications that demand higher performance (such as heavy OLTP workloads), Provisioned IOPS Storage delivers consistent storage performance of up to 40,000 I/O’s per second. Easily expandable as your storage requirements grow, you can provision additional storage on the fly with no downtime. Backup & Recovery A good DBA is only as good as their last backup. This is a saying I’ve heard ever since I started working with MySQL back in the 3.2.3 days. It was true then, and it is true now – without the data, what can even the best DBA do to restore production services? With Amazon RDS, the automated backup features enable backup and recovery of your MySQL database instances to any point in time within your specified retention periods (up to 35 days). You can also manually initiate backups of your instances, and all of these backups will be stored by Amazon RDS until you explicitly delete them. Backups have never been so easy. High Availability On-premise high availability is often a challenge, as so many pieces of the puzzle need to work together in unison, and this is discounting the need for multiple data centers that are geographically separated. Using Amazon RDS Multi-AZ deployments, you can achieve enhanced availability and durability for your MySQL database instances, making them a great fit for production database workloads. By using Amazon RDS Read Replicas, it is easy to elastically scale out beyond the capacity constraints of a single database instance for read-heavy workloads. Monitoring/Metrics With the available RDS monitoring features in Amazon Cloudwatch, all of the metrics for your RDS database instances are available at no additional charge. Should you want more detailed and in-depth monitoring options, CloudWatch Enhanced Monitoring provides access to over 50 CPU, memory, file system, and disk I/O metrics. 
You can view key operational metrics directly within the AWS Management Console, including compute, memory, storage capacity utilization, I/O activity, instance connections, and more. Never be caught off guard again by not knowing what is happening within your database stack. Security As a managed service, Amazon RDS provides a high level of security for your MySQL databases. These include network isolation using Amazon VPC (virtual private cloud), encryption at rest using keys that you create and control through the AWS Key Management Service (KMS). Data can also be encrypted through the wire in transit using SSL. This is a good point to mention the Shared Responsibility Model, as there are still components you’ll need to secure during your RDS setup. As you can see from the AWS documentation, you are still in control of your instances, and as such will need to be sure that the settings align with your desired security best practices. While a detailed dive into the Shared Responsibility Model is beyond the scope of this article, my colleague Michael Benshoof has written a detailed overview here: The Shared Responsibility Model of Security in the Cloud. Summary In closing, there are many benefits to leveraging the cloud for your database infrastructure, and Amazon RDS is one of the more popular options suitable for many workloads and users. While AWS RDS does remove much of the DBA overhead with traditional on-premise installations, the customer is still responsible for managing the data itself, along with the tasks that accompany that. Analyzing workloads, performance trends, traffic management, etc., is still going to be critical even when leveraging managed cloud services. Percona can assist with your RDS deployment by picking up where AWS leaves off: • Performance Tuning • Capacity Planning • Disaster Recovery • Resource Allocation With Percona supporting your RDS installation, you can focus your efforts on your application and business, while letting Amazon and Percona support your infrastructure. If you are researching a move into the cloud, Percona can help review your options and assist in helping you choose a solution that will be best suited for your business. For more information, download our solution brief below which outlines setting up MySQL Amazon RDS instances to meet your company’s growing needs. Amazon RDS is suitable for production workloads and also can accommodate rapid deployment and application development due to the ease of initial setup. Grow Your Business with an AWS RDS MySQL Environment [Less]
Posted over 4 years ago by Bhuvanesh R
We are living in the DataLake world. Now almost every organization wants their reporting in near real time. Kafka is one of the best streaming platforms for realtime reporting. Built on the Kafka Connect framework, Red Hat designed Debezium, which is an open-source product highly recommended for real-time CDC from transactional databases. I referred to many blogs to set up this cluster, but I found just basic installation steps. So I set up this cluster on AWS at production grade and am publishing this blog.

A short intro: Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.

Basic Tech Terms:
Kafka Broker: Brokers are the core of Kafka streaming; they keep your messages and hand them to the consumers.
Zookeeper: It maintains the cluster status and node status, and helps with Kafka's availability.
Producers: The component that sends the messages (data) to the broker.
Consumers: The component that gets the messages from the queue for further analytics.
Confluent: Confluent has its own streaming platform which basically uses Apache Kafka under the hood, but it has more features.

Here Debezium is our data producer and the S3 sink is our consumer. For this setup, I'm going to stream the MySQL data changes to S3 in a customized format.

AWS Architecture:
Kafka and Zookeeper are installed on the same EC2 instances. We'll deploy a 3-node Confluent Kafka cluster. Each node will be in a different availability zone.
172.31.47.152 - Zone A
172.31.38.158 - Zone B
172.31.46.207 - Zone C
The Producer (Debezium) and Consumer (S3 sink) will be hosted on the same EC2 instances. We'll use 3 nodes for this.
172.31.47.12 - Zone A
172.31.38.183 - Zone B
172.31.46.136 - Zone C

Instance Type: Kafka nodes generally need to be memory and network optimized. You can choose either persistent or ephemeral storage; I prefer persistent SSD disks for Kafka storage, so add an n GB disk to your Kafka broker nodes. For normal workloads it's better to go with the R5 instance family. Mount the volume in the /kafkadata location.

Security Group: Use a new security group which allows the below ports.

Installation: Install Java and Kafka on all the broker nodes.
-- Install OpenJDK
apt-get -y update
sudo apt-get -y install default-jre

-- Install Confluent Kafka platform
wget -qO - https://packages.confluent.io/deb/5.3/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.3 stable main"
sudo apt-get update && sudo apt-get install confluent-platform-2.12

Configuration: We need to configure the Zookeeper and Kafka properties. Edit /etc/kafka/zookeeper.properties on all the Kafka nodes.

-- On Node 1
dataDir=/var/lib/zookeeper
clientPort=2181
maxClientCnxns=0
server.1=0.0.0.0:2888:3888
server.2=172.31.38.158:2888:3888
server.3=172.31.46.207:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
initLimit=5
syncLimit=2

-- On Node 2
dataDir=/var/lib/zookeeper
clientPort=2181
maxClientCnxns=0
server.1=172.31.47.152:2888:3888
server.2=0.0.0.0:2888:3888
server.3=172.31.46.207:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
initLimit=5
syncLimit=2

-- On Node 3
dataDir=/var/lib/zookeeper
clientPort=2181
maxClientCnxns=0
server.1=172.31.47.152:2888:3888
server.2=172.31.38.158:2888:3888
server.3=0.0.0.0:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
initLimit=5
syncLimit=2

We need to assign a unique ID to each of the Zookeeper nodes.

-- On Node 1
echo "1" > /var/lib/zookeeper/myid
-- On Node 2
echo "2" > /var/lib/zookeeper/myid
-- On Node 3
echo "3" > /var/lib/zookeeper/myid

Now we need to configure the Kafka broker. Edit /etc/kafka/server.properties on all the Kafka nodes.

-- On Node 1
broker.id.generation.enable=true
delete.topic.enable=true
listeners=PLAINTEXT://:9092
zookeeper.connect=172.31.47.152:2181,172.31.38.158:2181,172.31.46.207:2181
log.dirs=/kafkadata/kafka
log.retention.hours=168
num.partitions=1

-- On Node 2
broker.id.generation.enable=true
delete.topic.enable=true
listeners=PLAINTEXT://:9092
log.dirs=/kafkadata/kafka
zookeeper.connect=172.31.47.152:2181,172.31.38.158:2181,172.31.46.207:2181
log.retention.hours=168
num.partitions=1

-- On Node 3
broker.id.generation.enable=true
delete.topic.enable=true
listeners=PLAINTEXT://:9092
log.dirs=/kafkadata/kafka
zookeeper.connect=172.31.47.152:2181,172.31.38.158:2181,172.31.46.207:2181
num.partitions=1
log.retention.hours=168

The next step is optimizing the Java JVM heap size. In many setups Kafka goes down due to too little heap, so I'm allocating 50% of the memory to the heap. But be aware that too much heap is also bad; please refer to the documentation to set this value for very heavy systems.

vi /usr/bin/kafka-server-start
export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"

Another major problem in a Kafka system is open file descriptors, so we need to allow Kafka to open at least up to 100000 files.

vi /etc/pam.d/common-session
session required pam_limits.so

vi /etc/security/limits.conf
* soft nofile 10000
* hard nofile 100000
cp-kafka soft nofile 10000
cp-kafka hard nofile 100000

Here cp-kafka is the default user for the Kafka process.

Create the Kafka data dir:
mkdir -p /kafkadata/kafka
chown -R cp-kafka:confluent /kafkadata/kafka
chmod 710 /kafkadata/kafka

Start the Kafka cluster:
sudo systemctl start confluent-zookeeper
sudo systemctl start confluent-kafka
sudo systemctl start confluent-schema-registry

Make sure Kafka starts automatically after an EC2 restart:
sudo systemctl enable confluent-zookeeper
sudo systemctl enable confluent-kafka
sudo systemctl enable confluent-schema-registry

Now our Kafka cluster is ready. To check the list of system topics, run the following command.
To check the list of system topics, run the following command.

kafka-topics --list --zookeeper localhost:2181
__confluent.support.metrics

Setup Debezium:

Install the Confluent connector packages and the Debezium MySQL connector on all the producer nodes.

apt-get update
sudo apt-get install default-jre
wget -qO - https://packages.confluent.io/deb/5.3/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.3 stable main"
sudo apt-get update && sudo apt-get install confluent-hub-client confluent-common confluent-kafka-connect-s3 confluent-kafka-2.12

Configuration:

Edit /etc/kafka/connect-distributed.properties on all the producer nodes so that our producer runs in a distributed manner.

-- On all the connector nodes
bootstrap.servers=172.31.47.152:9092,172.31.38.158:9092,172.31.46.207:9092
group.id=debezium-cluster
plugin.path=/usr/share/java,/usr/share/confluent-hub-components

Install the Debezium MySQL connector:

confluent-hub install debezium/debezium-connector-mysql:latest

It'll ask to make some changes; just answer Y to everything.

Run the distributed connector as a service:

vi /lib/systemd/system/confluent-connect-distributed.service

[Unit]
Description=Apache Kafka - connect-distributed
Documentation=http://docs.confluent.io/
After=network.target

[Service]
Type=simple
User=cp-kafka
Group=confluent
ExecStart=/usr/bin/connect-distributed /etc/kafka/connect-distributed.properties
TimeoutStopSec=180
Restart=no

[Install]
WantedBy=multi-user.target

Start the service:

systemctl enable confluent-connect-distributed
systemctl start confluent-connect-distributed

Configure the Debezium MySQL connector:

Create a mysql.json file which contains the MySQL connection information and other formatting options.

{
  "name": "mysql-connector-db01",
  "config": {
    "name": "mysql-connector-db01",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.server.id": "1",
    "tasks.max": "3",
    "database.history.kafka.bootstrap.servers": "172.31.47.152:9092,172.31.38.158:9092,172.31.46.207:9092",
    "database.history.kafka.topic": "schema-changes.mysql",
    "database.server.name": "mysql-db01",
    "database.hostname": "172.31.84.129",
    "database.port": "3306",
    "database.user": "bhuvi",
    "database.password": "my_stong_password",
    "database.whitelist": "proddb,test",
    "internal.key.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false",
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "internal.value.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.source.fields": "ts_ms",
    "tombstones.on.delete": false
  }
}

"database.history.kafka.bootstrap.servers" - the Kafka server IPs.
"database.whitelist" - the list of databases to capture CDC from.
key.converter, value.converter and the transforms parameters - by default the Debezium output contains much more detailed information, but I don't want all of it; I'm only interested in the new row and the timestamp at which it was inserted.
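For comparison, here is roughly what the two output styles look like for the insert we'll run later (a sketch only; the exact set of envelope and source fields varies by Debezium version). Without the transform, every event is wrapped in a change envelope:

{
  "before": null,
  "after": {"id": 1, "fn": "rohit", "ln": "ayare", "phone": 87611},
  "source": {"name": "mysql-db01", "db": "test", "table": "rohi", "ts_ms": 1576757407000},
  "op": "c",
  "ts_ms": 1576757407123
}

With ExtractNewRecordState and add.source.fields=ts_ms, the same change is flattened to just the new row plus a __ts_ms column:

{"id": 1, "fn": "rohit", "ln": "ayare", "phone": 87611, "__ts_ms": 1576757407000}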
If you don't want to customize anything, just remove everything after database.whitelist.

Register the MySQL connector:

curl -X POST -H "Accept: application/json" -H "Content-Type: application/json" http://localhost:8083/connectors -d @mysql.json

Check the status:

curl -X GET localhost:8083/connectors/mysql-connector-db01/status

{
  "name": "mysql-connector-db01",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.31.94.191:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.31.94.191:8083"
    }
  ],
  "type": "source"
}

Test the MySQL consumer:

Now insert something into any table in proddb or test (because we have whitelisted only these databases for CDC).

use test;
create table rohi (
  id int,
  fn varchar(10),
  ln varchar(10),
  phone int
);
insert into rohi values (2, 'rohit', 'ayare', '87611');

We can read these values from the Kafka brokers. Open any one of the Kafka nodes and run the command below. I prefer the confluent CLI for this; it isn't available by default, so download it manually.

curl -L https://cnfl.io/cli | sh -s -- -b /usr/bin/

Listen on the topic mysql-db01.test.rohi. The topic name is the combination servername.databasename.tablename, where servername is the value you set as database.server.name in the mysql.json file.

confluent local consume mysql-db01.test.rohi
----
The local commands are intended for a single-node development environment only, NOT for production usage. https://docs.confluent.io/current/cli/index.html
-----
{"id":1,"fn":"rohit","ln":"ayare","phone":87611,"__ts_ms":1576757407000}

Setup the S3 sink connector on all producer nodes:

I want to send this data to an S3 bucket, so the EC2 instances must have an IAM role with access to the target S3 bucket. Alternatively you can install awscli and configure an access and secret key, but that is not recommended.

Install the S3 connector:

confluent-hub install confluentinc/kafka-connect-s3:latest

Create an s3.json file.

{
  "name": "s3-sink-db01",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.bucket.name": "bhuvi-datalake",
    "name": "s3-sink-db01",
    "tasks.max": "3",
    "s3.region": "us-east-1",
    "s3.part.size": "5242880",
    "s3.compression.type": "gzip",
    "timezone": "UTC",
    "locale": "en",
    "flush.size": "10000",
    "rotate.interval.ms": "3600000",
    "topics.regex": "mysql-db01.(.*)",
    "internal.key.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false",
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "internal.value.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
    "path.format": "YYYY/MM/dd/HH",
    "partition.duration.ms": "3600000",
    "rotate.schedule.interval.ms": "3600000"
  }
}

"topics.regex": "mysql-db01.(.*)" - the sink consumes only from topics with the mysql-db01 prefix; in our case all the MySQL-related topics start with this prefix.
"flush.size" - the data is uploaded to S3 only after this many records have accumulated, or after the "rotate.schedule.interval.ms" duration has elapsed, whichever comes first.
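With the HourlyPartitioner and the path.format above, objects land under an hourly prefix per topic. A rough way to verify this once data starts flowing (the topics/ prefix and the <topic>+<partition>+<startOffset> file naming are the connector defaults, so the exact keys may differ if you override them):

aws s3 ls --recursive s3://bhuvi-datalake/topics/mysql-db01.test.rohi/
# e.g. topics/mysql-db01.test.rohi/2019/12/19/12/mysql-db01.test.rohi+0+0000000000.json.gz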
Register this S3 sink connector:

curl -X POST -H "Accept: application/json" -H "Content-Type: application/json" http://localhost:8083/connectors -d @s3.json

Check the status:

curl -X GET localhost:8083/connectors/s3-sink-db01/status

{
  "name": "s3-sink-db01",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.31.94.191:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.31.94.191:8083"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "172.31.94.191:8083"
    },
    {
      "id": 2,
      "state": "RUNNING",
      "worker_id": "172.31.94.191:8083"
    }
  ],
  "type": "sink"
}

Test the S3 sync:

Insert 10000 rows into the rohi table and then check the S3 bucket. The data is saved in JSON format with GZIP compression, in hourly partitions.

Monitoring:

Refer to the follow-up post on monitoring the Debezium MySQL connector with Prometheus and Grafana.

More Tuning:

The replication factor is the other main parameter for data durability (a minimal sketch follows at the end of this post).
Use internal IP addresses as much as you can.
By default Debezium uses 1 partition per topic. You can configure this based on your workload, but keep in mind that more partitions also demand more resources.

References:

Setup Kafka in production by Confluent
How to choose the number of partitions
Open file descriptors for Kafka
Kafka best practices in AWS
Debezium documentation
Customize Debezium output with SMT

Debezium series blogs:

Build Production Grade Debezium Cluster With Confluent Kafka
Monitor Debezium MySQL Connector With Prometheus And Grafana
Debezium MySQL Snapshot From Read Replica With GTID
Debezium MySQL Snapshot From Read Replica And Resume From Master
Debezium MySQL Snapshot For AWS RDS Aurora From Backup Snapshot
RealTime CDC From MySQL Using AWS MSK With Debezium
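Here is the minimal tuning sketch referred to above. These are standard broker settings rather than values taken from this cluster, so treat them as a starting point; raising the defaults in /etc/kafka/server.properties makes every auto-created topic replicated across the three brokers.

# sketch only: durability-oriented defaults for a 3-node cluster
default.replication.factor=3
min.insync.replicas=2
offsets.topic.replication.factor=3
num.partitions=3

For a topic that already exists, the partition count can be raised later, for example with kafka-topics --alter --zookeeper localhost:2181 --topic mysql-db01.test.rohi --partitions 3, but note that Kafka will not rebalance existing keys automatically.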