A simple example of pulling data from Kafka built on the producer-consumer design pattern: one thread polls records from Kafka and appends them to a shared buffer, while a second thread drains that buffer into output files that roll over on a fixed interval.

package com.bay.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.log4j.BasicConfigurator;

import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Author by BayMin, Date on 2019/7/25.
 */
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        BasicConfigurator.configure();
        ShareResource shareResource = new ShareResource();
        new Thread(new Producer(shareResource), "Thread_Producer").start();
        new Thread(new Consumer(shareResource), "Thread_Consumer").start();
    }
}

// Despite its name, this thread runs the KafkaConsumer: it polls records
// from Kafka and plays the "producer" role for the in-process buffer.
class Producer implements Runnable {
    private ShareResource shareResource;

    Producer(ShareResource shareResource) {
        this.shareResource = shareResource;
    }

    @SuppressWarnings("InfiniteLoopStatement")
    @Override
    public void run() {
        String topic = "test";
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop011:9092;hadoop012:9092;hadoop013:9092");
        props.put("group.id", "group01");
        props.put("zookeeper.connect", "hadoop010:2181,hadoop011:2181,hadoop012:2181,hadoop013:2181");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
        int idlePolls = 0; // consecutive polls that returned no records
        while (true) {
            // poll(Duration) needs kafka-clients 2.0+; on older clients use poll(100)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (records.isEmpty())
                idlePolls++;
            else
                idlePolls = 0;
            for (ConsumerRecord<String, String> record : records) {
                shareResource.push(record.value().concat("\n"));
            }
            // After roughly a second with no new records, flush whatever is
            // buffered so the writer is not left waiting on a half-full buffer.
            if (idlePolls > 10) {
                shareResource.forcePush();
                idlePolls = 0;
            }
        }
    }
}

// Drains the shared buffer into rolling output files (the buffer's "consumer").
class Consumer implements Runnable {
    private ShareResource shareResource;

    Consumer(ShareResource shareResource) {
        this.shareResource = shareResource;
    }

    @SuppressWarnings("InfiniteLoopStatement")
    @Override
    public void run() {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd-HH-mm");
        FileOutputStream fos = null;
        long startTime = System.currentTimeMillis();
        String fileName = "C:\\Users\\lenovo\\Desktop\\12\\Kafka_data_" + format.format(startTime) + ".txt";
        try {
            fos = new FileOutputStream(fileName);
            while (true) {
                long currentTime = System.currentTimeMillis();
                // Roll over to a new output file once the interval has elapsed
                if ((currentTime - startTime) >= ShareResource.getGAP()) {
                    fos.close(); // close the finished file before opening the next one
                    fileName = "C:\\Users\\lenovo\\Desktop\\12\\Kafka_data_" + format.format(currentTime) + ".txt";
                    fos = new FileOutputStream(fileName);
                    startTime = currentTime;
                }
                shareResource.pop(fos);
            }
        } catch (IOException e) { // also covers FileNotFoundException from FileOutputStream
            e.printStackTrace();
        } finally {
            if (fos != null) {
                try {
                    fos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

class ShareResource {
    private final StringBuffer data = new StringBuffer();
    private final Object lock = new Object(); // dedicated monitor; never wait/notify on a shared String literal
    private final static int BUFFER_SIZE = 1024 * 1024; // flush threshold: 1 MB
    private final static int GAP = 5 * 60 * 1000;       // file-roll interval: 5 minutes
    private boolean isEmpty = true;
    private boolean isForce = false;

    // Called by the polling thread; blocks once the buffer reaches
    // BUFFER_SIZE until the writer thread has drained it.
    void push(String message) {
        synchronized (lock) {
            try {
                if (this.data.toString().getBytes().length < BUFFER_SIZE)
                    isEmpty = true;
                if (!isEmpty)
                    lock.wait();
                this.data.append(message);
                if (this.data.toString().getBytes().length >= BUFFER_SIZE) {
                    isEmpty = false;
                    lock.notify();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    // Wakes the writer even though the buffer is below the threshold, so
    // buffered data is not held back indefinitely when the topic goes quiet.
    void forcePush() {
        synchronized (lock) {
            isForce = true;
            lock.notify();
        }
    }

    // Called by the writer thread; blocks while the buffer is empty, then
    // writes the buffered text to the current output file and clears it.
    void pop(FileOutputStream fos) {
        synchronized (lock) {
            try {
                if (this.data.toString().getBytes().length >= BUFFER_SIZE)
                    isEmpty = false;
                if (isEmpty)
                    lock.wait();
                if (this.data.length() != 0) {
                    fos.write(this.data.toString().getBytes());
                    this.data.delete(0, this.data.length());
                    isEmpty = true;
                    if (!isForce)
                        lock.notify();
                    isForce = false;
                }
            } catch (InterruptedException | IOException e) {
                e.printStackTrace();
            }
        }
    }

    static int getGAP() {
        return GAP;
    }
}
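
The hand-rolled wait/notify in ShareResource is easy to get wrong. For comparison, the same hand-off can be expressed with java.util.concurrent.BlockingQueue, which handles the blocking and signalling internally. The sketch below is an illustrative alternative, not part of the original demo; the class name, the queue capacity, and the poll timeout are arbitrary assumptions:

package com.bay.kafka;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical alternative to ShareResource: the queue does all the
// wait/notify bookkeeping itself. The capacity of 10000 is an assumption.
class QueueResource {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10000);

    // Called by the polling thread; blocks when the queue is full.
    void push(String message) throws InterruptedException {
        queue.put(message);
    }

    // Called by the writer thread; returns null after a one-second timeout
    // so the caller can periodically check whether to roll the output file.
    String pop() throws InterruptedException {
        return queue.poll(1, TimeUnit.SECONDS);
    }
}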

Simulating Kafka data for the topic consumed above (this assumes the test topic already exists on the brokers, or that automatic topic creation is enabled):

package com.bay.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.BasicConfigurator;

import java.util.Date;
import java.util.Properties;

/**
 * Author by BayMin, Date on 2019/7/25.
 */
public class KafkaProducerDemo {
    public static void main(String[] args) {
        BasicConfigurator.configure();
        String topic = "test";
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "hadoop011:9092;hadoop012:9092;hadoop013:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // Generate sample data: a sequence number plus the current timestamp
        for (int i = 1; i < 10000; i++) {
            String msg = i + "  " + new Date().toString();
            producer.send(new ProducerRecord<>(topic, msg));
        }
        producer.close();
    }
}
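
One caveat: send() is asynchronous, and with retries set to 0 a failed send in the loop above disappears silently. The producer API also accepts a per-record callback that reports the outcome of each send. Below is a minimal, hypothetical helper (not in the original post) that could stand in for the bare send call:

package com.bay.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical helper: logs each send's result instead of ignoring it.
class ProducerCallbackSketch {
    static void sendWithCallback(KafkaProducer<String, String> producer, String topic, String msg) {
        producer.send(new ProducerRecord<>(topic, msg), (metadata, exception) -> {
            if (exception != null)
                exception.printStackTrace(); // delivery failed; with retries=0 it is not retried
            else
                System.out.println("sent to partition " + metadata.partition()
                        + " at offset " + metadata.offset());
        });
    }
}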

To this day, you are still my light.