Skip to content

Commit

Permalink
初始化拓展
Browse files Browse the repository at this point in the history
  • Loading branch information
wenzhenxi committed Sep 27, 2016
1 parent 6817320 commit 5917f1b
Show file tree
Hide file tree
Showing 12 changed files with 756 additions and 201 deletions.
131 changes: 131 additions & 0 deletions Consumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
<?php
// Copyright 2016 The See-KafKa Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.

// 通过offset和group来获取消息(必须设置group)
const KAFKA_OFFSET_STORED = RD_KAFKA_OFFSET_STORED;
// 从尾部开始获取新的massage
const KAFKA_OFFSET_END = RD_KAFKA_OFFSET_END;
// 从头部获取massage
const KAFKA_OFFSET_BEGINNING = RD_KAFKA_OFFSET_BEGINNING;


/**
* KafKa-Consumer类
* @author : @喵了个咪<[email protected]>
*/
class KafKa_Consumer {

protected $topic = null;

protected $timeout = 10;

protected $partition;

/**
* KafKa-Consumer 构造函数
*
* @param string $BrokerList
* @param \RdKafka\Conf $KafKaConf
* @param \RdKafka\TopicConf $TopicConf
* @param string $Topic
*/
public function __construct($BrokerList, $KafKaConf, $TopicConf, $Topic) {

$rk = new RdKafka\Consumer($KafKaConf);

$rk->addBrokers($BrokerList);

$this->topic = $rk->newTopic($Topic, $TopicConf);


}

public function setTimeout($timeout) {
$this->timeout = $timeout;
}

/**
* 开启Consumer
*
* @param $partition
* @param int $offset
*/
public function consumerStart($partition = 0, $offset = KAFKA_OFFSET_STORED) {
$this->partition = $partition;
$this->topic->consumeStart($this->partition, $offset);
}

/**
* 关闭consumer 断开连接
*/
public function consumerStop() {
$this->topic->consumeStop($this->partition);
}


/**
* 每次获取单条Massage(多用于队列脚本)
*
* @return null|Kafka_Message
* @throws KafKa_Exception_Base
*/
public function consume() {
$message = $this->topic->consume($this->partition, $this->timeout * 1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
return $message;
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
return null;
break;
default:
throw new KafKa_Exception_Base($message->errstr(), $message->err);
break;
}
}

/**
* 批量获取Massage
*
* @param int $partition
* @param int $maxSize
* @param int $offset
*
* @return array
* @throws KafKa_Exception_Base
*/
public function getMassage($partition, $maxSize, $offset = KAFKA_OFFSET_STORED) {

$retList = array();

$this->consumerStart($partition, $offset);
for ($i = 0; $i < $maxSize; $i++) {
$message = $this->consume();
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$retList[] = $message;
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
break 2;
default:
throw new KafKa_Exception_Base($message->errstr(), $message->err);
break;
}
}
$this->consumerStop();
return $retList;
}

}
9 changes: 9 additions & 0 deletions Exception/Base.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

/**
* KafKa-Exception 异常类
* @author : @喵了个咪<[email protected]>
*/
class KafKa_Exception_Base extends Exception {

}
37 changes: 37 additions & 0 deletions Exception/err.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

const RD_KAFKA_RESP_ERR__BEGIN = -200;
const RD_KAFKA_RESP_ERR__BAD_MSG = -199;
const RD_KAFKA_RESP_ERR__BAD_COMPRESSION = -198;
const RD_KAFKA_RESP_ERR__DESTROY = -197;
const RD_KAFKA_RESP_ERR__FAIL = -196;
const RD_KAFKA_RESP_ERR__TRANSPORT = -195;
const RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE = -194;
const RD_KAFKA_RESP_ERR__RESOLVE = -193;
const RD_KAFKA_RESP_ERR__MSG_TIMED_OUT = -192;
const RD_KAFKA_RESP_ERR__PARTITION_EOF = -191;
const RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION = -190;
const RD_KAFKA_RESP_ERR__FS = -189;
const RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC = -188;
const RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN = -187;
const RD_KAFKA_RESP_ERR__INVALID_ARG = -186;
const RD_KAFKA_RESP_ERR__TIMED_OUT = -185;
const RD_KAFKA_RESP_ERR__QUEUE_FULL = -184;
const RD_KAFKA_RESP_ERR__ISR_INSUFF = -183;
const RD_KAFKA_RESP_ERR__END = -100;
const RD_KAFKA_RESP_ERR_UNKNOWN = -1;
const RD_KAFKA_RESP_ERR_NO_ERROR = 0;
const RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE = 1;
const RD_KAFKA_RESP_ERR_INVALID_MSG = 2;
const RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART = 3;
const RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE = 4;
const RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE = 5;
const RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION = 6;
const RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT = 7;
const RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE = 8;
const RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE = 9;
const RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE = 10;
const RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH = 11;
const RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE = 12;


24 changes: 24 additions & 0 deletions KafKa.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php
// Copyright 2016 The See-KafKa Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.

/**
* KafKa引入文件
* @author : @喵了个咪<[email protected]>
*/

include_once "Consumer.php";
include_once "Lite.php";
include_once "Producer.php";
include_once "Exception/Base.php";
Loading

0 comments on commit 5917f1b

Please sign in to comment.